balance.jl
using Watchers
using Watchers: default_init
using Watchers.WatchersImpls:
    _tfunc!,
    _tfunc,
    _exc!,
    _exc,
    _lastpushed!,
    _lastpushed,
    _lastprocessed!,
    _lastprocessed,
    _lastcount!,
    _lastcount
@watcher_interface!
using .Exchanges: check_timeout
using .Exchanges.Python: @py
using .Lang: splitkws, withoutkws, safenotify, safewait

# Dispatch tag used by every watcher-interface method in this file.
const CcxtBalanceVal = Val{:ccxt_balance_val}

@doc """ Sets up a watcher for CCXT balance.

$(TYPEDSIGNATURES)

Builds (without necessarily starting) the balance watcher for strategy `s`.

# Keywords
- `interval`: validated with `check_timeout` against the strategy exchange, stored in the
  watcher attributes and used as `fetch_interval`.
- `wid`: id prefix; the final id is `wid`, a dash, and the hash of
  `(exc.id, nameof(s), account(s))`.
- `buffer_capacity`: forwarded to `watcher`.
- `start`: forwarded to `watcher`.
- `params`: request parameters. This dictionary is mutated: its "type" entry is
  overwritten with the lowercased `balance_type(s)`.
- `kwargs...`: stored together with `params` under `attrs[:func_kwargs]`.

Returns whatever `watcher(...)` returns.
"""
function ccxt_balance_watcher(
    s::Strategy;
    interval=Second(1),
    wid="ccxt_balance",
    buffer_capacity=10,
    start=false,
    params=LittleDict{Any,Any}(),
    kwargs...,
)
    exc = st.exchange(s)
    check_timeout(exc, interval)
    attrs = Dict{Symbol,Any}()
    params["type"] = @pystr(lowercase(string(balance_type(s))))
    _exc!(attrs, exc)
    attrs[:strategy] = s
    # Cached on the strategy: whether the exchange exposes `watchBalance`.
    attrs[:iswatch] = @lget! s.attrs :is_watch_balance has(exc, :watchBalance)
    attrs[:func_kwargs] = (; params, kwargs...)
    attrs[:interval] = interval
    watcher_type = Py
    # One watcher id per (exchange, strategy, account) triple.
    wid = string(wid, "-", hash((exc.id, nameof(s), account(s))))
    return watcher(
        watcher_type,
        wid,
        CcxtBalanceVal();
        start,
        load=false,
        flush=false,
        process=false,
        buffer_capacity,
        view_capacity=1,
        fetch_interval=interval,
        attrs,
    )
end

@doc """
Returns true if the data is a dictionary and the event type matches BalanceUpdated.

`eid` is forwarded to `resp_event_type` together with `data`.
"""
function _balance_valid_event_data(eid, data)
    return isdict(data) && resp_event_type(data, eid) == ot.BalanceUpdated
end

@doc """
Checks if the data for the given date has already been processed.

The payload counts as processed when `data_date` equals the watcher's last-processed
watermark and `data` has as many entries as the payload recorded by `_lastcount!`.
"""
function _balance_is_already_processed(w, data_date, data)
    # `_lastcount(w)` holds `()` or the last payload itself (see `_init!`/`_process!`),
    # so the stored value's length is compared rather than the value.
    return data_date == _lastprocessed(w) && length(data) == length(_lastcount(w))
end

@doc """
Returns true if the current date matches the view's date, and updates last processed.

When `date == w.view.date` the last-processed watermark is advanced to `data_date`
before returning `true`; otherwise nothing is modified and `false` is returned.
"""
function _balance_is_same_view_date(w, data_date, date)
    if date == w.view.date
        _lastprocessed!(w, data_date)
        return true
    end
    return false
end

@doc """
Returns the upper and lower case quote-currency symbols for the strategy.

The pair is `(nameof(cash(s)), lowercased symbol)` and is cached in the watcher
attributes under `:qc_syms`.
"""
function _balance_qc_syms(w, s)
    @lget! attrs(w) :qc_syms begin
        upper = nameof(cash(s))
        lower = string(upper) |> lowercase |> Symbol
        upper, lower
    end
