/ Processing / src / ohlcv.jl
ohlcv.jl
  1  using .TimeTicks: Period, now, timeframe, apply
  2  using Data.DataFramesMeta
  3  using Data: Candle, to_ohlcv, empty_ohlcv, DFUtils, ZArray, _load_ohlcv, _save_ohlcv, zi
  4  using Base: _cleanup_locked
  5  using .DFUtils: appendmax!, lastdate
  6  
  7  @doc """Fills missing candles in a DataFrame.
  8  
  9  $(TYPEDSIGNATURES)
 10  
 11  This function takes a DataFrame `df`, a string `timeframe`, and optionally a filling strategy `strategy`. It identifies the missing candles in `df` based on the `timeframe`, and fills them using the specified `strategy`.
 12  
 13  filling strategies:
 14  
 15  - `:close`: fill with the close price of the previous candle.
 16  - `:open`: fill with the open price of the next candle.
 17  - `:linear`: linearly interpolate between the close price of the previous candle and the open price of the next candle.
 18  
 19  """
 20  function fill_missing_candles(df, timeframe::AbstractString; strategy=:close)
 21      @as_td
 22      _fill_missing_candles(df, prd; strategy, inplace=false)
 23  end
 24  
 25  @doc """$(TYPEDSIGNATURES) See [`fill_missing_candles`](@ref)."""
 26  function fill_missing_candles!(df, prd::Period; strategy=:close)
 27      _fill_missing_candles(df, prd; strategy, inplace=true)
 28  end
 29  
 30  @doc """$(TYPEDSIGNATURES) See [`fill_missing_candles`](@ref)."""
 31  function _fill_missing_candles!(df, timeframe::AbstractString; strategy=:close)
 32      @as_td
 33      _fill_missing_candles(df, prd; strategy, inplace=true)
 34  end
 35  
 36  _update_timestamps(left, prd, ts, from_idx) = begin
 37      for i in from_idx:lastindex(ts)
 38          left += prd
 39          ts[i] = left
 40      end
 41  end
 42  
 43  _check_cap(::Val{:uncapped}, args...) = nothing
 44  function _check_cap(::Val{:capped}, df, cap)
 45      size(df, 1) > cap && popfirst!(df)
 46  end
 47  _append_cap!(::Val{:uncapped}, _, args...) = append!(args...)
 48  _append_cap!(::Val{:capped}, cap, args...) = appendmax!(args..., cap)
 49  
 50  @doc """Applies trailing operation on a DataFrame based on a time frame.
 51  
 52  $(TYPEDSIGNATURES)
 53  
 54  This function takes a DataFrame `df`, a TimeFrame `tf`, and optionally a timestamp `to`, a timestamp `from`, and a cap `cap`. It applies a trailing window operation on `df` for the specified `tf`. The operation starts from the timestamp specified by `from` (default is the last timestamp in the DataFrame) and ends at the timestamp specified by `to`. The `cap` argument determines the maximum number of rows to keep in the dataframe.
 55  
 56  """
 57  function trail!(df, tf::TimeFrame; to, from=df[end, :timestamp], cap=0)
 58      prd = period(tf)
 59      n_to_append = (to - from) รท prd - 1
 60      if n_to_append > 0
 61          capval = cap > 0 ? Val(:capped) : Val(:uncapped)
 62          push!(df, @view(df[end, :]))
 63          _check_cap(capval, df, cap)
 64          from += prd
 65          close = df[end, :close]
 66          df[end, :timestamp] = from
 67          df[end, :open] = close
 68          df[end, :high] = close
 69          df[end, :low] = close
 70          df[end, :volume] = 0
 71          n_to_append -= 1
 72          if n_to_append > 0
 73              to_append = repeat(@view(df[end:end, :]), n_to_append)
 74              _append_cap!(capval, cap, df, to_append)
 75              from_idx = lastindex(df.timestamp) - n_to_append + 1
 76              _update_timestamps(from, prd, df.timestamp, from_idx)
 77          end
 78      end
 79  end
 80  novol_candle(ts, n) = Candle(ts, n, n, n, n, 0)
 81  nan_candle(ts, _) = Candle(ts, NaN, NaN, NaN, NaN, NaN)
 82  
 83  _isunixepoch(ts) = ts.instant.periods.value == TimeTicks.Dates.UNIXEPOCH
 84  trimzeros!(df) = begin
 85      idx = 1
 86      for t in df.timestamp
 87          _isunixepoch(t) || break
 88          idx += 1
 89      end
 90      idx > 1 && deleteat!(df, 1:(idx - 1))
 91  end
 92  
 93  function _fill_missing_candles(
 94      df::DataFrame, prd::Period; strategy, inplace, def_strategy=nan_candle, def_type=Candle
 95  )
 96      trimzeros!(df)
 97      size(df, 1) == 0 && return empty_ohlcv()
 98      ordered_rows = def_type[]
 99      # fill the row by previous close or with NaNs
