# resample.jl
using .TimeTicks
using .TimeTicks: td_tf
using .Lang: passkwargs, @deassert
using Base: beginsym
using Data: zi, save_ohlcv, PairData, empty_ohlcv
using Data.DFUtils
using Data.DataFrames
using Pbar

@doc """Return the first and last row indices of `data` that delimit whole target frames.

$(TYPEDSIGNATURES)

- `left` is the first row whose timestamp is aligned to the target time delta `td`,
  i.e. `timefloat(timestamp) % td == 0`.
- `right` is the last row whose timestamp is the last source candle of a target frame,
  i.e. `timefloat(timestamp) % td == src_td * (frame_size - 1)`.

Both scans stay within the rows of `data`. When no complete frame exists, the result
satisfies `left > right` (an empty range).
"""
function _left_and_right(data, frame_size, src_td, td)
    nrows = size(data, 1)
    left = 1
    # Skip leading rows that do not open a target frame.
    while left <= nrows && timefloat(data.timestamp[left]) % td != 0.0
        left += 1
    end
    right = nrows
    let last_sample_candle_remainder = src_td * (frame_size - 1)
        # Drop trailing rows that do not close a target frame. Stopping once
        # `right < left` keeps indexing in bounds and still yields an empty range.
        while right >= left &&
            timefloat(data.timestamp[right]) % td != last_sample_candle_remainder
            right -= 1
        end
    end
    left, right
end

@doc """Compute the frame parameters needed to resample `data` to `to_tf`.

$(TYPEDSIGNATURES)

Returns a tuple `(frame_size, src_td, td, abort)`:
- `frame_size`: number of source candles per target candle (`td ÷ src_td`);
- `src_td`, `td`: the source and target time deltas, as returned by `timefloat`;
- `abort`: `nothing` when resampling should proceed. Otherwise it holds the value the
  caller should return immediately:
  - `empty_ohlcv()` when `data` has fewer than 2 rows or fewer rows than `frame_size`;
  - `data` itself when the source and target deltas are identical.

  When `abort` is set, the first three entries are `NaN`.

Asserts that `to_tf` is not smaller than the source timeframe, because upsampling is
not supported here.
"""
function _deltas(data, to_tf)
    # NOTE: need at least 2 points
    result(f=NaN, s=NaN, t=NaN; abort=nothing) = (f, s, t, abort)
    sz = size(data, 1)
    sz > 1 || return result(; abort=empty_ohlcv())

    td = timefloat(to_tf)
    src_prd = timeframe(data).period
    src_td = timefloat(src_prd)

    @assert td >= src_td "Upsampling not supported. (from $((td_tf[src_td])) to $(td_tf[td]))"
    td === src_td && return result(; abort=data)
    # NOTE(review): `÷` floors, so a `to_tf` that is not a multiple of the source
    # timeframe is not rejected here (unlike `upsample`). Confirm this is intended.
    frame_size::Integer = td ÷ src_td
    sz >= frame_size || return result(; abort=empty_ohlcv())
    result(frame_size, src_td, td)
end

@doc """Return the aggregation spec that `resample` passes to `combine` for `style`.

$(TYPEDSIGNATURES)

For `style == :ohlcv`, this returns a tuple of `column => function` pairs:
- the timestamp is `apply(tf, ...)` of the group's first timestamp;
- open/close take the group's first/last value;
- high/low take the maximum/minimum;
- volume is summed.

Any other `style` is returned unchanged, so it must already be an iterable of
`combine`-compatible arguments.
"""
function resample_style(style, tf)
    if style == :ohlcv
        (
            :timestamp => x -> apply(tf, first(x)),
            :open => first,
            :high => maximum,
            :low => minimum,
            :close => last,
            :volume => sum,
        )
    else
        style
    end
end

