/ LiveMode / src / watchers / balance.jl
balance.jl
  1  using Watchers
  2  using Watchers: default_init
  3  using Watchers.WatchersImpls:
  4      _tfunc!,
  5      _tfunc,
  6      _exc!,
  7      _exc,
  8      _lastpushed!,
  9      _lastpushed,
 10      _lastprocessed!,
 11      _lastprocessed,
 12      _lastcount!,
 13      _lastcount
 14  @watcher_interface!
 15  using .Exchanges: check_timeout
 16  using .Exchanges.Python: @py
 17  using .Lang: splitkws, withoutkws, safenotify, safewait
 18  
 19  const CcxtBalanceVal = Val{:ccxt_balance_val}
 20  
 21  @doc """ Sets up a watcher for CCXT balance.
 22  
 23  $(TYPEDSIGNATURES)
 24  
 25  This function sets up a watcher for balance in the CCXT library. The watcher keeps track of the balance and updates it as necessary.
 26  """
 27  function ccxt_balance_watcher(
 28      s::Strategy;
 29      interval=Second(1),
 30      wid="ccxt_balance",
 31      buffer_capacity=10,
 32      start=false,
 33      params=LittleDict{Any,Any}(),
 34      kwargs...,
 35  )
 36      exc = st.exchange(s)
 37      check_timeout(exc, interval)
 38      attrs = Dict{Symbol,Any}()
 39      params["type"] = @pystr(lowercase(string(balance_type(s))))
 40      _exc!(attrs, exc)
 41      attrs[:strategy] = s
 42      attrs[:iswatch] = @lget! s.attrs :is_watch_balance has(exc, :watchBalance)
 43      attrs[:func_kwargs] = (; params, kwargs...)
 44      attrs[:interval] = interval
 45      watcher_type = Py
 46      wid = string(wid, "-", hash((exc.id, nameof(s), account(s))))
 47      watcher(
 48          watcher_type,
 49          wid,
 50          CcxtBalanceVal();
 51          start,
 52          load=false,
 53          flush=false,
 54          process=false,
 55          buffer_capacity,
 56          view_capacity=1,
 57          fetch_interval=interval,
 58          attrs,
 59      )
 60  end
 61  
 62  @doc """
 63  Returns true if the data is a dictionary and the event type matches BalanceUpdated.
 64  """
 65  function _balance_valid_event_data(eid, data)
 66      isdict(data) && resp_event_type(data, eid) == ot.BalanceUpdated
 67  end
 68  
 69  @doc """
 70  Checks if the data for the given date has already been processed.
 71  """
 72  function _balance_is_already_processed(w, data_date, data)
 73      data_date == _lastprocessed(w) && length(data) == _lastcount(w)
 74  end
 75  
 76  @doc """
 77  Returns true if the current date matches the view's date, and updates last processed.
 78  """
 79  function _balance_is_same_view_date(w, data_date, date)
 80      if date == w.view.date
 81          _lastprocessed!(w, data_date)
 82          return true
 83      end
 84      false
 85  end
 86  
 87  @doc """
 88  Returns the upper and lower case quote-currency symbols for the strategy.
 89  """
 90  function _balance_qc_syms(w, s)
 91      @lget! attrs(w) :qc_syms begin
 92          upper = nameof(cash(s))
 93          lower = string(upper) |> lowercase |> Symbol
 94          upper, lower
 95      end
 96  end
 97  
 98  @doc """
 99  Computes the free balance, falling back to total - assets_value if free is zero.
100  """
101  function _balance_compute_free(total, free, assets_value)
102      if iszero(free)
103          return max(zero(total), total - assets_value)
104      end
105      free
106  end
107  
108  @doc """
109  Computes used balance for quote-currency by summing unfilled order values.
110  """
111  function _balance_compute_used_for_qc!(used, s)
112      for o in orders(s)
113          if o isa IncreaseOrder
114              used += unfilled(o) * o.price
115          end
116      end
117      used
118  end
119  
120  @doc """
121  Computes used balance for a specific asset by summing unfilled reduce orders.
122  """
123  function _balance_compute_used_for_asset!(used, s, ai)
124      for o in orders(s, ai)
125          if o isa ReduceOrder
126              used += unfilled(o)
127          end
128      end
129      used
130  end
131  
132  @doc """
133  Computes the used balance for a symbol, using custom logic if value is zero.
134  """
135  function _balance_compute_used(sym_bal, isqc, s, symsdict, sym)
136      v = get_float(sym_bal, "used")
137      if iszero(v)
138          used = v
139          if isqc
140              return _balance_compute_used_for_qc!(used, s)
141          else
142              ai = asset_bysym(s, string(sym), symsdict)
143              if !isnothing(ai)
144                  return _balance_compute_used_for_asset!(used, s, ai)
145              end
146          end
147          return used
148      else
149          return v
150      end
151  end
152  
153  @doc """
154  Updates or creates a BalanceSnapshot for the given symbol and date.
155  """
156  function _balance_update_snapshot!(baldict, k, date, sym, total, free, used)
157      if haskey(baldict, k)
158          update!(baldict[k], date; total, free, used)
159          return baldict[k]
160      else
161          baldict[k] = BalanceSnapshot(; currency=sym, date, total, free, used)
162          return baldict[k]
163      end
164  end
165  
166  @doc """
167  Dispatches sync events for updated balances, depending on asset type.
168  """
169  function _balance_dispatch_events!(w, s, isqc, bal, sym, symsdict)
170      if isqc
171          s_events = get_events(s)
172          func = () -> _live_sync_strategy_cash!(s; bal)
173          sendrequest!(s, bal.date, func)
174      elseif s isa NoMarginStrategy
175          ai = asset_bysym(s, sym, symsdict)
176          if !isnothing(ai)
177              func = () -> _live_sync_cash!(s, ai; bal)
178              sendrequest!(ai, bal.date, func)
179          end
180      end
181      nothing
182  end
183  
184  @doc """
185  Processes and updates the balance for a single symbol.
186  """
187  function _balance_process_symbol!(
188      w, s, symsdict, baldict, qc_upper, qc_lower, sym, sym_bal, date, assets_value
189  )
190      if isdict(sym_bal) && haskey(sym_bal, @pyconst("free"))
191          k = Symbol(sym)
192          total = get_float(sym_bal, "total")
193          free = _balance_compute_free(total, get_float(sym_bal, "free"), assets_value)
194          isqc = k == qc_upper || k == qc_lower
195          used = _balance_compute_used(sym_bal, isqc, s, symsdict, sym)
196          bal = _balance_update_snapshot!(baldict, k, date, sym, total, free, used)
197          _balance_dispatch_events!(w, s, isqc, bal, sym, symsdict)
198      end
199      nothing
200  end
201  
202  @doc """
203  Initializes and returns the state for the balance watcher, including buffers and tasks.
204  """
205  function _balance_setup_state!(s, w, attrs)
206      exc = exchange(s)
207      timeout = throttle(s)
208      interval = attrs[:interval]
209      params, rest = _ccxt_balance_args(s, attrs[:func_kwargs])
210      buffer_size = attr(s, :live_buffer_size, 1000)
211      s[:balance_buffer] = w[:buf_process] = buf = Vector{Any}()
212      s[:balance_notify] = w[:buf_notify] = buf_notify = Condition()
213      sizehint!(buf, buffer_size)
214      tasks = w[:process_tasks] = Vector{Task}()
215      errors = w[:errors_count] = Ref(0)
216      (
217          s=s,
218          w=w,
219          attrs=attrs,
220          exc=exc,
221          timeout=timeout,
222          interval=interval,
223          params=params,
224          rest=rest,
225          buf=buf,
226          buf_notify=buf_notify,
227          tasks=tasks,
228          errors=errors,
229      )
230  end
231  
232  @doc """
233  Starts a background task to force fetch if the watcher stalls for too long.
234  """
235  function _balance_setup_stall_guard!(state)
236      s = state.s
237      w = state.w
238      attrs = state.attrs
239      if haskey(w, :stall_guard_task)
240          stop_task(w[:stall_guard_task])
241          delete!(w, :stall_guard_task)
242      end
243      w[:stall_guard_task] = @start_task IdDict() begin
244          while isstarted(w)
245              try
246                  last = _lastprocessed(w)
247                  if now() - last > Second(60)
248                      @warn "balance watcher: forcing fetch due to stall" last now() s
249                      _force_fetchbal(s; fallback_kwargs=attrs[:func_kwargs])
250                  end
251              catch e
252                  @warn "balance watcher: stall guard error" exception = e
253              end
254              sleep(10)
255          end
256          @debug "balance watcher: stall guard task stopped" _module = LogWatchBalProcess
257      end
258  end
259  
260  @doc """
261  Processes a new balance value, pushing it to the buffer and starting processing tasks.
262  """
263  function _balance_process_bal!(state, w, v)
264      if !isnothing(v)
265          if !isnothing(_dopush!(w, v; if_func=isdict))
266              push!(state.tasks, @async process!(w))
267              filter!(!istaskdone, state.tasks)
268          end
269      end
270      nothing
271  end
272  
273  @doc """
274  Initializes the balance watcher and its handler.
275  """
276  function _balance_init_watch!(state)
277      s = state.s
278      w = state.w
279      v = @lock w fetch_balance(s; state.timeout, state.params, state.rest...)
280      _balance_process_bal!(state, w, v)
281      state_init = Ref(false)
282      f_push(v) = begin
283          push!(state.buf, v)
284          notify(state.buf_notify)
285          maybe_backoff!(state.errors, v)
286      end
287      h =
288          w[:balance_handler] = watch_balance_handler(
289              state.exc; f_push, state.params, state.rest...
290          )
291      start_handler!(h)
292      state_init
293  end
294  
295  @doc """
296  Returns a closure that steps the balance watcher, initializing if needed.
297  """
298  function _balance_watch_closure(state)
299      init_ref = Ref(true)
300      function _balance_watch_do_init!()
301          if init_ref[]
302              _ = _balance_init_watch!(state)
303              init_ref[] = false
304          end
305          nothing
306      end
307      function balance_watch_step(w)
308          _balance_watch_do_init!()
309          while isempty(state.buf) && isstarted(w)
310              wait(state.buf_notify)
311          end
312          if !isempty(state.buf)
313              v = popfirst!(state.buf)
314              if v isa Exception
315                  @error "balance watcher: unexpected value" exception = v
316                  maybe_backoff!(state.errors, v)
317                  sleep(1)
318              else
319                  _balance_process_bal!(state, w, pydict(v))
320              end
321          end
322          nothing
323      end
324      balance_watch_step
325  end
326  
327  @doc """
328  Flushes the buffer, processing all pending balance values.
329  """
330  function _balance_flush_buf_notify!(state, w)
331      while !isempty(state.buf)
332          v = popfirst!(state.buf)
333          _dopush!(w, v)
334          push!(state.tasks, @async process!(w))
335          filter!(!istaskdone, state.tasks)
336      end
337  end
338  
339  @doc """
340  Returns a closure that fetches and processes balance updates periodically.
341  """
342  function _balance_fetch_closure(state)
343      s = state.s
344      function balance_fetch_step(w)
345          start = now()
346          try
347              _balance_flush_buf_notify!(state, w)
348              v = @lock w fetch_balance(s; state.timeout, state.params, state.rest...)
349              _dopush!(w, v; if_func=isdict)
350              push!(state.tasks, @async process!(w))
351              _balance_flush_buf_notify!(state, w)
352              filter!(!istaskdone, state.tasks)
353          finally
354              sleep_pad(start, state.interval)
355          end
356          nothing
357      end
358      balance_fetch_step
359  end
360  
361  @doc """
362  Returns the appropriate balance watcher function (watch or fetch) based on attrs.
363  """
364  function _w_balance_func(s, w, attrs)
365      state = _balance_setup_state!(s, w, attrs)
366      if attrs[:iswatch]
367          _balance_setup_stall_guard!(state)
368          return _balance_watch_closure(state)
369      else
370          return _balance_fetch_closure(state)
371      end
372  end
373  
374  @doc """
375  Starts the main balance watcher task for the watcher.
376  """
377  function _balance_task!(w)
378      f = _tfunc(w)
379      errors = w.errors_count
380      w[:balance_task] = (@async while isstarted(w)
381          try
382              f(w)
383              safenotify(w.beacon.fetch)
384          catch e
385              if e isa InterruptException
386                  break
387              else
388                  maybe_backoff!(errors, e)
389                  @debug_backtrace LogWatchBalance
390              end
391          end
392      end) |> errormonitor
393  end
394  
395  _balance_task(w) = @lget! attrs(w) :balance_task _balance_task!(w)
396  
397  function Watchers._stop!(w::Watcher, ::CcxtBalanceVal)
398      handler = attr(w, :balance_handler, nothing)
399      @debug "balance watcher: stopping handler" _module = LogWatchBalance handler
400      if !isnothing(handler)
401          stop_handler!(handler)
402      end
403      @debug "balance watcher: stopping task" _module = LogWatchBalance isstarted(w)
404      bt = attr(w, :balance_task, nothing)
405      if istaskrunning(bt)
406          stop_task(bt)
407      end
408      @debug "balance watcher: stopping stall guard task" _module = LogWatchBalance
409      if haskey(w, :stall_guard_task)
410          stop_task(w[:stall_guard_task])
411          delete!(w, :stall_guard_task)
412      end
413      @debug "balance watcher: notifying buffer" _module = LogWatchBalance
414      notify(w.buf_notify)
415      @debug "balance watcher: stopped" _module = LogWatchBalance
416      nothing
417  end
418  
419  function Watchers._fetch!(w::Watcher, ::CcxtBalanceVal)
420      fetch_task = _balance_task(w)
421      if !istaskrunning(fetch_task)
422          _balance_task!(w)
423      end
424      return true
425  end
426  
427  function _init!(w::Watcher, ::CcxtBalanceVal)
428      default_init(w, BalanceDict(), false)
429      _lastpushed!(w, DateTime(0))
430      _lastprocessed!(w, DateTime(0))
431      _lastcount!(w, ())
432  end
433  
434  @doc """ Processes balance for a watcher using the CCXT library.
435  
436  $(TYPEDSIGNATURES)
437  
438  This function processes balance for a watcher `w` using the CCXT library. It goes through the balance stored in the watcher and updates it based on the latest data from the exchange.
439  
440  """
441  function Watchers._process!(w::Watcher, ::CcxtBalanceVal; fetched=false)
442      # No-op if there is nothing new in the ring buffer
443      if isempty(w.buffer)
444          return nothing
445      end
446      # Read the last fetched event from the exchange and the current balance view
447      eid = typeof(exchangeid(_exc(w)))
448      data_date, data = last(w.buffer)
449      baldict = w.view.assets
450      if !_balance_valid_event_data(eid, data)
451          # Ignore unrelated/unexpected payloads but advance processed watermark
452          @debug "balance watcher: wrong data type" _module = LogWatchBalProcess data_date typeof(
453              data
454          )
455          _lastprocessed!(w, data_date)
456          _lastcount!(w, ())
457          return nothing
458      end
459      if _balance_is_already_processed(w, data_date, data)
460          # Skip if same payload already handled (idempotency)
461          @debug "balance watcher: already processed" _module = LogWatchBalProcess data_date
462          return nothing
463      end
464      # Use exchange provided timestamp if present, else fallback to now
465      date = @something pytodate(data, eid) now()
466      if _balance_is_same_view_date(w, data_date, date)
467          return nothing
468      end
469      # Strategy context and helpers for per-asset processing
470      s = w.strategy
471      symsdict = w.symsdict
472      # Compute current non-cash asset valuation to derive a conservative free amount when free==0
473      assets_value = current_total(s; bal=w.view) - s.cash
474      # Resolve quote-currency symbols once (upper/lower forms)
475      qc_upper, qc_lower = _balance_qc_syms(w, s)
476      # Update per-currency balances and dispatch sync events
477      for (sym, sym_bal) in data.items()
478          _balance_process_symbol!(
479              w, s, symsdict, baldict, qc_upper, qc_lower, sym, sym_bal, date, assets_value
480          )
481      end
482      # Commit view timestamp and watermarks
483      w.view.date = date
484      _lastprocessed!(w, data_date)
485      _lastcount!(w, data)
486      @debug "balance watcher data:" _module = LogWatchBalProcess date get(baldict, :BTC, nothing) 
487  end
488  
489  @doc """ Starts a watcher for balance in a live strategy.
490  
491  $(TYPEDSIGNATURES)
492  
493  This function starts a watcher for balance in a live strategy `s`. The watcher checks and updates the balance at a specified interval.
494  
495  """
496  function watch_balance!(s::LiveStrategy; interval=st.throttle(s), wait=false)
497      @debug "live: watch balance get" _module = LogWatchBalance islocked(s)
498      w = @lock s @lget! s :live_balance_watcher ccxt_balance_watcher(s; interval)
499      just_started = if isstopped(w) && !attr(s, :stopped, false)
500          @debug "live: locking" _module = LogWatchBalance
501          @lock w if isstopped(w)
502              @debug "live: start" _module = LogWatchBalance
503              start!(w)
504              @debug "live: started" _module = LogWatchBalance
505              true
506          else
507              @debug "live: already started" _module = LogWatchBalance
508              false
509          end
510      else
511          false
512      end
513      while wait && just_started && _lastprocessed(w) == DateTime(0)
514          @debug "live: waiting for initial balance" _module = LogWatchBalance
515          safewait(w.beacon.process)
516      end
517      @debug "live: balance watcher" _module = LogWatchBalance isstopped(w)
518      w
519  end
520  
521  @doc """ Stops the watcher for balance in a live strategy.
522  
523  $(TYPEDSIGNATURES)
524  
525  This function stops the watcher that is tracking and updating balance for a live strategy `s`.
526  
527  """
528  function stop_watch_balance!(s::LiveStrategy)
529      w = get(s.attrs, :live_balance_watcher, nothing)
530      if w isa Watcher
531          @debug "live: stopping balance watcher" _module = LogWatchBalance islocked(w)
532          if isstarted(w)
533              stop!(w)
534          end
535          @debug "live: balance watcher stopped" _module = LogWatchBalance
536      end
537  end
538  
539  @doc """ Retrieves the balance watcher for a live strategy.
540  
541  $(TYPEDSIGNATURES)
542  """
543  balance_watcher(s) = s[:live_balance_watcher]
544  
545  # function _load!(w::Watcher, ::CcxtBalanceVal) end
546  
547  # function _process!(w::Watcher, ::CcxtBalanceVal) end
548  
549  function _start!(w::Watcher, ::CcxtBalanceVal)
550      _lastprocessed!(w, DateTime(0))
551      attrs = w.attrs
552      view = attrs[:view]
553      reset!(view)
554      s = attrs[:strategy]
555      w[:symsdict] = symsdict(s)
556      exc = exchange(s)
557      _exc!(attrs, exc)
558      _tfunc!(attrs, _w_balance_func(s, w, attrs))
559  end