/ LiveMode / src / watchers / ohlcv.jl
ohlcv.jl
  1  using Watchers.WatchersImpls:
  2      ccxt_ohlcv_watcher, ccxt_ohlcv_tickers_watcher, ccxt_ohlcv_candles_watcher, ccxt_average_ohlcv_watcher
  3  import Watchers.WatchersImpls: cached_ohlcv!
  4  using .st: logpath
  5  using .Data: DataFrame, propagate_ohlcv!
  6  
  7  @doc """ Continuously propagates OHLCV data.
  8  
  9  $(TYPEDSIGNATURES)
 10  
 11  This function continuously propagates OHLCV (Open, High, Low, Close, Volume) data for a given watcher `w` and ai `ai`.
 12  It enters an infinite loop where it safely waits for a process in the watcher, then checks if the watcher is stopped.
 13  If the watcher is not stopped, it tries to propagate the OHLCV data.
 14  
 15  """
 16  propagate_loop(s::RTStrategy, ai, w::Watcher) = begin
 17      data = ai.data
 18      met = ohlcvmethod(s)
 19      try
 20          while true
 21              safewait(w.beacon.process)
 22              try
 23                  propagate_ohlcv!(data)
 24                  cached_ohlcv!(ai, met)
 25              catch exception
 26                  @debug "watchers: propagate loop" exception
 27              end
 28          end
 29      catch
 30          @warn "watchers: propagate loop stopped" ai = raw(ai)
 31      end
 32  end
 33  
 34  @doc """ Continuously propagates OHLCV data for each asset in the universe.
 35  
 36  $(TYPEDSIGNATURES)
 37  
 38  This function continuously propagates OHLCV (Open, High, Low, Close, Volume) data for each asset in a strategy's universe.
 39  It enters an infinite loop where it safely waits for a process in the watcher, then checks if the watcher is stopped.
 40  If the watcher is not stopped, it tries to propagate the OHLCV data for each asset in the strategy's universe.
 41  
 42  """
 43  propagate_loop(s::RTStrategy, w::Watcher) = begin
 44      try
 45          while true
 46              safewait(w.beacon.process)
 47              for ai in s.universe
 48                  try
 49                      propagate_ohlcv!(ai.data)
 50                  catch exception
 51                      @debug "watchers: propagate loop" exception
 52                  end
 53              end
 54          end
 55      catch
 56          @warn "watchers: propagate loop stopped" s = nameof(s)
 57      end
 58  end
 59  
 60  @doc """ Returns the OHLCV method for the strategy.
 61  
 62  $(TYPEDSIGNATURES)
 63  
 64  Fetches the `live_ohlcv_method` attribute from the strategy `s`.
 65  Returns the method used for generating OHLCV data in the strategy.
 66  values: `tickers`, `trades`, `candles`
 67  
 68  """
 69  ohlcvmethod(s::Strategy) = attr(s, :live_ohlcv_method, :candles)
 70  ohlcvmethod(ai::AssetInstance) = attr(ai, :live_ohlcv_method, :candles)
 71  function ohlcvmethod!(s::Strategy, m=nothing)
 72      if m ∉ (:tickers, :candles, :trades, :average, nothing)
 73          error("ohlcv methods supported are `tickers`, `candles`, `trades` and `average`")
 74      end
 75      k = :live_ohlcv_method
 76      setfunc = isnothing(m) ? (d, _) -> attr!(d, k, :candles) : (d, m) -> setattr!(d, m, k)
 77      setfunc(s, m)
 78      for ai in universe(s)
 79          setfunc(ai, m)
 80      end
 81      @something m :tickers
 82  end
 83  
 84  @doc """ Returns the watchers for OHLCV data.
 85  
 86  $(TYPEDSIGNATURES)
 87  
 88  Determines if OHLCV data should be generated by trades.
 89  If so, it returns the dictionary of watchers for each asset instance.
 90  Otherwise, it returns the single watcher for the strategy `s`.
 91  
 92  """
 93  function ohlcv_watchers(s::RTStrategy)
 94      if ohlcvmethod!(s) == :trades
 95          @lget! s.attrs :live_ohlcv_watchers Dict{AssetInstance,Watcher}()
 96      else
 97          attr(s, :live_ohlcv_watcher, nothing)
 98      end
 99  end