@doc """Resample `data` from the timeframe `from_tf` to the larger timeframe `to_tf`.

$(TYPEDSIGNATURES)

- `cleanup`: when `true`, `data` is first passed through
  `cleanup_ohlcv_data(data, from_tf)`. Otherwise `data` is expected to be clean already,
  which is checked by a debug assertion.
- `style`: aggregation spec (see `resample_style`), `:ohlcv` by default.
- `chop`: when `true`, leading and trailing rows that cannot fill a complete target
  frame are dropped before grouping.

Rows are grouped by `timefloat(timestamp) ÷ td` and aggregated according to `style`.
The result is tagged with `to_tf` via `timeframe!`.

Special cases:
- `data` is returned as-is when both timeframes are identical.
- `empty_ohlcv()` is returned when there is not enough data for a single frame.
- Upsampling (`to_tf` smaller than `from_tf`) is not supported.
"""
function resample(data, from_tf, to_tf, cleanup=false, style=:ohlcv, chop=true)
    if cleanup
        data = cleanup_ohlcv_data(data, from_tf)
    else
        # Only check the "already clean" precondition when we are not cleaning up
        # ourselves. Vector `==` reports a length change as a failed assertion
        # instead of throwing a broadcast `DimensionMismatch`.
        @deassert cleanup_ohlcv_data(data, from_tf).timestamp == data.timestamp "Resampling assumptions are not met, expecting cleaned data."
    end

    frame_size, src_td, td, abort = _deltas(data, to_tf)
    isnothing(abort) || return abort
    left, right = if chop
        _left_and_right(data, frame_size, src_td, td)
    else
        1, nrow(data)
    end
    # No complete target frame between the bounds.
    left > right && return empty_ohlcv()

    # Build a new DataFrame (without copying columns) so that adding the temporary
    # `:sample` column never touches the caller's DataFrame, keeping this thread safe.
    data = DataFrame(@view(data[left:right, :]); copycols=false)
    size(data, 1) == 0 && return empty_ohlcv()

    data[!, :sample] = timefloat.(data.timestamp) .÷ td
    gb = groupby(data, :sample)
    df = combine(gb, resample_style(style, to_tf)...; renamecols=false)
    # `combine` keeps the grouping key by default; it is not part of the output schema.
    select!(df, Not(:sample))
    timeframe!(df, to_tf)
    if nrow(df) > 1
        @debug "last 2 candles: " df[end - 1, :timestamp] df[end, :timestamp]
    end
    df
end

@doc """Resample data and optionally save the result to storage.

$(TYPEDSIGNATURES)

Forwards the positional `args` to `resample`. When `dosave` is `true` and the result is
not empty, it is saved with
`save_ohlcv(zi, exc_name, name, string(last(args)), df)`.

!!! warning "Usually not worth it"
    Resampling is quite fast, so it is simpler to keep only the smaller timeframe
    on storage, and resample the longer ones on demand.

"""
function resample(args...; exc_name, name, dosave=false)
    df = resample(args...)
    # NOTE(review): the storage key is the *last* positional argument. It is the target
    # timeframe only when no trailing `cleanup`/`style`/`chop` arguments are passed.
    if dosave && nrow(df) > 0
        save_ohlcv(zi, exc_name, name, string(last(args)), df)
    end
    df
end

@doc "$(TYPEDSIGNATURES). Resample the data of a `PairData` to `to_tf`. See [`resample`](@ref)."
function resample(pair::PairData, to_tf)
    from_tf = convert(TimeFrame, pair.tf)
    to_tf = convert(TimeFrame, to_tf)
    resample(pair.data, from_tf, to_tf)
end

@doc """$(TYPEDSIGNATURES). Resample every `PairData` in `mkts` to `timeframe`.

Pairs are processed with `Threads.@threads`, and results are written into a new
`Dict{String,PairData}` while holding `lk`. When `progress` is `true`, a progress bar
is shown and always closed, even on error. See [`resample`](@ref).
"""
function resample(
    mkts::AbstractDict{String,PairData}, timeframe; progress=false, lk=ReentrantLock()
)
    rs = Dict{String,PairData}()
    progress && @pbar! mkts "Instruments"
    try
        Threads.@threads for (name, pair_data) in collect(mkts)
            v = PairData(name, timeframe, resample(pair_data, timeframe), nothing)
            @lock lk rs[name] = v
            progress && @pbupdate!
        end
    finally
        progress && @pbclose!
    end
    rs