100      build_candle = ifelse(strategy == :close, novol_candle, def_strategy)
101      @with df begin
102          ts_cur, ts_end = first(:timestamp) + prd, last(:timestamp)
103          ts_idx = 2
104          # NOTE: we assume that ALL timestamps are multiples of the timedelta!
105          while ts_cur < ts_end
106              if ts_cur != :timestamp[ts_idx]
107                  close = :close[ts_idx - 1]
108                  push!(ordered_rows, build_candle(ts_cur, close))
109              else
110                  ts_idx += 1
111              end
112              ts_cur += prd
113          end
114      end
115      inplace || (df = deepcopy(df))
116      try
117          append!(df, ordered_rows)
118      catch
119          # In case the dataframe is backed by a matrix we have to copy
120          df = DataFrame(df; copycols=true)
121          append!(df, ordered_rows)
122      end
123      sort!(df, :timestamp)
124      trimzeros!(df)
125      return df
126  end
127  
@doc """Removes the last candle of a DataFrame if it is incomplete.

$(TYPEDSIGNATURES)

Only the last row of `in_df` is checked against `tf` with [`isincomplete`](@ref). If it is
incomplete, it is deleted. A `SubDataFrame` is copied first; any other input is modified in
place. An empty input is returned unchanged.

See [`isincomplete`](@ref) for more information.
"""
function _remove_incomplete_candle(in_df, tf)
    df = in_df isa SubDataFrame ? copy(in_df) : in_df
    # `df[end, ...]` would throw a BoundsError on an empty frame.
    size(df, 1) == 0 && return df
    last_ts = df[end, :timestamp]
    if isincomplete(last_ts, tf)
        deleteat!(df, lastindex(df, 1))
        @debug "Dropping last candle ($(last_ts |> string)) because it is incomplete."
    end
    return df
end
@doc """Cleans up OHLCV data in a DataFrame.

$(TYPEDSIGNATURES)

Takes `data` (a DataFrame, or anything `to_ohlcv` accepts) and a TimeFrame `tf`, then:

1. drops the last candle if it is incomplete (see [`isincomplete`](@ref));
2. normalizes every timestamp to `tf` with `apply`;
3. merges rows sharing a timestamp (first open, max high, min low, last close,
   max volume), ordered by timestamp;
4. drops the last candle again if it is incomplete after merging;
5. fills missing candles using `fill_missing` as the strategy (see
   [`fill_missing_candles`](@ref)). When `fill_missing == false`, only leading
   Unix-epoch rows are trimmed.

Returns the cleaned DataFrame (`empty_ohlcv()` when nothing is left). The `col` keyword
is currently unused and only kept for backward compatibility.

!!! note
    When `data` is a plain `DataFrame`, steps 1–2 modify it in place: its last row may be
    deleted and its timestamps normalized.
"""
function cleanup_ohlcv_data(data, tf::TimeFrame; col=1, fill_missing=:close)
    @debug "Cleaning dataframe of size: $(size(data, 1))."
    size(data, 1) == 0 && return empty_ohlcv()
    df = data isa AbstractDataFrame ? data : to_ohlcv(data, tf)

    # remove incomplete candle before timestamp normalization
    df = _remove_incomplete_candle(df, tf)
    # nothing left to clean (e.g. the only candle was incomplete)
    size(df, 1) == 0 && return empty_ohlcv()
    # normalize dates
    @eachrow! df begin
        :timestamp = apply(tf, :timestamp)
    end

    gd = groupby(df, :timestamp; sort=true)
    df = combine(
        gd,
        :open => first,
        :high => maximum,
        :low => minimum,
        :close => last,
        :volume => maximum;
        renamecols=false,
    )
    # check again after de-duplication
    df = _remove_incomplete_candle(df, tf)

    if fill_missing != false
        # Keep the returned frame: the filler may return a new DataFrame (copy fallback
        # or an empty frame) instead of growing `df` in place.
        df = fill_missing_candles!(df, tf.period; strategy=fill_missing)
    else
        trimzeros!(df)
    end
    return df
end
@doc """$(TYPEDSIGNATURES) Converts `tf` to a `TimeFrame`; see [`cleanup_ohlcv_data`](@ref)."""
cleanup_ohlcv_data(data, tf::AbstractString; kwargs...) =
    cleanup_ohlcv_data(data, convert(TimeFrame, tf); kwargs...)
