caching.jl
1 using LRUCache: LRUCache 2 using .Misc.TimeToLive: ConcurrentDict 3 4 function cache_keys() 5 (:trades_cache, :open_orders_cache, :closed_orders_cache, :orders_cache, :order_byid_cache, :positions_cache, :fetchall_cache, :live_recent_orders, :live_recent_trades_update) 6 end 7 8 _last_trade_date(ai) = st.lasttrade_date(ai, now() - Day(1)) 9 function somevalue(dict, keys...) 10 for k in keys 11 v = get(dict, k, nothing) 12 isnothing(v) || return v 13 end 14 end 15 16 ttl_dict_type(ttl::Period, kt=DateTime, vt=Vector{Any}) = TTL{kt,Union{Missing,vt},ConcurrentDict,typeof(ttl)} 17 ttl_resp_dict(ttl::Period, kt=DateTime, vt=Vector{Any}) = safettl(kt, Union{Missing,vt}, ttl) 18 19 function _trades_resp_cache(a, ai) 20 # every asset instance holds a mapping of timestamp (since) and relative vector of trades resps 21 cache = @lget! a :trades_cache Dict{AssetInstance,ttl_dict_type(a[:trades_cache_ttl], DateTime)}() 22 @lget! cache ai ttl_resp_dict(a[:trades_cache_ttl], DateTime) 23 end 24 25 @doc """Use `DateTime(0)` as key to fetch the *latest* response.""" 26 const LATEST_RESP_KEY = DateTime(0) 27 28 function _order_trades_resp_cache(a, ai) 29 cache = @lget! a :order_trades_cache Dict{AssetInstance,ttl_dict_type(a[:trades_cache_ttl], String)}() 30 @lget! cache ai ttl_resp_dict(a[:trades_cache_ttl], String) 31 end 32 33 function _open_orders_resp_cache(a, ai) 34 cache = @lget! a :open_orders_cache Dict{AssetInstance,ttl_dict_type(a[:open_orders_ttl], DateTime)}() 35 @lget! cache ai ttl_resp_dict(a[:open_orders_ttl], DateTime) 36 end 37 38 function _closed_orders_resp_cache(a, ai) 39 cache = @lget! a :closed_orders_cache Dict{AssetInstance,ttl_dict_type(a[:closed_orders_ttl], Union{String,DateTime})}() 40 @lget! cache ai ttl_resp_dict(a[:closed_orders_ttl], Union{String,DateTime}) 41 end 42 43 function _orders_resp_cache(a, ai) 44 cache = @lget! a :orders_cache Dict{AssetInstance,ttl_dict_type(a[:orders_cache_ttl], Any)}() 45 @lget! cache ai ttl_resp_dict(a[:orders_cache_ttl], Any) 46 end 47 48 function _order_byid_resp_cache(a, ai) 49 cache = @lget! a :order_byid_cache Dict{AssetInstance,ttl_dict_type(a[:order_byid_ttl], String)}() 50 @lget! cache ai ttl_resp_dict(a[:order_byid_ttl], String) 51 end 52 53 function _positions_resp_cache(a) 54 @lget! a :positions_cache begin 55 lock = ReentrantLock() 56 (; lock, data=ttl_resp_dict(a[:positions_ttl], Any, Any)) 57 end 58 end 59 60 function _func_cache(a) 61 @lget! a :fetchall_cache (ReentrantLock(), 62 Dict{Symbol, 63 Tuple{ReentrantLock, 64 ttl_dict_type(a[:orders_cache_ttl], DateTime, Any)}}() 65 ) 66 end 67 68 function _func_cache(a, func) 69 l, cache = _func_cache(a) 70 @lock l @lget! cache func (ReentrantLock(), ttl_resp_dict(a[:func_cache_ttl], DateTime, Any)) 71 end 72 73 function save_strategy_cache(s; inmemory=false, cache_path=nothing) 74 cache = Dict() 75 for k in cache_keys() 76 if k in keys(s) 77 cache[k] = s[k] 78 end 79 end 80 if inmemory 81 setglobal!(Main, :strategy_cache, cache) 82 else 83 Data.Cache.save_cache("strategy_cache", cache; cache_path) 84 end 85 return s 86 end 87 88 function load_strategy_cache(s, cache_path=nothing, raise=false) 89 cache = if isdefined(Main, :strategy_cache) 90 else 91 Data.Cache.load_cache("strategy_cache"; cache_path, raise) 92 end 93 if cache isa Dict 94 merge!(s.attrs, cache) 95 end 96 return s 97 end 98 99 @doc "An lru cache of recently processed orders ids." 100 const RecentOrdersDict = LRUCache.LRU{Union{UInt64,String},Nothing} 101 @doc """ Retrieves recent orders ids for a live strategy. 102 103 $(TYPEDSIGNATURES) 104 105 Returns a dictionary of recent orders ids for a given asset instance. 106 107 """ 108 function recent_orders(s::LiveStrategy, ai) 109 @lock s begin 110 lro = @lget! attrs(s) :live_recent_orders Dict{AssetInstance,RecentOrdersDict}() 111 @lget! lro ai RecentOrdersDict(maxsize=100) 112 end 113 end 114 115 @doc "An lru cache of recently processed trades hashes." 116 const RecentUpdatesDict = LRUCache.LRU{UInt64,Nothing} 117 118 function recent_trade_update(s::LiveStrategy, ai) 119 @lock s begin 120 lrt = @lget! attrs(s) :live_recent_trades_update Dict{AssetInstance,RecentUpdatesDict}() 121 @lget! lrt ai RecentUpdatesDict(maxsize=100) 122 end 123 end 124 125 function empty_caches!(s::LiveStrategy) 126 a = s.attrs 127 for k in cache_keys() 128 if haskey(a, k) 129 if k in (:positions_cache, :fetchall_cache) 130 empty!(a[k][2]) 131 else 132 empty!(a[k]) 133 end 134 end 135 end 136 end