ohlcv.jl
using Watchers.WatchersImpls:
    ccxt_ohlcv_watcher,
    ccxt_ohlcv_tickers_watcher,
    ccxt_ohlcv_candles_watcher,
    ccxt_average_ohlcv_watcher
import Watchers.WatchersImpls: cached_ohlcv!
using .st: logpath
using .Data: DataFrame, propagate_ohlcv!

@doc """ Continuously propagates OHLCV data for a single asset instance.

$(TYPEDSIGNATURES)

Loops forever: each iteration waits on the watcher's `beacon.process` (through `safewait`),
then propagates the OHLCV data of `ai` and refreshes the OHLCV cache for `ai` using the
strategy's OHLCV method (read once, before the loop starts).
Errors raised while propagating are logged at debug level and the loop continues.
An error raised by the wait itself terminates the loop with a warning.
The loop does not inspect the watcher state on its own.

"""
function propagate_loop(s::RTStrategy, ai, w::Watcher)
    data = ai.data
    met = ohlcvmethod(s)
    try
        while true
            safewait(w.beacon.process)
            try
                propagate_ohlcv!(data)
                cached_ohlcv!(ai, met)
            catch exception
                @debug "watchers: propagate loop" exception
            end
        end
    catch exception
        # Keep the cause of the termination visible instead of discarding it.
        @warn "watchers: propagate loop stopped" ai = raw(ai) exception
    end
end

@doc """ Continuously propagates OHLCV data for each asset in the universe.

$(TYPEDSIGNATURES)

Loops forever: each iteration waits on the watcher's `beacon.process` (through `safewait`),
then propagates the OHLCV data of every asset instance in the strategy's universe.
A failure on one asset is logged at debug level and does not prevent the remaining
assets from being processed.
An error raised by the wait itself terminates the loop with a warning.
The loop does not inspect the watcher state on its own.

"""
function propagate_loop(s::RTStrategy, w::Watcher)
    try
        while true
            safewait(w.beacon.process)
            for ai in s.universe
                try
                    propagate_ohlcv!(ai.data)
                catch exception
                    @debug "watchers: propagate loop" exception
                end
            end
        end
    catch exception
        # Keep the cause of the termination visible instead of discarding it.
        @warn "watchers: propagate loop stopped" s = nameof(s) exception
    end
end

@doc """ Returns the OHLCV method for the strategy.

$(TYPEDSIGNATURES)

Fetches the `live_ohlcv_method` attribute from the strategy `s`, falling back to
`:candles` when the attribute is not set.
values: `tickers`, `trades`, `candles`, `average`

"""
ohlcvmethod(s::Strategy) = attr(s, :live_ohlcv_method, :candles)
ohlcvmethod(ai::AssetInstance) = attr(ai, :live_ohlcv_method, :candles)

@doc """ Sets the OHLCV method on the strategy and on every asset of its universe.

$(TYPEDSIGNATURES)

When `m` is one of `:tickers`, `:candles`, `:trades`, `:average` it is stored under the
`live_ohlcv_method` attribute of `s` and of each asset instance in `universe(s)`.
When `m` is `nothing` the attribute is initialised through `attr!` with `:candles`.
Any other value raises an error.
Returns the method in effect for the strategy after the call.

"""
function ohlcvmethod!(s::Strategy, m=nothing)
    if m ∉ (:tickers, :candles, :trades, :average, nothing)
        error("ohlcv methods supported are `tickers`, `candles`, `trades` and `average`")
    end
    k = :live_ohlcv_method
    setfunc = isnothing(m) ? (d, _) -> attr!(d, k, :candles) : (d, m) -> setattr!(d, m, k)
    setfunc(s, m)
    for ai in universe(s)
        setfunc(ai, m)
    end
    # Report the method actually configured: returning a fixed `:tickers` when `m` is
    # `nothing` made callers such as `ohlcv_watchers` blind to a stored `:trades` method.
    return @something m ohlcvmethod(s)
end

@doc """ Returns the watchers for OHLCV data.

$(TYPEDSIGNATURES)

When the strategy OHLCV method is `:trades`, returns the dictionary of watchers keyed by
asset instance (stored under the `live_ohlcv_watchers` attribute, created if missing).
Otherwise returns the single watcher stored under `live_ohlcv_watcher`, or `nothing`
when there is none.

"""
function ohlcv_watchers(s::RTStrategy)
    if ohlcvmethod!(s) == :trades
        @lget! s.attrs :live_ohlcv_watchers Dict{AssetInstance,Watcher}()
    else
        attr(s, :live_ohlcv_watcher, nothing)
    end
