/ Processing / src / resample.jl
resample.jl
  1  using .TimeTicks
  2  using .TimeTicks: td_tf
  3  using .Lang: passkwargs, @deassert
  4  using Base: beginsym
  5  using Data: zi, save_ohlcv, PairData, empty_ohlcv
  6  using Data.DFUtils
  7  using Data.DataFrames
  8  using Pbar
  9  
 10  @doc """Returns the left and right indices for a given frame.
 11  
 12  $(TYPEDSIGNATURES)
 13  
 14  This function takes a data vector, frame size, source time delta, and target time delta, and computes the left and right indices for the frame based on these parameters.
 15  
 16  """
 17  function _left_and_right(data, frame_size, src_td, td)
 18      left = 1
 19      while (timefloat(data.timestamp[left])) % td != 0.0
 20          left += 1
 21      end
 22      right = size(data, 1)
 23      let last_sample_candle_remainder = src_td * (frame_size - 1)
 24          while (timefloat(data.timestamp[right])) % td != last_sample_candle_remainder
 25              right -= 1
 26          end
 27      end
 28      left, right
 29  end
 30  
 31  @doc """Computes the deltas for a given transformation.
 32  
 33  $(TYPEDSIGNATURES)
 34  
 35  This function takes a data vector and a target transformation function, and computes the deltas (changes) in the data that would result from applying the transformation.
 36  
 37  """
 38  function _deltas(data, to_tf)
 39      # NOTE: need at least 2 points
 40      result(f=NaN, s=NaN, t=NaN; abort=nothing) = (f, s, t, abort)
 41      sz = size(data, 1)
 42      sz > 1 || return result(; abort=empty_ohlcv())
 43  
 44      td = timefloat(to_tf)
 45      src_prd = timeframe(data).period
 46      src_td = timefloat(src_prd)
 47  
 48      @assert td >= src_td "Upsampling not supported. (from $((td_tf[src_td])) to $(td_tf[td]))"
 49      td === src_td && return result(; abort=data)
 50      frame_size::Integer = td ÷ src_td
 51      sz >= frame_size || return result(; abort=empty_ohlcv())
 52      result(frame_size, src_td, td)
 53  end
 54  
 55  @doc """Resamples a style based on a transformation function.
 56  
 57  $(TYPEDSIGNATURES)
 58  
 59  This function takes a style and a transformation function `tf`, and resamples the style based on the transformation.
 60  
 61  """
 62  function resample_style(style, tf)
 63      if style == :ohlcv
 64          (
 65              :timestamp => x -> apply(tf, first(x)),
 66              :open => first,
 67              :high => maximum,
 68              :low => minimum,
 69              :close => last,
 70              :volume => sum,
 71          )
 72      else
 73          style
 74      end
 75  end
 76  
 77  @doc """Resamples data based on transformation functions.
 78  
 79  $(TYPEDSIGNATURES)
 80  
 81  This function takes a data vector, a source transformation function `from_tf`, a target transformation function `to_tf`, and optionally a boolean `cleanup` and a style `style`. It resamples the data from the source time frame to the target time frame. If `cleanup` is true, it removes any invalid data points after resampling. The resampling style is determined by `style`. If `chop` is true, it removes the leftovers at the end of the data that can't fill a complete frame.
 82  """
 83  function resample(data, from_tf, to_tf, cleanup=false, style=:ohlcv, chop=true)
 84      @deassert all(cleanup_ohlcv_data(data, from_tf).timestamp .== data.timestamp) "Resampling assumptions are not met, expecting cleaned data."
 85  
 86      if cleanup
 87          data = cleanup_ohlcv_data(data, from_tf)
 88      end
 89  
 90      frame_size, src_td, td, abort = _deltas(data, to_tf)
 91      isnothing(abort) || return abort
 92      left, right = if chop
 93          _left_and_right(data, frame_size, src_td, td)
 94      else
 95          1, nrow(data)
 96      end
 97  
 98      # Create a new dataframe to keep thread safety
 99      data = DataFrame(@view(data[left:right, :]); copycols=false)
100      size(data, 1) == 0 && return empty_ohlcv()
101  
102      data[!, :sample] = timefloat.(data.timestamp) .÷ td
103      gb = groupby(data, :sample)
104      df = combine(gb, resample_style(style, to_tf)...; renamecols=false)
105      select!(data, Not(:sample))
106      select!(df, Not(:sample))
107      timeframe!(df, to_tf)
108      @debug "last 2 candles: " df[end - 1, :timestamp] df[end, :timestamp]
109      df
110  end
@doc """Resamples data, and optionally saves the result to storage.

$(TYPEDSIGNATURES)

`args` are forwarded to the positional `resample` methods. When `dosave` is true and the
result has at least one row, it is stored with `save_ohlcv` under `exc_name` and `name`,
keyed by the string form of the target timeframe.

The target timeframe is the last `TimeFrame` found in `args`, so trailing positional
options (such as `cleanup`) are not mistaken for it. When no argument is a `TimeFrame`,
the last argument is used.

!!! warning "Usually not worth it"
    Resampling is quite fast, so it is simpler to keep only the smaller timeframe
    on storage, and resample the longer ones on demand.

"""
function resample(args...; exc_name, name, dosave=false)
    df = resample(args...)
    if size(df, 1) > 0 && dosave
        # `last(args)` alone is wrong when options follow the timeframe,
        # e.g. `resample(data, from_tf, to_tf, true; ...)`.
        tf_idx = findlast(a -> a isa TimeFrame, args)
        target_tf = isnothing(tf_idx) ? last(args) : args[tf_idx]
        save_ohlcv(zi, exc_name, name, string(target_tf), df)
    end
    return df