end

@doc "$(TYPEDSIGNATURES). Resample `df` from its own timeframe (`timeframe!(df)`) to `tf`. See [`resample`](@ref)."
function resample(df::AbstractDataFrame, tf::TimeFrame, b::Bool=false, args...; kwargs...)
    resample(df, timeframe!(df), tf, b, args...; kwargs...)
end

@doc "$(TYPEDSIGNATURES). See [`resample`](@ref)."
macro resample(params, mkts, timeframe, args...)
    # NOTE(review): `params` is accepted but not used.
    e = esc(:Exchanges)
    kwargs = passkwargs(args...)
    m = esc(mkts)
    # Escape the user-supplied timeframe so it resolves in the caller's scope.
    tf = esc(timeframe)
    quote
        resample($(e).exc, $m, $tf; $(kwargs...))
    end
end

export resample, @resample

"""
    upsample(df, large_tf, small_tf)

Upsample OHLCV DataFrame from a larger timeframe to a smaller one.

Arguments:
- df: DataFrame with OHLCV columns and a regular, contiguous timeframe (large_tf)
- large_tf: TimeFrame of the input DataFrame
- small_tf: TimeFrame to upsample to (must be a divisor of large_tf)

Returns:
- DataFrame with OHLCV columns at the smaller timeframe, where each large candle is
  expanded into N small candles (N = large_tf.period ÷ small_tf.period).
  Open/high/low/close are flat-filled, and volume is divided equally. The output is
  tagged with `small_tf` via `timeframe!`.
"""
function upsample(df::AbstractDataFrame, large_tf::TimeFrame, small_tf::TimeFrame)
    large_prd = large_tf.period
    small_prd = small_tf.period
    large_td = timefloat(large_prd)
    small_td = timefloat(small_prd)
    @assert large_td > small_td "upsample: large_tf must be greater than small_tf"
    @assert large_td % small_td == 0 "upsample: large_tf ($(large_prd)) must be a multiple of small_tf ($(small_prd))"
    n = Int(large_td ÷ small_td)
    n_large = nrow(df)
    n_small = n_large * n

    timestamps = Vector{DateTime}(undef, n_small)
    opens = Vector{Float64}(undef, n_small)
    highs = Vector{Float64}(undef, n_small)
    lows = Vector{Float64}(undef, n_small)
    closes = Vector{Float64}(undef, n_small)
    volumes = Vector{Float64}(undef, n_small)

    # Bind columns to locals once, outside the loop.
    df_timestamp = df.timestamp
    df_open = df.open
    df_high = df.high
    df_low = df.low
    df_close = df.close
    df_volume = df.volume

    for i in 1:n_large
        # Small candles span `ts - large + small, ..., ts` for a large candle at `ts`.
        # NOTE(review): `resample` labels a group with `apply(to_tf, first(x))`. If
        # `apply` floors to the frame start, these candles fall in the *preceding*
        # large frame. Confirm the intended timestamp convention.
        t0 = df_timestamp[i] - large_prd + small_prd
        v_per = df_volume[i] / n
        o = df_open[i]
        h = df_high[i]
        l = df_low[i]
        c = df_close[i]
        offset = (i - 1) * n
        for j in 0:(n - 1)
            idx = offset + j + 1
            timestamps[idx] = t0 + j * small_prd
            opens[idx] = o
            highs[idx] = h
            lows[idx] = l
            closes[idx] = c
            volumes[idx] = v_per
        end
    end
    out = DataFrame(;
        timestamp=timestamps, open=opens, high=highs, low=lows, close=closes, volume=volumes
    )
    timeframe!(out, small_tf)
    out
end

export upsample