100  
@doc """ Watches and propagates OHLCV data.

$(TYPEDSIGNATURES)

Starts the OHLCV watcher(s) for the strategy according to `ohlcvmethod(s)`:

- `:trades`: one `ccxt_ohlcv_watcher` per asset, started concurrently. A previous watcher for
  the same asset is closed if still running. Each watcher gets a `propagate_loop` task
  (stored under `:propagate_task`) and is saved in the dict returned by `ohlcv_watchers`.
- `:tickers`, `:candles`, `:average`: a single watcher covering every symbol of the universe,
  stored under the `:live_ohlcv_watcher` attribute, with a callback that propagates and
  caches the OHLCV data of the updated symbol.

Strategy attributes read (with defaults): `:live_buffer_capacity` (100), `:live_view_capacity`,
`:live_ohlcv_jobs` (8), `:watcher_load_path` (`dirname(s.path)`), and for the `:average` method
`:watcher_exchanges` and `:watcher_ohlcvmethod`.

- `exc`: the exchange used to build the watchers (defaults to the strategy exchange).
- `kwargs`: accepted but not used by this function.

Throws an `ErrorException` if the method is none of the supported ones.

"""
function watch_ohlcv!(s::RTStrategy; exc=exchange(s), kwargs...)
    ow = ohlcv_watchers(s)
    met = ohlcvmethod(s)
    if met == :trades
        function start_watcher(ai)
            # NOTE: define it as local, otherwise async would clobber it before
            # being saved in the dict
            local w
            sym = raw(ai)
            eid = exchangeid(exc)
            default_view = @lget! ai.data s.timeframe cached_ohlcv!(
                eid, met, s.timeframe, sym
            )
            # Replace a previous watcher for this asset, closing it if still active.
            prev_w = get(ow, ai, missing)
            if !ismissing(prev_w)
                if isrunning(prev_w)
                    close(prev_w)
                end
            end
            w = ccxt_ohlcv_watcher(exc, sym; s.timeframe, default_view)
            Watchers.load!(w)
            w[:propagate_task] = @async propagate_loop(s, ai, w)
            start!(w)
            ow[ai] = w
        end
        @sync for ai in s.universe
            @async start_watcher(ai)
        end
    else
        eid = exchangeid(s)
        # Initial view: one dataframe per symbol, shared with the asset instance data.
        default_view = Dict{String,DataFrame}(
            raw(ai) =>
                @lget!(ai.data, s.timeframe, cached_ohlcv!(eid, met, s.timeframe, raw(ai)))
            for ai in s.universe
        )
        buffer_capacity = attr(s, :live_buffer_capacity, 100)
        view_capacity = attr(
            s, :live_view_capacity, count(s.timeframe, tf"1d") + 1 + buffer_capacity
        )
        n_jobs = attr(s, :live_ohlcv_jobs, 8)
        # Invoked by the watcher with the updated symbol.
        function propagate_callback(_, sym)
            @debug "watchers: propagating" _module = LogWatchOHLCV sym
            ai = asset_bysym(s, sym)
            ohlcv_dict(ai) |> propagate_ohlcv!
            cached_ohlcv!(ai, met)
        end
        watcher_func = if met == :tickers
            (exc; kwargs...) -> ccxt_ohlcv_tickers_watcher(exc; kwargs...)
        elseif met == :candles
            (exc; syms, kwargs...) -> ccxt_ohlcv_candles_watcher(exc, syms; kwargs...)
        elseif met == :average
            # Each captured variable is assigned exactly once (no reassignment before
            # the closure captures it).
            configured_exchanges = attr(s, :watcher_exchanges, nothing)
            exchanges = if configured_exchanges isa Vector{Symbol} ||
                configured_exchanges isa Vector{String}
                getexchange!.(Symbol.(configured_exchanges); sandbox=issandbox(exc))
            elseif configured_exchanges isa Vector{Exchange}
                configured_exchanges
            else
                @warn "watcher_exchanges is not a Vector{Exchange}, Symbol, or String; defaulting to strategy exchange" exchanges =
                    configured_exchanges
                Exchange[exc]
            end
            configured_source = attr(s, :watcher_ohlcvmethod, nothing)
            input_source = if configured_source in (:tickers, :candles, :trades)
                configured_source
            else
                @warn "Invalid watcher_ohlcvmethod for average ohlcv method, defaulting to :tickers" input_source =
                    configured_source
                :tickers
            end
            # The timeframe is the strategy's one; a `timeframe` key in `kwargs`
            # (splatted last) still takes precedence.
            (exc; syms, kwargs...) -> ccxt_average_ohlcv_watcher(
                exchanges, syms;
                timeframe=s.timeframe,
                input_source=input_source,
                kwargs...
            )
        else
            error("call: invalid ohlcv method $met")
        end
        load_path = attr(s, :watcher_load_path, dirname(s.path))
        s[:live_ohlcv_watcher] =
            w = watcher_func(
                exc;
                timeframe=s.timeframe,
                syms=(raw(ai) for ai in s.universe),
                flush=false,
                logfile=logpath(s; name="ohlcv_watcher_$(nameof(s))"),
                load_path=load_path,
                buffer_capacity,
                view_capacity,
                n_jobs,
                default_view,
                callback=propagate_callback,
            )
        w[:quiet] = true
        w[:resync_noncontig] = true
        # Point the watcher view at the asset dataframes and load each symbol concurrently.
        w[:startup_task] = @async begin
            wv = w.view
            @sync for ai in s.universe
                sym = raw(ai)
                wv[sym] = ai.data[s.timeframe]
                @async Watchers.load!(w, sym)
            end
        end
        start!(w)
    end