end

@doc """ Watches and propagates OHLCV data.

$(TYPEDSIGNATURES)

Starts the watchers that feed OHLCV data to the strategy, depending on the strategy
OHLCV method:

- `:trades`: one watcher per asset instance in the universe, each paired with a
  `propagate_loop` task stored under the watcher's `:propagate_task` key. A previously
  registered watcher for the same asset is closed first if it is still running.
- `:tickers`, `:candles`, `:average`: a single watcher covering all the symbols of the
  universe, stored under the strategy's `live_ohlcv_watcher` attribute, with a callback
  that propagates the OHLCV data of the updated symbol.

Any other method raises an error.

"""
function watch_ohlcv!(s::RTStrategy; exc=exchange(s), kwargs...)
    ow = ohlcv_watchers(s)
    met = ohlcvmethod(s)
    if met == :trades
        function start_watcher(ai)
            # NOTE: define it as local, otherwise async would clobber it before
            # being saved in the dict
            local w
            sym = raw(ai)
            eid = exchangeid(exc)
            default_view = @lget! ai.data s.timeframe cached_ohlcv!(
                eid, met, s.timeframe, sym
            )
            prev_w = get(ow, ai, missing)
            if !ismissing(prev_w)
                if isrunning(prev_w)
                    close(prev_w)
                end
            end
            w = ccxt_ohlcv_watcher(exc, sym; s.timeframe, default_view)
            Watchers.load!(w)
            w[:propagate_task] = @async propagate_loop(s, ai, w)
            start!(w)
            ow[ai] = w
        end
        @sync for ai in s.universe
            @async start_watcher(ai)
        end
    else
        eid = exchangeid(s)
        default_view = Dict{String,DataFrame}(
            raw(ai) =>
                @lget!(ai.data, s.timeframe, cached_ohlcv!(eid, met, s.timeframe, raw(ai)))
            for ai in s.universe
        )
        buffer_capacity = attr(s, :live_buffer_capacity, 100)
        view_capacity = attr(
            s, :live_view_capacity, count(s.timeframe, tf"1d") + 1 + buffer_capacity
        )
        n_jobs = attr(s, :live_ohlcv_jobs, 8)
        function propagate_callback(_, sym)
            @debug "watchers: propagating" _module = LogWatchOHLCV sym
            ai = asset_bysym(s, sym)
            ohlcv_dict(ai) |> propagate_ohlcv!
            cached_ohlcv!(ai, met)
        end
        watcher_func = if met == :tickers
            (exc; kwargs...) -> ccxt_ohlcv_tickers_watcher(exc; kwargs...)
        elseif met == :candles
            (exc; syms, kwargs...) -> ccxt_ohlcv_candles_watcher(exc, syms; kwargs...)
        elseif met == :average
            exchanges = attr(s, :watcher_exchanges, nothing)
            if exchanges isa Vector{Symbol} || exchanges isa Vector{String}
                exchanges = getexchange!.(Symbol.(exchanges); sandbox=issandbox(exc))
            elseif !(exchanges isa Vector{Exchange})
                @warn "watcher_exchanges is not a Vector{Exchange}, Symbol, or String; defaulting to strategy exchange" exchanges
                exchanges = Exchange[exc]
            end
            input_source = attr(s, :watcher_ohlcvmethod, nothing)
            valid_sources = (:tickers, :candles, :trades)
            if input_source ∉ valid_sources
                @warn "Invalid watcher_ohlcvmethod for average ohlcv method, defaulting to :tickers" input_source
                input_source = :tickers
            end
            # The timeframe reaches the watcher through `kwargs` (the call below passes
            # `timeframe=s.timeframe`); an extra explicit `timeframe` keyword here would
            # reference a name that is not bound locally and be overridden anyway.
            (exc; syms, kwargs...) -> ccxt_average_ohlcv_watcher(
                exchanges, syms; input_source=input_source, kwargs...
            )
        else
            error("call: invalid ohlcv method $met")
        end
        load_path = attr(s, :watcher_load_path, dirname(s.path))
        s[:live_ohlcv_watcher] =
            w = watcher_func(
                exc;
                timeframe=s.timeframe,
                syms=(raw(ai) for ai in s.universe),
                flush=false,
                logfile=logpath(s; name="ohlcv_watcher_$(nameof(s))"),
                load_path=load_path,
                buffer_capacity,
                view_capacity,
                n_jobs,
                default_view,
                callback=propagate_callback,
            )
        w[:quiet] = true
        w[:resync_noncontig] = true
        w[:startup_task] = @async begin
            wv = w.view
            @sync for ai in s.universe
                sym = raw(ai)
                # Point the watcher view at the strategy dataframe before loading.
                wv[sym] = ai.data[s.timeframe]
                @async Watchers.load!(w, sym)
            end
        end
        start!(w)
    end