end

@doc """
Computes the free balance, falling back to total - assets_value if free is zero.

The fallback is clamped so that it never goes below `zero(total)`.
"""
function _balance_compute_free(total, free, assets_value)
    if iszero(free)
        return max(zero(total), total - assets_value)
    end
    return free
end

@doc """
Computes used balance for quote-currency by summing unfilled order values.

Starting from `used`, adds `unfilled(o) * o.price` for every `IncreaseOrder` in
`orders(s)` and returns the sum. `used` itself is not mutated.
"""
function _balance_compute_used_for_qc!(used, s)
    for o in orders(s)
        if o isa IncreaseOrder
            used += unfilled(o) * o.price
        end
    end
    return used
end

@doc """
Computes used balance for a specific asset by summing unfilled reduce orders.

Starting from `used`, adds `unfilled(o)` for every `ReduceOrder` in `orders(s, ai)`
and returns the sum. `used` itself is not mutated.
"""
function _balance_compute_used_for_asset!(used, s, ai)
    for o in orders(s, ai)
        if o isa ReduceOrder
            used += unfilled(o)
        end
    end
    return used
end

@doc """
Computes the used balance for a symbol, using custom logic if value is zero.

A non-zero "used" entry of `sym_bal` is returned as is. When it is zero the value is
rebuilt from the strategy orders: quote-currency symbols use all increase orders, other
symbols use the reduce orders of the matching asset (when `asset_bysym` finds one).
"""
function _balance_compute_used(sym_bal, isqc, s, symsdict, sym)
    v = get_float(sym_bal, "used")
    # A non-zero value is returned untouched.
    iszero(v) || return v
    if isqc
        return _balance_compute_used_for_qc!(v, s)
    end
    ai = asset_bysym(s, string(sym), symsdict)
    return isnothing(ai) ? v : _balance_compute_used_for_asset!(v, s, ai)
end

@doc """
Updates or creates a BalanceSnapshot for the given symbol and date.

Returns the snapshot stored in `baldict` under key `k`.
"""
function _balance_update_snapshot!(baldict, k, date, sym, total, free, used)
    if haskey(baldict, k)
        update!(baldict[k], date; total, free, used)
    else
        baldict[k] = BalanceSnapshot(; currency=sym, date, total, free, used)
    end
    return baldict[k]
end

@doc """
Dispatches sync events for updated balances, depending on asset type.

- Quote currency: queues `_live_sync_strategy_cash!(s; bal)` on the strategy.
- Otherwise, only for `NoMarginStrategy`: queues `_live_sync_cash!(s, ai; bal)` on the
  asset instance resolved by `asset_bysym`, if any.

Always returns `nothing`.
"""
function _balance_dispatch_events!(w, s, isqc, bal, sym, symsdict)
    if isqc
        # The result is not used here; the call is kept in case `get_events` has side
        # effects. NOTE(review): drop it if it is a pure accessor.
        get_events(s)
        func = () -> _live_sync_strategy_cash!(s; bal)
        sendrequest!(s, bal.date, func)
    elseif s isa NoMarginStrategy
        # NOTE(review): `_balance_compute_used` passes `string(sym)` to `asset_bysym`,
        # this call passes `sym` unconverted — confirm both forms resolve the same asset.
        ai = asset_bysym(s, sym, symsdict)
        if !isnothing(ai)
            func = () -> _live_sync_cash!(s, ai; bal)
            sendrequest!(ai, bal.date, func)
        end
    end
    return nothing
end

@doc """
Processes and updates the balance for a single symbol.

Entries that are not dictionaries, or that lack a "free" key, are ignored. Otherwise the
total/free/used amounts are computed, the snapshot in `baldict` is updated and the
matching sync event is dispatched. Always returns `nothing`.
"""
function _balance_process_symbol!(
    w, s, symsdict, baldict, qc_upper, qc_lower, sym, sym_bal, date, assets_value
)
    if !(isdict(sym_bal) && haskey(sym_bal, @pyconst("free")))
        return nothing
    end
    k = Symbol(sym)
    total = get_float(sym_bal, "total")
    free = _balance_compute_free(total, get_float(sym_bal, "free"), assets_value)
    isqc = k == qc_upper || k == qc_lower
    used = _balance_compute_used(sym_bal, isqc, s, symsdict, sym)
    bal = _balance_update_snapshot!(baldict, k, date, sym, total, free, used)
    _balance_dispatch_events!(w, s, isqc, bal, sym, symsdict)
    return nothing
