/ Collections / src / module.jl
module.jl
  1  using Instances
  2  using Instances.Exchanges.ExchangeTypes
  3  using Instances.Exchanges: getexchange!
  4  using Instances: OrderTypes, Data, Instruments
  5  using Instances: NoMarginInstance, MarginInstance
  6  
  7  using .Data.DataFrames
  8  using .Data.DataFramesMeta
  9  using .Data: load, zi, empty_ohlcv
 10  using .Data.DFUtils
 11  using .Data.DataStructures: SortedDict
 12  
 13  using .Instruments: fiatnames, AbstractAsset, Asset, AbstractCash, compactnum as cnum
 14  using .Instruments.Derivatives
 15  using .Instruments: Misc
 16  using .Misc: TimeTicks, Lang
 17  using .TimeTicks
 18  using .Misc: Iterable, swapkeys, MarginMode
 19  using .Lang: @lget!, MatchString, Option
 20  using Base.Enums: namemap
 21  using .Misc: OrderedDict, OrderedCollections
 22  using .Misc.DocStringExtensions
 23  import .Misc: reset!
 24  
 25  @doc """A type representing a collection of asset instances.
 26  
 27  $(FIELDS)
 28  
 29  This type is used to store and manage a collection of asset instances. Each instance is linked to an asset and an exchange identifier.
 30  Elements from `AssetCollection` can be accessed using `getindex` and `setindex!` which accepts different types including `ExchangeID`, `AbstractAsset`, `AbstractString`, `MatchString`, or a combination of base, quote currency, and exchange.
 31  Iterating over the collection only iterates over the instances within.
 32  
 33  """
 34  struct AssetCollection
 35      data::DataFrame
 36      function AssetCollection(
 37          df=DataFrame(;
 38              exchange=ExchangeID[], asset=AbstractAsset[], instance=AssetInstance[]
 39          ),
 40      )
 41          new(df)
 42      end
 43      function AssetCollection(instances::Iterable{<:AssetInstance})
 44          AssetCollection(
 45              DataFrame(
 46                  (; exchange=inst.exchange.id, asset=inst.asset, instance=inst) for
 47                  inst in instances;
 48                  copycols=false,
 49              ),
 50          )
 51      end
 52      function AssetCollection(
 53          assets::Union{Iterable{String},Iterable{<:AbstractAsset}};
 54          timeframe="1m",
 55          exc::Exchange,
 56          margin::MarginMode,
 57          min_amount=1e-8,
 58          load_data=true,
 59      )
 60          if eltype(assets) == String
 61              assets = [parse(AbstractAsset, name) for name in assets]
 62          end
 63  
 64          tf = convert(TimeFrame, timeframe)
 65          load_func = if load_data
 66              (aa) -> load(zi, exc.name, aa.raw, timeframe)
 67          else
 68              (_) -> empty_ohlcv()
 69          end
 70          function get_instance(aa::AbstractAsset)
 71              data = SortedDict(tf => load_func(aa))
 72              AssetInstance(aa; data, exc, margin, min_amount)
 73          end
 74          instances_ord = Dict(raw(k) => n for (n, k) in enumerate(assets))
 75          instances = AssetInstance[]
 76          @sync for ast in assets
 77              @async push!(instances, get_instance(ast))
 78          end
 79          sort!(instances; by=(ai) -> instances_ord[raw(ai)])
 80          AssetCollection(instances)
 81      end
 82  end
 83  
 84  @enum AssetCollectionColumn exchange = 1 asset = 2 instance = 3
 85  const AssetCollectionTypes = OrderedDict([
 86      exchange => ExchangeID, asset => AbstractAsset, instance => AssetInstance
 87  ])
 88  const AssetCollectionColumns4 = Symbol.(keys(sort!(AssetCollectionTypes)))
 89  AssetCollectionColumns = AssetCollectionColumns4
 90  # HACK: const/types definitions inside macros can't be revised
 91  if !isdefined(@__MODULE__, :AssetCollectionRow)
 92      const AssetCollectionRow = @NamedTuple{
 93          exchange::ExchangeID, asset::AbstractAsset, instance::AssetInstance
 94      }
 95  end
 96  
 97  using .Instruments: isbase, isquote
 98  function Base.getindex(ac::AssetCollection, i::ExchangeID, col=Colon())
 99      @view ac.data[ac.data.exchange .== i, col]