end

@doc """ Stops watching OHLCV data.

$(TYPEDSIGNATURES)

Stops the watchers returned by `ohlcv_watchers(s)`.
If it is a single watcher, the watcher is stopped when running.
If it is a dictionary of watchers, every running watcher in the dictionary is stopped.
Does nothing when there is no watcher; raises an error for any other kind of value.

"""
function stop_watch_ohlcv!(s::RTStrategy)
    w = ohlcv_watchers(s)
    isnothing(w) && return nothing
    if w isa Watcher
        if isrunning(w)
            stop!(w)
        end
    elseif valtype(w) <: Watcher
        for ai_w in values(w)
            if isrunning(ai_w)
                stop!(ai_w)
            end
        end
    else
        error("watchers: unexpected ohlcv watchers container $(typeof(w))")
    end
end

@doc "Returns the cached OHLCV dataframe for `ai`, keyed by the strategy exchange, OHLCV method and timeframe."
function empty_ohlcv(s::Strategy, ai::AssetInstance)
    cached_ohlcv!(exchangeid(s), ohlcvmethod(s), period(s.timeframe), raw(ai))
end

@doc """ Returns the cached OHLCV dataframe for `ai` at timeframe `tf`.

The `met` keyword selects the OHLCV method used as cache key; it defaults to the method
configured on the asset instance.
"""
function empty_ohlcv(ai::AssetInstance, tf::TimeFrame; met=ohlcvmethod(ai))
    # `met` used to be accepted but ignored; it is now honoured, with a default that
    # matches the previous behaviour.
    cached_ohlcv!(exchangeid(ai), met, period(tf), raw(ai))
end

@doc "Ensures dataframes in the strategy are present in the cache"
function cached_ohlcv!(s::Strategy)
    eid = exchangeid(s)
    met = ohlcvmethod!(s)
    for ai in universe(s)
        sym = raw(ai)
        for (tf, data) in ohlcv_dict(ai)
            cached_ohlcv!(eid, met, period(tf), sym; def=data)
        end
    end
end

@doc "Ensures dataframes of the asset instance are present in the cache, under method `met`"
function cached_ohlcv!(ai::AssetInstance, met=:candles)
    eid = exchangeid(ai)()
    sym = raw(ai) |> string
    for (tf, data) in ohlcv_dict(ai)
        cached_ohlcv!(eid, met, period(tf), sym; def=data)
    end
end

# Register the cache warm-up as a live strategy load callback (only once).
if cached_ohlcv! ∉ st.STRATEGY_LOAD_CALLBACKS.live
    push!(st.STRATEGY_LOAD_CALLBACKS.live, cached_ohlcv!)
end

@doc """
Override the OHLCV data of strategy `s` with the OHLCV data of `from_strat`.

For every asset of `s` and every timeframe in `s.timeframes`, the dataframe held by the
matching asset of `from_strat` replaces the one in `s`. When the source has no data for
a timeframe a warning is logged and an empty OHLCV dataframe is installed if `s` has
none for that timeframe. Assets missing from `from_strat` are skipped with a warning.
"""
function sourceohlcv!(s::RTStrategy, from_strat::Strategy)
    # Override this strategy ohlcv data with ohlcv data from the source strategy
    @info "ohlcv: sourcing from strategy" from_strat
    this_timeframes = s.timeframes
    for ai in universe(s)
        fs_ai = asset_bysym(from_strat, raw(ai))
        if isnothing(fs_ai)
            @warn "ohlcv: can't source for asset" ai from_strat
        else
            dict = ohlcv_dict(fs_ai)
            for tf in this_timeframes
                data = get(dict, tf, nothing)
                if !isnothing(data)
                    ohlcv_dict(ai)[tf] = data
                else
                    @warn "ohlcv: can't source for timeframe" ai tf from_strat
                    if !haskey(ohlcv_dict(ai), tf)
                        ohlcv_dict(ai)[tf] = Data.empty_ohlcv()
                    end
                end
            end
        end
    end