end

@doc """
Initializes and returns the state for the balance watcher, including buffers and tasks.

Side effects: stores the raw-value buffer under `s[:balance_buffer]` / `w[:buf_process]`,
its condition under `s[:balance_notify]` / `w[:buf_notify]`, the processing task list
under `w[:process_tasks]` and the error counter under `w[:errors_count]`.

Returns a named tuple with fields `s`, `w`, `attrs`, `exc`, `timeout`, `interval`,
`params`, `rest`, `buf`, `buf_notify`, `tasks` and `errors`.
"""
function _balance_setup_state!(s, w, attrs)
    exc = exchange(s)
    timeout = throttle(s)
    interval = attrs[:interval]
    params, rest = _ccxt_balance_args(s, attrs[:func_kwargs])
    buffer_size = attr(s, :live_buffer_size, 1000)
    # The same buffer/condition objects are shared between strategy and watcher.
    s[:balance_buffer] = w[:buf_process] = buf = Vector{Any}()
    s[:balance_notify] = w[:buf_notify] = buf_notify = Condition()
    sizehint!(buf, buffer_size)
    tasks = w[:process_tasks] = Vector{Task}()
    errors = w[:errors_count] = Ref(0)
    return (
        s=s,
        w=w,
        attrs=attrs,
        exc=exc,
        timeout=timeout,
        interval=interval,
        params=params,
        rest=rest,
        buf=buf,
        buf_notify=buf_notify,
        tasks=tasks,
        errors=errors,
    )
end

@doc """
Starts a background task to force fetch if the watcher stalls for too long.

Any task already stored under `w[:stall_guard_task]` is stopped and replaced. While the
watcher is started, the new task checks every 10 seconds whether the last-processed
watermark is older than 60 seconds and, if so, calls `_force_fetchbal`. Errors inside a
check are logged and do not end the loop. Returns the value stored under
`w[:stall_guard_task]`.
"""
function _balance_setup_stall_guard!(state)
    s = state.s
    w = state.w
    attrs = state.attrs
    if haskey(w, :stall_guard_task)
        stop_task(w[:stall_guard_task])
        delete!(w, :stall_guard_task)
    end
    w[:stall_guard_task] = @start_task IdDict() begin
        while isstarted(w)
            try
                # NOTE(review): the watermark starts at `DateTime(0)` (see `_start!`), so
                # the first pass forces a fetch unless a payload was already processed.
                last = _lastprocessed(w)
                if now() - last > Second(60)
                    @warn "balance watcher: forcing fetch due to stall" last now() s
                    _force_fetchbal(s; fallback_kwargs=attrs[:func_kwargs])
                end
            catch e
                @warn "balance watcher: stall guard error" exception = e
            end
            sleep(10)
        end
        @debug "balance watcher: stall guard task stopped" _module = LogWatchBalProcess
    end
end

@doc """
Processes a new balance value, pushing it to the buffer and starting processing tasks.

`nothing` values are ignored. A `process!` task is spawned only when `_dopush!` (called
with `if_func=isdict`) returns something other than `nothing`; finished tasks are then
pruned from `state.tasks`. Always returns `nothing`.
"""
function _balance_process_bal!(state, w, v)
    isnothing(v) && return nothing
    if !isnothing(_dopush!(w, v; if_func=isdict))
        push!(state.tasks, @async process!(w))
        filter!(!istaskdone, state.tasks)
    end
    return nothing
end