end
211  
@doc """ Stops watching OHLCV data.

$(TYPEDSIGNATURES)

Stops the watcher(s) returned by `ohlcv_watchers(s)`:

- `nothing`: no watcher registered, returns `nothing`.
- a single `Watcher`: stopped if running.
- a collection whose values are `Watcher`s: each running watcher is stopped.

Throws an `ErrorException` naming the offending type for anything else.

"""
function stop_watch_ohlcv!(s::RTStrategy)
    w = ohlcv_watchers(s)
    isnothing(w) && return nothing
    if w isa Watcher
        if isrunning(w)
            stop!(w)
        end
    elseif w isa Union{AbstractDict,AbstractArray} && valtype(w) <: Watcher
        # `valtype` is only queried on containers, so unrelated objects reach the
        # descriptive error below.
        for ai_w in values(w)
            if isrunning(ai_w)
                stop!(ai_w)
            end
        end
    else
        error("watchers: can't stop ohlcv watchers, unexpected container type $(typeof(w))")
    end
end
238  
@doc """ Looks up the cached OHLCV entry of `ai` for the strategy's main timeframe.

Delegates to the four-argument `cached_ohlcv!`, keyed by the strategy exchange id, the
strategy OHLCV method, the period of `s.timeframe` and the raw symbol of `ai`.
"""
function empty_ohlcv(s::Strategy, ai::AssetInstance)
    eid = exchangeid(s)
    met = ohlcvmethod(s)
    tf_period = period(s.timeframe)
    return cached_ohlcv!(eid, met, tf_period, raw(ai))
end
242  
@doc """ Looks up the cached OHLCV entry of `ai` for the timeframe `tf`.

Delegates to the four-argument `cached_ohlcv!`, keyed by the asset exchange id, the OHLCV
method, the period of `tf` and the raw symbol of `ai`.

- `met`: the OHLCV method used as cache key. Defaults to the method configured on `ai`.
"""
function empty_ohlcv(ai::AssetInstance, tf::TimeFrame; met=ohlcvmethod(ai))
    # NOTE(review): `exchangeid(ai)` is passed as-is here, while `cached_ohlcv!(ai, met)`
    # calls its result (`exchangeid(ai)()`) - confirm both forms key the cache equally.
    cached_ohlcv!(exchangeid(ai), met, period(tf), raw(ai))