end
127  
@doc """Resamples the data held by `pair` to `to_tf`.

$(TYPEDSIGNATURES)

Both `pair.tf` and `to_tf` are converted to `TimeFrame` before delegating.
See [`resample`](@ref).
"""
function resample(pair::PairData, to_tf)
    source_tf = convert(TimeFrame, pair.tf)
    target_tf = convert(TimeFrame, to_tf)
    return resample(pair.data, source_tf, target_tf)
end
134  
@doc """Resamples every market in `mkts` to `timeframe`.

$(TYPEDSIGNATURES)

Each `PairData` value is resampled inside a `Threads.@threads` loop and wrapped in a new
`PairData`; the results are collected in a new `Dict{String,PairData}` under the same
names. Writes to the result dictionary are guarded by `lk`. When `progress` is true the
`Pbar` macros are invoked before, during and after the loop.
See [`resample`](@ref).
"""
function resample(mkts::AbstractDict{String,PairData}, timeframe; progress=false, lk = ReentrantLock())
    resampled = Dict{String,PairData}()
    progress && @pbar! mkts "Instruments"
    try
        # Materialize the pairs so the threaded loop can index them.
        entries = collect(mkts)
        Threads.@threads for idx in eachindex(entries)
            name, pair_data = entries[idx]
            entry = PairData(name, timeframe, resample(pair_data, timeframe), nothing)
            # The dictionary is shared between tasks, insert under the lock.
            @lock lk resampled[name] = entry
            progress && @pbupdate!
        end
    finally
        progress && @pbclose!
    end
    return resampled
end
150  
@doc """Resamples `df` to `tf`, taking the source timeframe from `timeframe!(df)`.

$(TYPEDSIGNATURES)

`b` and any further `args`/`kwargs` are forwarded unchanged. See [`resample`](@ref).
"""
function resample(df::AbstractDataFrame, tf::TimeFrame, b::Bool=false, args...; kwargs...)
    source_tf = timeframe!(df)
    return resample(df, source_tf, tf, b, args...; kwargs...)
end
155  
@doc """Expands to a `resample` call that uses the `Exchanges.exc` visible at the call site.

$(TYPEDSIGNATURES)

The expansion is `resample(Exchanges.exc, mkts, timeframe; kwargs...)`, where the trailing
`args` are turned into keyword arguments by `passkwargs`. `Exchanges`, `mkts` and
`timeframe` are all resolved in the caller's scope. `params` is accepted but not used by
the expansion.

See [`resample`](@ref).
"""
macro resample(params, mkts, timeframe, args...)
    exchanges = esc(:Exchanges)
    kwargs = passkwargs(args...)
    markets = esc(mkts)
    # Escape the timeframe too: without `esc`, macro hygiene would resolve a
    # caller's variable as a global of this module instead of the call site.
    tf = esc(timeframe)
    quote
        resample($(exchanges).exc, $markets, $tf; $(kwargs...))
    end
end
165  
166  export resample, @resample
167  
"""
Upsample OHLCV DataFrame from a larger timeframe to a smaller one.

Arguments:
- df: DataFrame with OHLCV columns and a regular, contiguous timeframe (large_tf)
- large_tf: TimeFrame of the input DataFrame
- small_tf: TimeFrame to upsample to (must be a divisor of large_tf)

Returns:
- DataFrame with OHLCV columns at the smaller timeframe, where each large candle is expanded into N small candles (N = large_tf.period ÷ small_tf.period),
  with open/high/low/close flat-filled, and volume divided equally.

Each large candle with timestamp `t` produces small candles whose timestamps end at `t`
(the first one is `t - large_tf.period + small_tf.period`).

An assertion fails when `large_tf` is not strictly larger than `small_tf`, or is not a
multiple of it.

NOTE(review): `resample` in this file labels a frame with its first sample, while this
function gives the large candle's timestamp to the last small candle. Confirm this is the
intended convention.
"""
function upsample(df::AbstractDataFrame, large_tf::TimeFrame, small_tf::TimeFrame)
    large_td = timefloat(large_tf.period)
    small_td = timefloat(small_tf.period)
    @assert large_td > small_td "upsample: large_tf must be greater than small_tf"
    @assert large_td % small_td == 0 "upsample: large_tf ($(large_tf.period)) must be a multiple of small_tf ($(small_tf.period))"
    n = Int(large_td ÷ small_td)
    n_large = nrow(df)
    n_small = n_large * n

    # Output buffers; the names avoid shadowing `Base.open` / `Base.close`.
    ts = Vector{DateTime}(undef, n_small)
    opens = Vector{Float64}(undef, n_small)
    highs = Vector{Float64}(undef, n_small)
    lows = Vector{Float64}(undef, n_small)
    closes = Vector{Float64}(undef, n_small)
    volumes = Vector{Float64}(undef, n_small)

    # Assign columns to local variables for performance
    src_ts = df.timestamp
    src_open = df.open
    src_high = df.high
    src_low = df.low
    src_close = df.close
    src_volume = df.volume

    for i in 1:n_large
        # Timestamp of the first small candle generated from large candle `i`.
        t0 = src_ts[i] - large_tf.period + small_tf.period
        # The volume is spread evenly, prices are repeated as they are.
        v_per = src_volume[i] / n
        o = src_open[i]
        h = src_high[i]
        l = src_low[i]
        c = src_close[i]
        base = (i - 1) * n
        for j in 0:(n - 1)
            idx = base + j + 1
            ts[idx] = t0 + j * small_tf.period
            opens[idx] = o
            highs[idx] = h
            lows[idx] = l
            closes[idx] = c
            volumes[idx] = v_per
        end
    end
    out = DataFrame(;
        timestamp=ts, open=opens, high=highs, low=lows, close=closes, volume=volumes
    )
    timeframe!(out, small_tf)
    return out
end
223  
224  export upsample