# propagate.jl
using Data.DataStructures: SortedDict
using Data: contiguous_ts
using Data.DFUtils: addcols!
using .Lang: @deassert
using .Misc.DocStringExtensions
using .Misc: rangeafter
import Data: propagate_ohlcv!

@doc """Updates OHLCV data across multiple time frames.

$(TYPEDSIGNATURES)

This function takes a dictionary `data` (time frame => data frame, sorted by time frame) and an
aggregation function `update_func`. The first entry of `data` is treated as the base time frame;
every following entry is a destination that gets updated from the base (or, when that is not
enough, from the next entries of `data` in order).

`update_func` is called as `update_func(src_tf, src_data, dst_tf, dst_data)` and is expected to
modify `dst_data` in place.

The function modifies `data` in place and always returns it.
- If `data` has no entries, it is returned untouched.
- If the base time frame data frame is empty, all the higher time frames data frames are emptied.
- Otherwise each higher time frame is updated sequentially. For a given destination, sources
  are tried in order (starting from the base) until the destination is aligned with the source
  (`islast`), there are no more sources, or the next candidate source is not strictly smaller
  than the destination time frame (in which case a debug message is logged).

"""
function propagate_ohlcv!(
    data::SortedDict{TimeFrame,DataFrame}, update_func::Function=propagate_ohlcv!
)
    # Nothing to propagate from; `first` would throw on an empty dictionary.
    isempty(data) && return data
    base_tf, base_data = first(data)
    if isempty(base_data)
        foreach(empty!, Iterators.drop(values(data), 1))
        return data
    end
    props_itr = Iterators.drop(data, 1)
    props_n = length(props_itr)
    for (dst_tf, dst_data) in props_itr
        # Every destination restarts from the base time frame.
        src_tf, src_data = base_tf, base_data
        # Number of leading entries of `data` to skip to reach the next candidate source.
        tf_idx = 1
        while tf_idx <= props_n
            # Only update when the current source has enough candles for the destination,
            # otherwise go straight to the next candidate source.
            if nrow(src_data) >= count(src_tf, dst_tf)
                update_func(src_tf, src_data, dst_tf, dst_data)
                # stop if dst data matches the padded date of source data
                if !isempty(dst_data) && islast(dst_data, src_data)
                    break
                end
            end
            src_tf, src_data = first(Iterators.drop(data, tf_idx))
            # Can't propagate if the source tf reaches or exceeds the target tf
            if src_tf >= dst_tf
                @debug "propagate ohlcv: failed" base_tf src_tf dst_tf
                break
            end
            tf_idx += 1
        end
        @deassert contiguous_ts(dst_data.timestamp, string(timeframe!(dst_data)))
    end
    # Return the (mutated) input on every path, as documented.
    return data
end

@doc """Resamples OHLCV data between different time frames.

$(TYPEDSIGNATURES)

This function resamples the OHLCV data from the source data frame `src` into the destination
data frame `dst`, which has a different time frame. `dst` is modified in place and is returned
in every case.

- If `dst` is empty, the whole of `src` is resampled and appended to `dst`.
- Otherwise only the rows of `src` after the last date of `dst` are resampled. The resampled
  rows are appended only when they are not empty and `isleftadj` holds between the last date of
  `dst` and the first resampled date (presumably: the new rows directly follow the last
  destination candle); if not, `dst` is returned as is.

Keywords:
- `src_tf`: time frame of `src` (defaults to `timeframe!(src)`).
- `dst_tf`: time frame of `dst` (defaults to `timeframe!(dst)`).
- `strict`: when `true` and `dst` is not empty, nothing is appended if `src` (or its slice after
  the last date of `dst`) has fewer than `count(src_tf, dst_tf)` rows. A warning is logged only
  when the whole `src` is too short.

Both the source and destination DataFrames must have columns named 'timestamp', 'open', 'high', 'low', 'close', and 'volume'.
The source and destination timeframes must be suitable for the resampling operation.
"""
function propagate_ohlcv!(
    src::DataFrame,
    dst::DataFrame;
    src_tf=timeframe!(src),
    dst_tf=timeframe!(dst),
    strict=true,
)
    if isempty(dst)
        new = resample(src, src_tf, dst_tf)
        addcols!(new, dst)
        return append!(dst, new)
    end
    date_dst = lastdate(dst)
    min_rows = count(src_tf, dst_tf)
    if strict && nrow(src) < min_rows
        @warn "Source dataframe ($(src_tf)) doesn't have enough rows for resampling $(nrow(src)) < $min_rows"
        return dst
    end
    # Only the rows after the last destination date are candidates for resampling.
    src_slice = @view src[rangeafter(src.timestamp, date_dst), :]
    # Same check as before but over the slice
    if strict && nrow(src_slice) < min_rows
        return dst
    end
    new = resample(src_slice, src_tf, dst_tf)
    isempty(new) && return dst
    # Skip the append when the adjacency check against the last destination date fails.
    isleftadj(date_dst, firstdate(new), dst_tf) || return dst
    addcols!(new, dst)
    return append!(dst, new)
end

@doc """Positional form of `propagate_ohlcv!(src, dst; src_tf, dst_tf)`.

Forwards `base_data`/`tf_data` as source/destination and `base_tf`/`tf` as their time frames.
This is the signature used as the default `update_func` of the `SortedDict` method.
"""
function propagate_ohlcv!(base_tf, base_data, tf, tf_data)
    return propagate_ohlcv!(base_data, tf_data; src_tf=base_tf, dst_tf=tf)
end

export propagate_ohlcv!