end
246  
@doc """Ensures dataframes in the strategy are present in the cache.

For every asset of the universe and every timeframe in its OHLCV dict, calls the
four-argument `cached_ohlcv!` with the existing dataframe as `def`, keyed by the strategy
exchange id, the configured OHLCV method, the timeframe period and the raw symbol.
"""
function cached_ohlcv!(s::Strategy)
    eid = exchangeid(s)
    # Initialize the method attribute on the strategy and its assets, then read the
    # stored value back so the cache key matches the one used by `ohlcvmethod(s)` readers.
    ohlcvmethod!(s)
    met = ohlcvmethod(s)
    for ai in universe(s)
        sym = raw(ai)
        for (tf, data) in ohlcv_dict(ai)
            cached_ohlcv!(eid, met, period(tf), sym; def=data)
        end
    end
end
258  
@doc """Ensures the dataframes of a single asset instance are present in the cache.

For every timeframe in the asset's OHLCV dict, calls the four-argument `cached_ohlcv!` with
the existing dataframe as `def`.

- `met`: the OHLCV method used as cache key (defaults to `:candles`).
"""
function cached_ohlcv!(ai::AssetInstance, met=:candles)
    exchange_id = exchangeid(ai)()
    symbol = string(raw(ai))
    foreach(ohlcv_dict(ai)) do (tf, frame)
        cached_ohlcv!(exchange_id, met, period(tf), symbol; def=frame)
    end
end
266  
# Register `cached_ohlcv!` among the live strategy load callbacks, once.
let live_callbacks = st.STRATEGY_LOAD_CALLBACKS.live
    cached_ohlcv! in live_callbacks || push!(live_callbacks, cached_ohlcv!)
end
270  
@doc """Overrides the OHLCV data of `s` with the OHLCV data of `from_strat`.

For every asset of `s` and every timeframe in `s.timeframes`, the dataframe found in the
matching asset of `from_strat` replaces the one in `s`.
Assets missing from `from_strat` are skipped with a warning. Timeframes missing from the
source asset produce a warning and, if `s` has no dataframe for them, get an empty one.
"""
function sourceohlcv!(s::RTStrategy, from_strat::Strategy)
    @info "ohlcv: sourcing from strategy" from_strat
    wanted_timeframes = s.timeframes
    for ai in universe(s)
        source_ai = asset_bysym(from_strat, raw(ai))
        if isnothing(source_ai)
            @warn "ohlcv: can't source for asset" ai from_strat
            continue
        end
        source_frames = ohlcv_dict(source_ai)
        for tf in wanted_timeframes
            frame = get(source_frames, tf, nothing)
            if isnothing(frame)
                @warn "ohlcv: can't source for timeframe" ai tf from_strat
                # Keep an existing dataframe, only fill the gap when there is none.
                if !haskey(ohlcv_dict(ai), tf)
                    ohlcv_dict(ai)[tf] = Data.empty_ohlcv()
                end
            else
                ohlcv_dict(ai)[tf] = frame
            end
        end
    end