@doc """
Initializes the balance watcher and its handler.

Fetches one balance under the watcher lock and processes it, then creates the watch
handler (stored under `w[:balance_handler]`) and starts it. Values received by the
handler are appended to `state.buf`, waiters on `state.buf_notify` are notified and
`maybe_backoff!` is invoked with the value. Returns a fresh `Ref(false)`.
"""
function _balance_init_watch!(state)
    s = state.s
    w = state.w
    v = @lock w fetch_balance(s; state.timeout, state.params, state.rest...)
    _balance_process_bal!(state, w, v)
    state_init = Ref(false)
    # Callback handed to the handler through the `f_push` keyword.
    f_push(v) = begin
        push!(state.buf, v)
        notify(state.buf_notify)
        maybe_backoff!(state.errors, v)
    end
    h =
        w[:balance_handler] = watch_balance_handler(
            state.exc; f_push, state.params, state.rest...
        )
    start_handler!(h)
    return state_init
end

@doc """
Returns a closure that steps the balance watcher, initializing if needed.

The first successful call of the closure runs `_balance_init_watch!`. Each call then
blocks on `state.buf_notify` while the buffer is empty and the watcher is started, pops
one value and either logs it (exceptions, followed by a back-off and a one second sleep)
or converts it with `pydict` and processes it.
"""
function _balance_watch_closure(state)
    init_ref = Ref(true)
    function _balance_watch_do_init!()
        if init_ref[]
            _ = _balance_init_watch!(state)
            # Only reached when the initialization did not throw.
            init_ref[] = false
        end
        return nothing
    end
    function balance_watch_step(w)
        _balance_watch_do_init!()
        while isempty(state.buf) && isstarted(w)
            wait(state.buf_notify)
        end
        # The buffer can still be empty here when the watcher was stopped.
        if !isempty(state.buf)
            v = popfirst!(state.buf)
            if v isa Exception
                @error "balance watcher: unexpected value" exception = v
                maybe_backoff!(state.errors, v)
                sleep(1)
            else
                _balance_process_bal!(state, w, pydict(v))
            end
        end
        return nothing
    end
    return balance_watch_step
end

@doc """
Flushes the buffer, processing all pending balance values.

Every value popped from `state.buf` is pushed with `_dopush!` (no `if_func` filter) and
followed by a spawned `process!` task.
"""
function _balance_flush_buf_notify!(state, w)
    while !isempty(state.buf)
        v = popfirst!(state.buf)
        _dopush!(w, v)
        push!(state.tasks, @async process!(w))
        filter!(!istaskdone, state.tasks)
    end
end

@doc """
Returns a closure that fetches and processes balance updates periodically.

Each call of the closure flushes the pending buffer, fetches the balance under the
watcher lock, pushes it, spawns a `process!` task and flushes again. `sleep_pad` is
always invoked afterwards with the step start time and `state.interval`, also when the
step throws.
"""
function _balance_fetch_closure(state)
    s = state.s
    function balance_fetch_step(w)
        start = now()
        try
            _balance_flush_buf_notify!(state, w)
            v = @lock w fetch_balance(s; state.timeout, state.params, state.rest...)
            _dopush!(w, v; if_func=isdict)
            # NOTE(review): unlike `_balance_process_bal!`, the processing task is spawned
            # regardless of the `_dopush!` outcome.
            push!(state.tasks, @async process!(w))
            _balance_flush_buf_notify!(state, w)
            filter!(!istaskdone, state.tasks)
        finally
            sleep_pad(start, state.interval)
        end
        return nothing
    end
    return balance_fetch_step
end

@doc """
Returns the appropriate balance watcher function (watch or fetch) based on attrs.

The shared state is always (re)built first. With `attrs[:iswatch]` set, the stall guard
is installed and the watch closure is returned; otherwise the fetch closure is returned.
"""
function _w_balance_func(s, w, attrs)
    state = _balance_setup_state!(s, w, attrs)
    if attrs[:iswatch]
        _balance_setup_stall_guard!(state)
        return _balance_watch_closure(state)
    else
        return _balance_fetch_closure(state)
    end
end