100  end
101  function Base.getindex(ac::AssetCollection, i::AbstractAsset, col=Colon())
102      @view ac.data[ac.data.asset .== i, col]
103  end
104  function Base.getindex(ac::AssetCollection, i::AbstractString, col=Colon())
105      @view ac.data[ac.data.asset .== i, col]
106  end
107  function Base.getindex(ac::AssetCollection, i::MatchString, col=Colon())
108      v = @view ac.data[startswith.(getproperty.(ac.data.asset, :raw), uppercase(i.s)), :]
109      isempty(v) && return v
110      if col == Colon()
111          v[begin, :instance]
112      else
113          @view v[begin, col]
114      end
115  end
116  Base.getindex(ac::AssetCollection, i, i2, i3) = ac[i, i2][i3]
117  Base.get(ac::AssetCollection, i, val) = get(ac.data.instance, i, val)
118  
119  # TODO: this should use a macro...
120  @doc "Dispatch based on either base, quote currency, or exchange."
121  function bqe(df::DataFrame, b::T, q::T, e::T) where {T<:Symbol}
122      isbase.(df.asset, b) .&& isquote.(df.asset, q) .&& df.exchange .== e
123  end
124  function bqe(df::DataFrame, ::Nothing, q::T, e::T) where {T<:Symbol}
125      isquote(df.asset, q) && df.exchange .== e
126  end
127  function bqe(df::DataFrame, b::T, ::Nothing, e::T) where {T<:Symbol}
128      isbase.(df.asset, b) && df.exchange .== e
129  end
130  function bqe(df::DataFrame, b::T, q::T, e::Nothing) where {T<:Symbol}
131      isbase.(df.asset, b) .&& isquote.(df.asset, q)
132  end
133  bqe(df::DataFrame, ::Nothing, ::Nothing, e::T) where {T<:Symbol} = begin
134      df.exchange .== e
135  end
136  function bqe(df::DataFrame, ::Nothing, q::T, e::Nothing) where {T<:Symbol}
137      isquote.(df.asset, q)
138  end
139  bqe(df::DataFrame, b::T, ::Nothing, e::Nothing) where {T<:Symbol} = begin
140      isbase.(df.asset, b)
141  end
142  
143  function Base.getindex(
144      ac::AssetCollection;
145      b::Union{Symbol,Nothing}=nothing,
146      q::Union{Symbol,Nothing}=nothing,
147      e::Union{Symbol,Nothing}=nothing,
148  )
149      idx = bqe(ac.data, b, q, e)
150      @view ac.data[idx, :]
151  end
152  
153  _cashstr(ai::NoMarginInstance) = (; cash=cash(ai).value)
154  function _cashstr(ai::MarginInstance)
155      (; cash_long=cash(ai, Long()).value, cash_short=cash(ai, Short()).value)
156  end
157  
158  @doc """Pretty prints the AssetCollection DataFrame.
159  
160  $(TYPEDSIGNATURES)
161  
162  The `prettydf` function takes the following parameters:
163  
164  - `ac`: an AssetCollection object which encapsulates a collection of assets.
165  - `full` (optional, default is false): a boolean that indicates whether to print the full DataFrame. If true, the function prints the full DataFrame. If false, it prints a truncated version.
166  """
167  function prettydf(ac::AssetCollection; full=false)
168      limit = full ? size(ac.data)[1] : displaysize(stdout)[1] - 1
169      limit = min(size(ac.data)[1], limit)
170      get_row(n) = begin
171          row = @view ac.data[n, :]
172          (; _cashstr(row.instance)..., name=row.asset, exchange=row.exchange.id)
173      end
174      half = limit รท 2
175      df = DataFrame(get_row(n) for n in 1:half)
176      for n in (nrow(ac.data) - half + 1):nrow(ac.data)
177          push!(df, get_row(n))
178      end
179      df
180  end
181  
182  Base.show(io::IO, ac::AssetCollection) = write(io, string(prettydf(ac)))
183  
184  @doc """Returns a dictionary of all the OHLCV dataframes present in the asset collection.
185  
186  $(TYPEDSIGNATURES)
187  
188  The `flatten` function takes the following parameter:
189  
190  - `ac`: an AssetCollection object which encapsulates a collection of assets.
191  
192  The function returns a SortedDict where the keys are TimeFrame objects and the values are vectors of DataFrames that represent OHLCV (Open, High, Low, Close, Volume) data. The dictionary is sorted by the TimeFrame keys.
193  
194  """
195  function flatten(ac::AssetCollection; noempty=false)::SortedDict{TimeFrame,Vector{DataFrame}}
196      out = SortedDict{TimeFrame,Vector{DataFrame}}()
197      if noempty
198          return _flatten_noempty!(out, ac)
199      end
200      return _flatten!(out, ac)
201  end
202  
203  function _flatten!(out, ac::AssetCollection)
204      @eachrow ac.data for (tf, df) in :instance.data
205          metadata!(df, "asset_instance", :instance; style=Symbol("note"))
206          push!(@lget!(out, tf, DataFrame[]), df)
207      end
208      out
209  end
210  
211  function _flatten_noempty!(out, ac::AssetCollection)
212      @eachrow ac.data for (tf, df) in :instance.data
213          if !isempty(df)
214              metadata!(df, "asset_instance", :instance; style=Symbol("note"))
215              push!(@lget!(out, tf, DataFrame[]), df)
216          end
217      end
218      out
219  end
220  
221  Base.first(ac::AssetCollection, a::AbstractAsset)::DataFrame =
222      first(first(ac[a].instance).data)[2]
223  
224  @doc """Makes a date range that spans the common minimum and maximum dates of the collection.
225  
226  $(TYPEDSIGNATURES)
227  
228  The `DateRange` function takes the following parameters:
229  
230  - `ac`: an AssetCollection object which encapsulates a collection of assets.
231  - `tf` (optional): a TimeFrame object that represents a specific time frame. If not provided, the function will calculate the date range based on all time frames in the AssetCollection.
232  - `skip_empty` (optional, default is false): a boolean that indicates whether to skip empty data frames in the calculation of the date range.
233  
234  """
235  function TimeTicks.DateRange(ac::AssetCollection, tf=nothing; full=false, kwargs...)
236      if full
237          _daterange_full(ac, tf; kwargs...)
238      else
239          _daterange(ac, tf; kwargs...)
240      end
241  end
242  
243  function _daterange(ac::AssetCollection, tf=nothing; skip_empty=false)
244      m = typemin(DateTime)
245      M = typemax(DateTime)
246      for ai in ac.data.instance
247          df = first(values(ai.data))
248          if skip_empty && isempty(df)
249              continue
250          end
251          if !isempty(df)
252              d_min = firstdate(df)
253              d_min > m && (m = d_min)
254              d_max = lastdate(last(ai.data).second)
255              d_max < M && (M = d_max)
256          end
257      end
258      tf = @something tf first(ac.data[begin, :instance].data).first
259      DateRange(m, M, tf)
260  end
261  
262  @doc """Makes a date range that spans the union (earliest start to latest end) of the collection.
263  
264  $(TYPEDSIGNATURES)
265  
266  The `daterange` function returns a `DateRange` covering the earliest available timestamp
267  and the latest available timestamp across all assets' OHLCV data in the `AssetCollection`.
268  
269  Parameters:
270  
271  - `ac`: the `AssetCollection`
272  - `tf` (optional): a `TimeFrame`. If not provided, it is inferred from the first asset instance
273  """
274  function _daterange_full(ac::AssetCollection, tf=nothing; kwargs...)
275      m = typemax(DateTime)
276      M = typemin(DateTime)
277      for ai in ac.data.instance
278          # Consider the first and last dataframes in the SortedDict for breadth
279          df_first = first(values(ai.data))
280          if !isempty(df_first)
281              d_min = firstdate(df_first)
282              d_min < m && (m = d_min)
283          end
284          df_last = last(ai.data).second
285          if !isempty(df_last)
286              d_max = lastdate(df_last)
287              d_max > M && (M = d_max)
288          end
289      end
290      tf = @something tf first(ac.data[begin, :instance].data).first
291      DateRange(m, M + tf, tf)
292  end
293  
294  Base.iterate(ac::AssetCollection) = iterate(ac.data.instance)
295  Base.iterate(ac::AssetCollection, s) = iterate(ac.data.instance, s)
296  Base.first(ac::AssetCollection) = first(ac.data.instance)
297  Base.last(ac::AssetCollection) = last(ac.data.instance)
298  Base.length(ac::AssetCollection) = nrow(ac.data)
299  Base.size(ac::AssetCollection) = size(ac.data)
300  Base.similar(ac::AssetCollection) = begin
301      AssetCollection(similar.(ac.data.instance))
302  end
303  
304  @doc """Checks that all assets in the universe match the cash currency.
305  
306  $(TYPEDSIGNATURES)
307  
308  The `iscashable` function takes the following parameters:
309  
310  - `c`: an AbstractCash object which encapsulates a representation of cash.
311  - `ac`: an AssetCollection object which encapsulates a collection of assets.
312  """
313  iscashable(c::AbstractCash, ac::AssetCollection) = begin
314      for ai in ac
315          if ai.asset.qc != nameof(c)
316              return false
317          end
318      end
319      return true
320  end
321  
322  reset!(ac::AssetCollection) = begin
323      ais = ac.data.instance
324      foreach(eachindex(ais)) do idx
325          ai = ais[idx]
326          this_exc = exchange(ai)
327          eid = exchangeid(this_exc)
328          acc = account(this_exc)
329          params = this_exc.params
330          sandbox = issandbox(this_exc)
331          ais[idx] = similar(ai; exc=getexchange!(eid, params; sandbox, account=acc))
332      end
333  end
334  
335  export AssetCollection, flatten, iscashable