end
295  
@doc """
Make sure additional timeframes are updated (since source strategy only updates its own timeframes)

Does nothing when every timeframe of `s` is also a timeframe of `from_strat`.
Otherwise hooks `s` into the OHLCV watcher(s) of `from_strat`:

- single watcher: its callback is replaced by one that also propagates the OHLCV data of `s`.
- per-asset watchers: a propagate task for `s` is added to the watcher of each matching asset.
  Assets without a source watcher are reported at info level.
"""
function ensure_propagate!(s::RTStrategy, from_strat::Strategy)
    this_timeframes = s.timeframes
    diff_timeframes = setdiff(this_timeframes, from_strat.timeframes)
    if !isempty(diff_timeframes)
        @info "ohlcv: ensuring propagation for additional timeframes" from_strat diff_timeframes
        w = ohlcv_watchers(from_strat)
        if w isa Watcher
            addcallback!(w, s, stack_propagate_ohlcv_callback(s, w.callback))
        elseif w isa Dict && valtype(w) <: Watcher
            for ai in universe(s)
                this_sym = raw(ai)
                # The per-asset watchers are keyed by the asset instances of the source
                # strategy, so resolve the source instance before the lookup.
                fs_ai = asset_bysym(from_strat, this_sym)
                this_w = isnothing(fs_ai) ? nothing : get(w, fs_ai, nothing)
                if !isnothing(this_w)
                    addpropagatetask!(this_w, s, ai)
                else
                    @info "ohlcv: no source watcher for symbol" from_strat this_sym
                end
            end
        end
    end
end
320  
@doc """Builds a watcher callback that runs `base_cb` and then propagates the OHLCV data of `s`.

The returned function takes `(df, sym)`: it forwards both to `base_cb`, then calls
`Data.propagate_ohlcv!` on the OHLCV dict of the asset of `s` matching `sym`.
"""
function stack_propagate_ohlcv_callback(s, base_cb)
    function propagate_ohlcv_callback(df, sym)
        base_cb(df, sym)
        instance = asset_bysym(s, sym)
        return Data.propagate_ohlcv!(ohlcv_dict(instance))
    end
    return propagate_ohlcv_callback
end
327  
@doc """Replaces the callback of watcher `w` and restarts it.

- `new_cb`: the callback to install. When `nothing`, the current callback of `w` is wrapped
  with `stack_propagate_ohlcv_callback` so that the OHLCV data of `s` is propagated as well.

A warning is logged when `w` has no callback. In that case, and only if `new_cb` is also
`nothing`, the installed callback just propagates the OHLCV data of `s`.
The watcher is stopped (if started) and started again. If that fails, the failure is logged
with its backtrace and one more start is attempted when the watcher is left stopped.
"""
function addcallback!(w::Watcher, s::RTStrategy, new_cb=nothing)
    prev_cb = attr(w, :callback, nothing)
    if isnothing(prev_cb)
        @warn "watchers: no previous callback" w.name
    end
    w[:callback] = @something new_cb begin
        # Never stack on `nothing`: it is not callable and would fail on every update.
        base_cb = isnothing(prev_cb) ? ((_args...) -> nothing) : prev_cb
        stack_propagate_ohlcv_callback(s, base_cb)
    end
    try
        if isstarted(w)
            stop!(w)
        end
        start!(w)
        @info "watchers: subscribed to OHLCV updates" s
    catch e
        @warn "watchers: callback update failed" w.name exception = (e, catch_backtrace())
        if !isstarted(w)
            # Best effort: don't leave the watcher stopped.
            try
                start!(w)
            catch restart_error
                @debug "watchers: restart after callback update failed" w.name exception =
                    restart_error
            end
        end
    end
end
352  
@doc """Adds a propagate task for strategy `s` and asset `ai` to the watcher `w`.

The task runs `propagate_loop(s, ai, w)` asynchronously and is stored in the watcher under
the first attribute named `propagate_task1`, `propagate_task2`, ... that holds no value.
Returns the new task.
"""
function addpropagatetask!(w::Watcher, s::RTStrategy, ai::AssetInstance)
    n = 1
    this_sym = Symbol(:propagate_task, string(n))
    # Use an explicit `nothing` default so a missing attribute reads as a free slot.
    while !isnothing(attr(w, this_sym, nothing))
        n += 1
        this_sym = Symbol(:propagate_task, string(n))
    end
    w[this_sym] = @async propagate_loop(s, ai, w)
end
365  
366  export ohlcvmethod!, ohlcvmethod, sourceohlcv!, ensure_propagate!, addcallback!, stack_propagate_ohlcv_callback, addpropagatetask!