@doc """
Starts the main balance watcher task for the watcher.

The task calls the function returned by `_tfunc(w)` in a loop while the watcher is
started, notifying `w.beacon.fetch` after each successful step. An `InterruptException`
ends the loop; any other exception triggers `maybe_backoff!`. The task is stored under
`w[:balance_task]`, wrapped with `errormonitor` and returned.
"""
function _balance_task!(w)
    f = _tfunc(w)
    errors = w.errors_count
    w[:balance_task] = (@async while isstarted(w)
        try
            f(w)
            safenotify(w.beacon.fetch)
        catch e
            if e isa InterruptException
                break
            else
                maybe_backoff!(errors, e)
                @debug_backtrace LogWatchBalance
            end
        end
    end) |> errormonitor
end

# Returns the task stored under `:balance_task`, creating it on first access.
_balance_task(w) = @lget! attrs(w) :balance_task _balance_task!(w)

@doc """
Stops the balance watcher: handler, main task and stall guard, in that order.

Finally notifies the buffer condition, when one has been set up, so that a step blocked
on it can observe the stopped watcher. Returns `nothing`.
"""
function Watchers._stop!(w::Watcher, ::CcxtBalanceVal)
    handler = attr(w, :balance_handler, nothing)
    @debug "balance watcher: stopping handler" _module = LogWatchBalance handler
    if !isnothing(handler)
        stop_handler!(handler)
    end
    @debug "balance watcher: stopping task" _module = LogWatchBalance isstarted(w)
    bt = attr(w, :balance_task, nothing)
    if istaskrunning(bt)
        stop_task(bt)
    end
    @debug "balance watcher: stopping stall guard task" _module = LogWatchBalance
    if haskey(w, :stall_guard_task)
        stop_task(w[:stall_guard_task])
        delete!(w, :stall_guard_task)
    end
    @debug "balance watcher: notifying buffer" _module = LogWatchBalance
    # `:buf_notify` only exists once `_balance_setup_state!` has run; skip the
    # notification instead of failing when the watcher was never started.
    buf_notify = attr(w, :buf_notify, nothing)
    if !isnothing(buf_notify)
        notify(buf_notify)
    end
    @debug "balance watcher: stopped" _module = LogWatchBalance
    return nothing
end

@doc """
Ensures the main balance task is running, (re)starting it when needed.

Always returns `true`.
"""
function Watchers._fetch!(w::Watcher, ::CcxtBalanceVal)
    fetch_task = _balance_task(w)
    if !istaskrunning(fetch_task)
        _balance_task!(w)
    end
    return true
end

@doc """
Initializes the watcher with an empty `BalanceDict` view and resets its watermarks.

Last pushed/processed dates are set to `DateTime(0)` and the last payload to `()`.
"""
function _init!(w::Watcher, ::CcxtBalanceVal)
    default_init(w, BalanceDict(), false)
    _lastpushed!(w, DateTime(0))
    _lastprocessed!(w, DateTime(0))
    _lastcount!(w, ())
end

@doc """ Processes balance for a watcher using the CCXT library.

$(TYPEDSIGNATURES)

This function processes balance for a watcher `w` using the CCXT library. It goes
through the balance stored in the watcher and updates it based on the latest data from
the exchange.

Only the most recent buffer entry is considered. It is skipped when it is not a valid
balance event, when it was already processed, or when its date equals the date of the
current view. The `fetched` keyword is accepted but not used by this method.
"""
function Watchers._process!(w::Watcher, ::CcxtBalanceVal; fetched=false)
    # No-op if there is nothing new in the ring buffer
    if isempty(w.buffer)
        return nothing
    end
    # Read the last fetched event from the exchange and the current balance view
    eid = typeof(exchangeid(_exc(w)))
    data_date, data = last(w.buffer)
    baldict = w.view.assets
    if !_balance_valid_event_data(eid, data)
        # Ignore unrelated/unexpected payloads but advance processed watermark
        @debug "balance watcher: wrong data type" _module = LogWatchBalProcess data_date typeof(
            data
        )
        _lastprocessed!(w, data_date)
        _lastcount!(w, ())
        return nothing
    end
    if _balance_is_already_processed(w, data_date, data)
        # Skip if same payload already handled (idempotency)
        @debug "balance watcher: already processed" _module = LogWatchBalProcess data_date
        return nothing
    end
    # Use exchange provided timestamp if present, else fallback to now
    date = @something pytodate(data, eid) now()
    if _balance_is_same_view_date(w, data_date, date)
        return nothing
    end
    # Strategy context and helpers for per-asset processing
    s = w.strategy
    symsdict = w.symsdict
    # Compute current non-cash asset valuation to derive a conservative free amount
    # when free==0
    assets_value = current_total(s; bal=w.view) - s.cash
    # Resolve quote-currency symbols once (upper/lower forms)
    qc_upper, qc_lower = _balance_qc_syms(w, s)
    # Update per-currency balances and dispatch sync events
    for (sym, sym_bal) in data.items()
        _balance_process_symbol!(
            w, s, symsdict, baldict, qc_upper, qc_lower, sym, sym_bal, date, assets_value
        )
    end
    # Commit view timestamp and watermarks
    w.view.date = date
    _lastprocessed!(w, data_date)
    _lastcount!(w, data)
    @debug "balance watcher data:" _module = LogWatchBalProcess date get(baldict, :BTC, nothing)
    return nothing