188  
@doc """Cleans up the OHLCV data stored in a ZArray.

$(TYPEDSIGNATURES)

Loads the OHLCV data for `timeframe` from `z` and runs [`cleanup_ohlcv_data`](@ref) on it
with that function's default options (including the default `:close` filling strategy).
The result is saved back into `z`, and the result of the save is returned.
"""
function cleanup_ohlcv!(z::ZArray, timeframe::AbstractString)
    frame = convert(TimeFrame, timeframe)
    cleaned = cleanup_ohlcv_data(_load_ohlcv(z, timeframe), frame)
    return _save_ohlcv(z, timefloat(frame), cleaned)
end
202  
# Raw check (no normalization): the candle opening at `d` is incomplete while its end,
# `d + tf`, is later than `now()`.
function isincomplete(d::DateTime, tf::TimeFrame, ::Val{:raw})
    candle_end = d + tf
    return candle_end > now()
end
@doc """Checks if a DateTime is incomplete based on a TimeFrame.

$(TYPEDSIGNATURES)

`d` is first normalized to `tf` with `apply`, then checked with the raw method.
"""
function isincomplete(d::DateTime, tf::TimeFrame)
    start = apply(tf, d)
    return isincomplete(start, tf, Val(:raw))
end
@doc """Checks if a candle is too new.

$(TYPEDSIGNATURES)
"""
function isincomplete(candle::Candle, tf::TimeFrame)
    return isincomplete(candle.timestamp, tf)
end
@doc """Checks if a candle is old enough to be complete.

$(TYPEDSIGNATURES)
"""
iscomplete(v, tf) = !isincomplete(v, tf)
@doc """Checks if a candle is exactly the latest candle.

$(TYPEDSIGNATURES)

This `Val(:raw)` method does not normalize `d`. It is true when the candle following `d`
(starting at `d + tf`) has already started but not yet ended, i.e. the candle opening at
`d` is the most recent complete one.
"""
function islast(d::DateTime, tf, ::Val{:raw})
    current = now()
    following = d + tf
    # the candle opening at `d` has not ended yet
    following > current && return false
    # otherwise it is last only while the following candle is still in progress
    return following + tf > current
end
# Normalize `d` to `tf` before the raw check.
islast(d::DateTime, tf::TimeFrame) = islast(apply(tf, d), tf, Val(:raw))
# Candle timestamps are used as-is (no normalization).
function islast(candle::Candle, tf)
    return islast(candle.timestamp, tf, Val(:raw))
end
# Timeframe given as a string: convert it with `timeframe` first.
function islast(v, tf::AbstractString)
    return islast(v, timeframe(tf))
end
# Date given as a string: unparsable strings fall back to `DateTime(0)`.
function islast(v::AbstractString, tf)
    parsed = tryparse(DateTime, v)
    return islast(isnothing(parsed) ? DateTime(0) : parsed, tf)
end
# Both arguments are strings of the same type; this more specific method presumably
# exists to resolve the ambiguity between the two methods above.
function islast(v::S, tf::S) where {S<:AbstractString}
    return islast(v, timeframe(tf))
end
# True when `date1`, normalized to `tf` and moved back one period, equals `date2`.
function _equalapply(date1, tf, date2)
    shifted = apply(tf, date1) - period(tf)
    return shifted == date2
end
@doc """Checks if the last row of a smaller DataFrame is also the last row of a larger DataFrame.

$(TYPEDSIGNATURES)

Uses the timeframe of `larger` (obtained via `timeframe!`). Returns true when the last
date of `smaller`, normalized to that timeframe and moved back one period, equals the
last date of `larger`.
"""
function islast(larger::DataFrame, smaller::DataFrame)
    tf = timeframe!(larger)
    return _equalapply(lastdate(smaller), tf, lastdate(larger))
end
@doc """`a` is left adjacent to `b` if in order `..ab..`

$(TYPEDSIGNATURES)

That is, `b` is exactly one `tf` after `a`.
"""
function isleftadj(a, b, tf::TimeFrame)
    next_of_a = a + tf
    return next_of_a == b
end
@doc """`a` is right adjacent to `b` if in order `..ba..`

$(TYPEDSIGNATURES)
"""
isrightadj(a, b, tf::TimeFrame) = isleftadj(b, a, tf)
@doc "`a` is adjacent to `b` if either [`isleftadj`](@ref) or [`isrightadj`](@ref)."
function isadjacent(a, b, tf::TimeFrame)
    isleftadj(a, b, tf) && return true
    return isrightadj(a, b, tf)
end
256  
export cleanup_ohlcv_data
export isincomplete, iscomplete
export islast, isleftadj, isrightadj, isadjacent