/ LiveMode / src / caching.jl
caching.jl
  1  using LRUCache: LRUCache
  2  using .Misc.TimeToLive: ConcurrentDict
  3  
  4  function cache_keys()
  5      (:trades_cache, :open_orders_cache, :closed_orders_cache, :orders_cache, :order_byid_cache, :positions_cache, :fetchall_cache, :live_recent_orders, :live_recent_trades_update)
  6  end
  7  
  8  _last_trade_date(ai) = st.lasttrade_date(ai, now() - Day(1))
  9  function somevalue(dict, keys...)
 10      for k in keys
 11          v = get(dict, k, nothing)
 12          isnothing(v) || return v
 13      end
 14  end
 15  
 16  ttl_dict_type(ttl::Period, kt=DateTime, vt=Vector{Any}) = TTL{kt,Union{Missing,vt},ConcurrentDict,typeof(ttl)}
 17  ttl_resp_dict(ttl::Period, kt=DateTime, vt=Vector{Any}) = safettl(kt, Union{Missing,vt}, ttl)
 18  
 19  function _trades_resp_cache(a, ai)
 20      # every asset instance holds a mapping of timestamp (since) and relative vector of trades resps
 21      cache = @lget! a :trades_cache Dict{AssetInstance,ttl_dict_type(a[:trades_cache_ttl], DateTime)}()
 22      @lget! cache ai ttl_resp_dict(a[:trades_cache_ttl], DateTime)
 23  end
 24  
@doc """Cache key standing for the *latest* response: use `DateTime(0)` (this constant) as key to fetch it."""
const LATEST_RESP_KEY = DateTime(0)
 27  
 28  function _order_trades_resp_cache(a, ai)
 29      cache = @lget! a :order_trades_cache Dict{AssetInstance,ttl_dict_type(a[:trades_cache_ttl], String)}()
 30      @lget! cache ai ttl_resp_dict(a[:trades_cache_ttl], String)
 31  end
 32  
 33  function _open_orders_resp_cache(a, ai)
 34      cache = @lget! a :open_orders_cache Dict{AssetInstance,ttl_dict_type(a[:open_orders_ttl], DateTime)}()
 35      @lget! cache ai ttl_resp_dict(a[:open_orders_ttl], DateTime)
 36  end
 37  
 38  function _closed_orders_resp_cache(a, ai)
 39      cache = @lget! a :closed_orders_cache Dict{AssetInstance,ttl_dict_type(a[:closed_orders_ttl], Union{String,DateTime})}()
 40      @lget! cache ai ttl_resp_dict(a[:closed_orders_ttl], Union{String,DateTime})
 41  end
 42  
 43  function _orders_resp_cache(a, ai)
 44      cache = @lget! a :orders_cache Dict{AssetInstance,ttl_dict_type(a[:orders_cache_ttl], Any)}()
 45      @lget! cache ai ttl_resp_dict(a[:orders_cache_ttl], Any)
 46  end
 47  
 48  function _order_byid_resp_cache(a, ai)
 49      cache = @lget! a :order_byid_cache Dict{AssetInstance,ttl_dict_type(a[:order_byid_ttl], String)}()
 50      @lget! cache ai ttl_resp_dict(a[:order_byid_ttl], String)
 51  end
 52  
 53  function _positions_resp_cache(a)
 54      @lget! a :positions_cache begin
 55          lock = ReentrantLock()
 56          (; lock, data=ttl_resp_dict(a[:positions_ttl], Any, Any))
 57      end
 58  end
 59  
 60  function _func_cache(a)
 61      @lget! a :fetchall_cache (ReentrantLock(),
 62          Dict{Symbol,
 63              Tuple{ReentrantLock,
 64                  ttl_dict_type(a[:orders_cache_ttl], DateTime, Any)}}()
 65      )
 66  end
 67  
 68  function _func_cache(a, func)
 69      l, cache = _func_cache(a)
 70      @lock l @lget! cache func (ReentrantLock(), ttl_resp_dict(a[:func_cache_ttl], DateTime, Any))
 71  end
 72  
 73  function save_strategy_cache(s; inmemory=false, cache_path=nothing)
 74      cache = Dict()
 75      for k in cache_keys()
 76          if k in keys(s)
 77              cache[k] = s[k]
 78          end
 79      end
 80      if inmemory
 81          setglobal!(Main, :strategy_cache, cache)
 82      else
 83          Data.Cache.save_cache("strategy_cache", cache; cache_path)
 84      end
 85      return s
 86  end
 87  
 88  function load_strategy_cache(s, cache_path=nothing, raise=false)
 89      cache = if isdefined(Main, :strategy_cache)
 90      else
 91          Data.Cache.load_cache("strategy_cache"; cache_path, raise)
 92      end
 93      if cache isa Dict
 94          merge!(s.attrs, cache)
 95      end
 96      return s
 97  end
 98  
@doc "An lru cache type for recently processed orders ids (keys are `UInt64` or `String`, values are always `nothing`)."
const RecentOrdersDict = LRUCache.LRU{Union{UInt64,String},Nothing}
@doc """ Recently processed orders ids of an asset instance, for a live strategy.

$(TYPEDSIGNATURES)

While holding the lock on `s`, fetches the `:live_recent_orders` attribute (defaulting to an empty
`Dict{AssetInstance,RecentOrdersDict}`) and from it the entry for `ai` (defaulting to a
`RecentOrdersDict` with `maxsize=100`), which is returned.

"""
function recent_orders(s::LiveStrategy, ai)
    @lock s begin
        by_asset = @lget!(attrs(s), :live_recent_orders, Dict{AssetInstance,RecentOrdersDict}())
        @lget!(by_asset, ai, RecentOrdersDict(maxsize=100))
    end
end
114  
@doc "An lru cache type for hashes of recently processed trades (`UInt64` keys, values are always `nothing`)."
const RecentUpdatesDict = LRUCache.LRU{UInt64,Nothing}
117  
@doc """Recently processed trades hashes of an asset instance, for a live strategy.

While holding the lock on `s`, fetches the `:live_recent_trades_update` attribute (defaulting to an
empty `Dict{AssetInstance,RecentUpdatesDict}`) and from it the entry for `ai` (defaulting to a
`RecentUpdatesDict` with `maxsize=100`), which is returned."""
function recent_trade_update(s::LiveStrategy, ai)
    @lock s begin
        by_asset = @lget!(attrs(s), :live_recent_trades_update, Dict{AssetInstance,RecentUpdatesDict}())
        @lget!(by_asset, ai, RecentUpdatesDict(maxsize=100))
    end
end
124  
@doc """Empties every cache of the strategy `s` listed in `cache_keys()`.

Keys missing from `s.attrs` are skipped. The `:positions_cache` and `:fetchall_cache` entries
are `(lock, data)` pairs: only their second element is emptied, and this is done while holding
the lock stored as first element, so it does not race with tasks that mutate the data under
that same lock. All the other entries are emptied directly. Returns `nothing`."""
function empty_caches!(s::LiveStrategy)
    a = s.attrs
    for k in cache_keys()
        haskey(a, k) || continue
        entry = a[k]
        if k in (:positions_cache, :fetchall_cache)
            # first element is the guarding lock, second is the actual container
            @lock entry[1] empty!(entry[2])
        else
            empty!(entry)
        end
    end
    return nothing
end