end

@doc """ Starts a watcher for balance in a live strategy.

$(TYPEDSIGNATURES)

This function starts a watcher for balance in a live strategy `s`. The watcher checks
and updates the balance at a specified interval.

The watcher is created on first use and cached under `s[:live_balance_watcher]`. It is
started only when it is stopped and the strategy `:stopped` attribute is not set. With
`wait=true`, and only when this call started the watcher, the function blocks on
`w.beacon.process` until the last-processed watermark leaves `DateTime(0)`.
Returns the watcher.
"""
function watch_balance!(s::LiveStrategy; interval=st.throttle(s), wait=false)
    @debug "live: watch balance get" _module = LogWatchBalance islocked(s)
    w = @lock s @lget! s :live_balance_watcher ccxt_balance_watcher(s; interval)
    just_started = if isstopped(w) && !attr(s, :stopped, false)
        @debug "live: locking" _module = LogWatchBalance
        # Re-check under the watcher lock: another task may have started it meanwhile.
        @lock w if isstopped(w)
            @debug "live: start" _module = LogWatchBalance
            start!(w)
            @debug "live: started" _module = LogWatchBalance
            true
        else
            @debug "live: already started" _module = LogWatchBalance
            false
        end
    else
        false
    end
    while wait && just_started && _lastprocessed(w) == DateTime(0)
        @debug "live: waiting for initial balance" _module = LogWatchBalance
        safewait(w.beacon.process)
    end
    @debug "live: balance watcher" _module = LogWatchBalance isstopped(w)
    return w
end

@doc """ Stops the watcher for balance in a live strategy.

$(TYPEDSIGNATURES)

This function stops the watcher that is tracking and updating balance for a live
strategy `s`. Nothing happens when no watcher is stored under `:live_balance_watcher`
or when the stored watcher is not started.
"""
function stop_watch_balance!(s::LiveStrategy)
    w = get(s.attrs, :live_balance_watcher, nothing)
    if w isa Watcher
        @debug "live: stopping balance watcher" _module = LogWatchBalance islocked(w)
        if isstarted(w)
            stop!(w)
        end
        @debug "live: balance watcher stopped" _module = LogWatchBalance
    end
end

@doc """ Retrieves the balance watcher for a live strategy.

$(TYPEDSIGNATURES)
"""
balance_watcher(s) = s[:live_balance_watcher]

# function _load!(w::Watcher, ::CcxtBalanceVal) end

# function _process!(w::Watcher, ::CcxtBalanceVal) end

@doc """
Prepares the watcher for a (re)start.

Resets the last-processed watermark and the view, refreshes `w[:symsdict]` and the
exchange stored in the attributes from the strategy, and installs the step function
built by `_w_balance_func`.
"""
function _start!(w::Watcher, ::CcxtBalanceVal)
    _lastprocessed!(w, DateTime(0))
    attrs = w.attrs
    view = attrs[:view]
    reset!(view)
    s = attrs[:strategy]
    w[:symsdict] = symsdict(s)
    exc = exchange(s)
    _exc!(attrs, exc)
    _tfunc!(attrs, _w_balance_func(s, w, attrs))
end