ohlcv.jl
1 using .TimeTicks: Period, now, timeframe, apply 2 using Data.DataFramesMeta 3 using Data: Candle, to_ohlcv, empty_ohlcv, DFUtils, ZArray, _load_ohlcv, _save_ohlcv, zi 4 using Base: _cleanup_locked 5 using .DFUtils: appendmax!, lastdate 6 7 @doc """Fills missing candles in a DataFrame. 8 9 $(TYPEDSIGNATURES) 10 11 This function takes a DataFrame `df`, a string `timeframe`, and optionally a filling strategy `strategy`. It identifies the missing candles in `df` based on the `timeframe`, and fills them using the specified `strategy`. 12 13 filling strategies: 14 15 - `:close`: fill with the close price of the previous candle. 16 - `:open`: fill with the open price of the next candle. 17 - `:linear`: linearly interpolate between the close price of the previous candle and the open price of the next candle. 18 19 """ 20 function fill_missing_candles(df, timeframe::AbstractString; strategy=:close) 21 @as_td 22 _fill_missing_candles(df, prd; strategy, inplace=false) 23 end 24 25 @doc """$(TYPEDSIGNATURES) See [`fill_missing_candles`](@ref).""" 26 function fill_missing_candles!(df, prd::Period; strategy=:close) 27 _fill_missing_candles(df, prd; strategy, inplace=true) 28 end 29 30 @doc """$(TYPEDSIGNATURES) See [`fill_missing_candles`](@ref).""" 31 function _fill_missing_candles!(df, timeframe::AbstractString; strategy=:close) 32 @as_td 33 _fill_missing_candles(df, prd; strategy, inplace=true) 34 end 35 36 _update_timestamps(left, prd, ts, from_idx) = begin 37 for i in from_idx:lastindex(ts) 38 left += prd 39 ts[i] = left 40 end 41 end 42 43 _check_cap(::Val{:uncapped}, args...) = nothing 44 function _check_cap(::Val{:capped}, df, cap) 45 size(df, 1) > cap && popfirst!(df) 46 end 47 _append_cap!(::Val{:uncapped}, _, args...) = append!(args...) 48 _append_cap!(::Val{:capped}, cap, args...) = appendmax!(args..., cap) 49 50 @doc """Applies trailing operation on a DataFrame based on a time frame. 51 52 $(TYPEDSIGNATURES) 53 54 This function takes a DataFrame `df`, a TimeFrame `tf`, and optionally a timestamp `to`, a timestamp `from`, and a cap `cap`. It applies a trailing window operation on `df` for the specified `tf`. The operation starts from the timestamp specified by `from` (default is the last timestamp in the DataFrame) and ends at the timestamp specified by `to`. The `cap` argument determines the maximum number of rows to keep in the dataframe. 55 56 """ 57 function trail!(df, tf::TimeFrame; to, from=df[end, :timestamp], cap=0) 58 prd = period(tf) 59 n_to_append = (to - from) รท prd - 1 60 if n_to_append > 0 61 capval = cap > 0 ? Val(:capped) : Val(:uncapped) 62 push!(df, @view(df[end, :])) 63 _check_cap(capval, df, cap) 64 from += prd 65 close = df[end, :close] 66 df[end, :timestamp] = from 67 df[end, :open] = close 68 df[end, :high] = close 69 df[end, :low] = close 70 df[end, :volume] = 0 71 n_to_append -= 1 72 if n_to_append > 0 73 to_append = repeat(@view(df[end:end, :]), n_to_append) 74 _append_cap!(capval, cap, df, to_append) 75 from_idx = lastindex(df.timestamp) - n_to_append + 1 76 _update_timestamps(from, prd, df.timestamp, from_idx) 77 end 78 end 79 end 80 novol_candle(ts, n) = Candle(ts, n, n, n, n, 0) 81 nan_candle(ts, _) = Candle(ts, NaN, NaN, NaN, NaN, NaN) 82 83 _isunixepoch(ts) = ts.instant.periods.value == TimeTicks.Dates.UNIXEPOCH 84 trimzeros!(df) = begin 85 idx = 1 86 for t in df.timestamp 87 _isunixepoch(t) || break 88 idx += 1 89 end 90 idx > 1 && deleteat!(df, 1:(idx - 1)) 91 end 92 93 function _fill_missing_candles( 94 df::DataFrame, prd::Period; strategy, inplace, def_strategy=nan_candle, def_type=Candle 95 ) 96 trimzeros!(df) 97 size(df, 1) == 0 && return empty_ohlcv() 98 ordered_rows = def_type[] 99 # fill the row by previous close or with NaNs 100 build_candle = ifelse(strategy == :close, novol_candle, def_strategy) 101 @with df begin 102 ts_cur, ts_end = first(:timestamp) + prd, last(:timestamp) 103 ts_idx = 2 104 # NOTE: we assume that ALL timestamps are multiples of the timedelta! 105 while ts_cur < ts_end 106 if ts_cur != :timestamp[ts_idx] 107 close = :close[ts_idx - 1] 108 push!(ordered_rows, build_candle(ts_cur, close)) 109 else 110 ts_idx += 1 111 end 112 ts_cur += prd 113 end 114 end 115 inplace || (df = deepcopy(df)) 116 try 117 append!(df, ordered_rows) 118 catch 119 # In case the dataframe is backed by a matrix we have to copy 120 df = DataFrame(df; copycols=true) 121 append!(df, ordered_rows) 122 end 123 sort!(df, :timestamp) 124 trimzeros!(df) 125 return df 126 end 127 128 @doc """Removes incomplete candles from a DataFrame. 129 130 $(TYPEDSIGNATURES) 131 132 This function takes a DataFrame `in_df` and a TimeFrame `tf`. It identifies any incomplete candles in `in_df` based on `tf` and removes them. 133 134 See [`isincomplete`](@ref) for more information. 135 """ 136 function _remove_incomplete_candle(in_df, tf) 137 df = in_df isa SubDataFrame ? copy(in_df) : in_df 138 if isincomplete(df[end, :timestamp], tf) 139 lastcandle = copy(df[end, :]) 140 deleteat!(df, lastindex(df, 1)) 141 @debug "Dropping last candle ($(lastcandle.timestamp |> string)) because it is incomplete." 142 end 143 df 144 end 145 @doc """Cleans up OHLCV data in a DataFrame. 146 147 $(TYPEDSIGNATURES) 148 149 This function takes a DataFrame `data`, a TimeFrame `tf`, and optionally a column index `col` and a filling strategy `fill_missing`. It cleans up the OHLCV data in `data` by removing any incomplete candles based on `tf`, filling any missing candles using the specified filling strategy, and sorting the data by the specified column. 150 151 """ 152 function cleanup_ohlcv_data(data, tf::TimeFrame; col=1, fill_missing=:close) 153 @debug "Cleaning dataframe of size: $(size(data, 1))." 154 size(data, 1) == 0 && return empty_ohlcv() 155 df = data isa AbstractDataFrame ? data : to_ohlcv(data, tf) 156 157 # remove incomplete candle before timestamp normalization 158 df = _remove_incomplete_candle(df, tf) 159 # normalize dates 160 @eachrow! df begin 161 :timestamp = apply(tf, :timestamp) 162 end 163 164 gd = groupby(df, :timestamp; sort=true) 165 df = combine( 166 gd, 167 :open => first, 168 :high => maximum, 169 :low => minimum, 170 :close => last, 171 :volume => maximum; 172 renamecols=false, 173 ) 174 # check again after de-duplication 175 df = _remove_incomplete_candle(df, tf) 176 177 if fill_missing != false 178 fill_missing_candles!(df, tf.period; strategy=fill_missing) 179 else 180 trimzeros!(df) 181 end 182 df 183 end 184 @doc """$(TYPEDSIGNATURES) See [`cleanup_ohlcv_data`](@ref).""" 185 function cleanup_ohlcv_data(data, tf::AbstractString; kwargs...) 186 cleanup_ohlcv_data(data, convert(TimeFrame, tf); kwargs...) 187 end 188 189 @doc """Cleans up OHLCV data in a ZArray. 190 191 $(TYPEDSIGNATURES) 192 193 This function takes a ZArray `z` and a string `timeframe`. It cleans up the OHLCV data in `z` by removing any incomplete candles based on `timeframe` and filling any missing candles using the specified filling strategy. 194 195 """ 196 function cleanup_ohlcv!(z::ZArray, timeframe::AbstractString) 197 tf = convert(TimeFrame, timeframe) 198 ohlcv = _load_ohlcv(z, timeframe) 199 ohlcv = cleanup_ohlcv_data(ohlcv, tf) 200 _save_ohlcv(z::ZArray, timefloat(tf), ohlcv) 201 end 202 203 isincomplete(d::DateTime, tf::TimeFrame, ::Val{:raw}) = d + tf > now() 204 @doc """Checks if a DateTime is incomplete based on a TimeFrame. 205 206 $(TYPEDSIGNATURES) 207 """ 208 isincomplete(d::DateTime, tf::TimeFrame) = isincomplete(apply(tf, d), tf, Val(:raw)) 209 @doc "Checks if a candle is too new. 210 211 $(TYPEDSIGNATURES) 212 " 213 isincomplete(candle::Candle, tf::TimeFrame) = isincomplete(candle.timestamp, tf) 214 @doc "Checks if a candle is old enough to be complete. 215 216 $(TYPEDSIGNATURES) 217 " 218 iscomplete(v, tf) = !isincomplete(v, tf) 219 @doc "Checks if a candle is exactly the latest candle. 220 221 $(TYPEDSIGNATURES) 222 " 223 islast(d::DateTime, tf, ::Val{:raw}) = begin 224 n = now() 225 next = d + tf 226 next <= n && next + tf > n 227 end 228 islast(d::DateTime, tf::TimeFrame) = islast(apply(tf, d), tf, Val(:raw)) 229 islast(candle::Candle, tf) = islast(candle.timestamp, tf, Val(:raw)) 230 islast(v, tf::AbstractString) = islast(v, timeframe(tf)) 231 islast(v::AbstractString, tf) = islast(something(tryparse(DateTime, v), DateTime(0)), tf) 232 islast(v::S, tf::S) where {S<:AbstractString} = islast(v, timeframe(tf)) 233 _equalapply(date1, tf, date2) = apply(tf, date1) - period(tf) == date2 234 @doc """Checks if the last row of a smaller DataFrame is also the last row of a larger DataFrame. 235 236 $(TYPEDSIGNATURES) 237 238 """ 239 function islast(larger::DataFrame, smaller::DataFrame) 240 let tf = timeframe!(larger), date = lastdate(smaller) 241 _equalapply(date, tf, lastdate(larger)) 242 end 243 end 244 @doc "`a` is left adjacent to `b` if in order `..ab..` 245 246 $(TYPEDSIGNATURES) 247 " 248 isleftadj(a, b, tf::TimeFrame) = a + tf == b 249 @doc "`a` is right adjacent to `b` if in order `..ba..` 250 251 $(TYPEDSIGNATURES) 252 " 253 isrightadj(a, b, tf::TimeFrame) = isleftadj(b, a, tf) 254 @doc "`a` is adjacent to `b` if either [`isleftadj`](@ref) or [`isrightadj`](@ref)." 255 isadjacent(a, b, tf::TimeFrame) = isleftadj(a, b, tf) || isrightadj(a, b, tf) 256 257 export cleanup_ohlcv_data, 258 isincomplete, iscomplete, islast, isleftadj, isrightadj, isadjacent