/ Processing / src / propagate.jl
propagate.jl
  1  using Data.DataStructures: SortedDict
  2  using Data: contiguous_ts
  3  using Data.DFUtils: addcols!
  4  using .Lang: @deassert
  5  using .Misc.DocStringExtensions
  6  using .Misc: rangeafter
  7  import Data: propagate_ohlcv!
  8  
  @doc """Updates OHLCV data across multiple time frames.

$(TYPEDSIGNATURES)

`data` maps time frames to OHLCV data frames. Its first entry (in the dictionary's sort
order) is the base time frame; every other entry is a destination that is filled by calling
`update_func(src_tf, src_data, dst_tf, dst_data)` to aggregate candles from a source time
frame into the destination one.

- If the base data frame is empty, all the other data frames in `data` are emptied.
- Otherwise each destination is updated sequentially. The base frame is tried first as the
  source. The next entry of `data` becomes the source when the current source has fewer rows
  than `count(src_tf, dst_tf)`, or when, after an update, the destination is still empty or
  `islast(dst_data, src_data)` does not hold. Falling back only continues while the new
  source time frame is strictly lower than the destination one; otherwise a debug message is
  logged and that destination is left as is.

The function modifies `data` in place and returns it.
"""
function propagate_ohlcv!(
    data::SortedDict{TimeFrame,DataFrame}, update_func::Function=propagate_ohlcv!
)
    base_tf, base_data = first(data)
    if isempty(base_data)
        # Without base candles nothing can be derived: clear every higher time frame.
        foreach(empty!, Iterators.drop(values(data), 1))
        return data
    end
    props_itr = Iterators.drop(data, 1)
    props_n = length(props_itr)
    for (dst_tf, dst_data) in props_itr
        # Fresh per-destination state (the `for` body is a new scope each iteration).
        src_tf, src_data, tf_idx = base_tf, base_data, 1
        # `tf_idx <= props_n` keeps `Iterators.drop(data, tf_idx)` non-empty below.
        while tf_idx <= props_n
            # Only aggregate when the source holds enough rows for one destination candle;
            # otherwise skip straight to the next (lower resolution) source frame.
            if nrow(src_data) >= count(src_tf, dst_tf)
                update_func(src_tf, src_data, dst_tf, dst_data)
                # Stop once the destination lines up with the end of the source data.
                if !isempty(dst_data) && islast(dst_data, src_data)
                    break
                end
            end
            src_tf, src_data = first(Iterators.drop(data, tf_idx))
            # A source frame that is not strictly lower than the target cannot feed it.
            if src_tf >= dst_tf
                @debug "propagate ohlcv: failed" base_tf src_tf dst_tf
                break
            end
            tf_idx += 1
        end
        @deassert contiguous_ts(dst_data.timestamp, string(timeframe!(dst_data)))
    end
    return data
end
 67  
 @doc """Resamples OHLCV candles of `src` (time frame `src_tf`) and appends them to `dst` (time frame `dst_tf`).

$(TYPEDSIGNATURES)

Both data frames must have columns named 'timestamp', 'open', 'high', 'low', 'close' and
'volume', and the two time frames must be suitable for resampling `src_tf` into `dst_tf`
(by default both are read from the frames with `timeframe!`).

- When `dst` is empty, the whole of `src` is resampled and appended to it.
- Otherwise only the rows of `src` after the last date of `dst` are resampled. The result is
  appended only when its first candle is left-adjacent (`isleftadj`) to the last date of
  `dst` in `dst_tf`; if it is empty or there is a gap, `dst` is left untouched.
- With `strict=true`, `dst` is returned unchanged when `src` (with a warning) or the selected
  tail of `src` has fewer rows than `count(src_tf, dst_tf)`.

Returns `dst`.
"""
function propagate_ohlcv!(
    src::DataFrame,
    dst::DataFrame;
    src_tf=timeframe!(src),
    dst_tf=timeframe!(dst),
    strict=true,
)
    if isempty(dst)
        # Nothing to align against: take everything the source can produce.
        fresh = resample(src, src_tf, dst_tf)
        addcols!(fresh, dst)
        return append!(dst, fresh)
    end

    last_dst = lastdate(dst)
    min_rows = count(src_tf, dst_tf)
    if strict && nrow(src) < min_rows
        @warn "Source dataframe ($(src_tf)) doesn't have enough rows for resampling $(nrow(src)) < $min_rows"
        return dst
    end

    # Only source rows newer than the destination's last candle can produce new candles.
    tail = @view src[rangeafter(src.timestamp, last_dst), :]
    strict && nrow(tail) < min_rows && return dst

    fresh = resample(tail, src_tf, dst_tf)
    # Append only when the new candles continue `dst` without a gap.
    if isempty(fresh) || !isleftadj(last_dst, firstdate(fresh), dst_tf)
        return dst
    end
    addcols!(fresh, dst)
    return append!(dst, fresh)
end
109  
@doc """Positional adapter matching the `update_func(src_tf, src_data, dst_tf, dst_data)`
calling convention: resamples `base_data` (time frame `base_tf`) into `tf_data` (time frame
`tf`) through the keyword-argument `DataFrame` method.
"""
propagate_ohlcv!(base_tf, base_data, tf, tf_data) =
    propagate_ohlcv!(base_data, tf_data; src_tf=base_tf, dst_tf=tf)
113  
114  export propagate_ohlcv!