end

@doc """
Make sure additional timeframes are updated (since source strategy only updates its own timeframes)

When `s` uses timeframes that `from_strat` does not, hooks `s` into the watchers of
`from_strat`: a single watcher gets a stacked callback, per-asset watchers get an extra
propagate task for the matching asset of `s`.
"""
function ensure_propagate!(s::RTStrategy, from_strat::Strategy)
    this_timeframes = s.timeframes
    diff_timeframes = setdiff(this_timeframes, from_strat.timeframes)
    if !isempty(diff_timeframes)
        @info "ohlcv: ensuring propagation for additional timeframes" from_strat diff_timeframes
        w = ohlcv_watchers(from_strat)
        if w isa Watcher
            addcallback!(w, s, stack_propagate_ohlcv_callback(s, w.callback))
        elseif w isa Dict && valtype(w) <: Watcher
            for ai in universe(s)
                this_sym = raw(ai)
                # The per-asset watchers are stored keyed by the source strategy asset
                # instances, so resolve the source asset before the lookup; looking up
                # by the raw symbol would never match.
                fs_ai = asset_bysym(from_strat, this_sym)
                this_w = isnothing(fs_ai) ? nothing : get(w, fs_ai, nothing)
                if !isnothing(this_w)
                    addpropagatetask!(this_w, s, ai)
                else
                    @info "ohlcv: no source watcher for symbol" from_strat this_sym
                end
            end
        end
    end
end

@doc """
Build a watcher callback that first invokes `base_cb` (when there is one) and then
propagates the OHLCV data of the asset of `s` matching the updated symbol.
"""
function stack_propagate_ohlcv_callback(s, base_cb)
    function propagate_ohlcv_callback(df, sym)
        # `base_cb` may be `nothing` when the watcher had no previous callback.
        isnothing(base_cb) || base_cb(df, sym)
        asset_bysym(s, sym) |> ohlcv_dict |> Data.propagate_ohlcv!
    end
end

@doc """
Install a new callback on watcher `w` and restart it.

The callback is `new_cb` when given, otherwise the watcher's current `:callback` stacked
with the OHLCV propagation of `s`. The watcher is stopped (if started) and started again;
on failure a warning is logged and one more start is attempted if the watcher is not
started.
"""
function addcallback!(w::Watcher, s::RTStrategy, new_cb=nothing)
    prev_cb = attr(w, :callback, nothing)
    if isnothing(prev_cb)
        @warn "watchers: no previous callback" w.name
    end
    w[:callback] = @something new_cb begin
        stack_propagate_ohlcv_callback(s, prev_cb)
    end
    try
        if isstarted(w)
            stop!(w)
        end
        start!(w)
        @info "watchers: subscribed to OHLCV updates" s
    catch e
        # The exception must be bound to be reported in the warning below.
        @warn "watchers: callback update failed" w.name exception = (e, catch_backtrace())
        if !isstarted(w)
            try
                start!(w)
            catch restart_exception
                # Best effort restart: report, but never raise from here.
                @debug "watchers: restart after failed callback update" w.name restart_exception
            end
        end
    end
end

@doc """
Spawn an additional `propagate_loop` task for `ai` on watcher `w`.

The task is stored on the watcher under the first free key among `:propagate_task1`,
`:propagate_task2`, ...
"""
function addpropagatetask!(w::Watcher, s::RTStrategy, ai::AssetInstance)
    n = 1
    this_sym = Symbol(:propagate_task, string(n))
    # Use an explicit `nothing` default so that a missing key means "slot is free".
    while !isnothing(attr(w, this_sym, nothing))
        n += 1
        this_sym = Symbol(:propagate_task, string(n))
    end
    w[this_sym] = @async propagate_loop(s, ai, w)
end

export ohlcvmethod!, ohlcvmethod, sourceohlcv!, ensure_propagate!
export addcallback!, stack_propagate_ohlcv_callback, addpropagatetask!