/ FeatureSelection / src / crosscorr.jl
crosscorr.jl
  1  using StatsBase: crosscor
  2  using Strategies: Strategies as st
  3  using .st.Misc: Option
  4  using .st.Data: Data as da
  5  using .da.DataFrames: DataFrame, innerjoin, outerjoin, metadata!, metadata, select!, names
  6  using .st: TimeFrame, DFT, @tf_str
  7  using .st.coll: _flatten_noempty!, raw
  8  using .st.Exchanges: tickers
  9  using Processing.Alignments: trim!, empty_unaligned!
 10  using OnlineTechnicalIndicators: OnlineTechnicalIndicators as oti
 11  
 12  # Import specific indicators from OnlineTechnicalIndicators
 13  using .oti: SMA, StdDev, fit!
 14  
 15  @doc """
 16      center_data(data::Dict)
 17  
 18  
 19  """
 20  function center_data(data::AbstractDict, tf=nothing; ratio_func=ratio!)
 21      @assert keytype(data) <: TimeFrame keytype(data)
 22      @assert valtype(data) <: Vector{DataFrame} valtype(data)
 23  
 24      input_tf = isnothing(tf) ? first(keys(data)) : tf
 25      this_data = Dict(
 26          input_tf => [
 27              let this_df = copy(df)
 28                  metadata!(
 29                      this_df,
 30                      "asset_instance",
 31                      metadata(df, "asset_instance");
 32                      style=:note,
 33                  )
 34                  this_df
 35              end for df in data[input_tf]
 36          ],
 37      )
 38      trim!(this_data; tail=true)
 39      empty_unaligned!(this_data)
 40      vecs = [
 41          ratio_func(similar(df.close, size(df, 1) - 1), df.close; dims=1) for
 42          df in this_data[input_tf] if !isempty(df)
 43      ]
 44      return this_data, reduce(hcat, vecs)
 45  end
 46  
 47  function lagsbytf(tf::TimeFrame)
 48      if tf == tf"1m"
 49          [1, 5, 15, 60, 60 * 4, 60 * 8, 60 * 12]
 50      elseif tf == tf"1h"
 51          [1, 4, 8, 12, 24]
 52      elseif tf == tf"8h"
 53          [1, 2, 3, 6, 12]
 54      elseif tf == tf"1d"
 55          [1, 2, 3, 5, 7]
 56      end
 57  end
 58  
 59  @doc """
 60      crosscorr_assets(s::st.Strategy, tf=s.timeframe; ratio_func=ratio!, min_vol=1e6, x_num=5, demean=false, lags=nothing)
 61  
 62  
 63      `lags` is a Vector of Integers
 64  """
 65  function crosscorr_assets(
 66      s::st.Strategy,
 67      tf=s.timeframe;
 68      ratio_func=ratio!,
 69      min_vol=1e6,
 70      x_num=5,
 71      demean=false,
 72      lags=lagsbytf(tf),
 73      tail::Option{Int}=nothing
 74  )
 75      data = st.coll.flatten(st.universe(s); noempty=true)
 76      (trimmed_data, v) = center_data(data, tf; ratio_func)
 77      names = [raw(metadata(df, "asset_instance")) for df in trimmed_data[tf] if !isempty(df)]
 78      # NOTE: as_vec=true is required to sort by volume (lower volumes first)
 79      centered = DataFrame(v, names)
 80  
 81      # Apply tail lookback if specified
 82      if !isnothing(tail) && tail > 0
 83          if size(centered, 1) > tail
 84              centered = @view centered[(end - tail + 1):end, :]
 85          else
 86              @warn "Tail lookback ($tail) is greater than or equal to the number of data points ($(size(centered, 1))). Using all data."
 87          end
 88      end
 89  
 90      # --- Added logging ---
 91      if size(centered, 1) > 0
 92          # Attempt to get corresponding timestamps.
 93          # Assuming centered rows align with timestamps from trimmed_data[tf][1]
 94          # after ratio calculation (which reduces rows by 1).
 95          # Need to adjust indices for the tail.
 96          original_timestamps = trimmed_data[tf][1].timestamp # Assuming all assets have the same timestamps after trim!/empty_unaligned!
 97          # The ratio reduces row count by 1, so the i-th row of 'centered' corresponds to the (i+1)-th original timestamp
 98          # The tail selects rows from (end - tail + 1) to end of 'centered'.
 99          # If centered has N rows after ratio, tail selects rows N-tail+1 to N.
100          # These correspond to original timestamps at indices (N-tail+1)+1 to N+1.
101          N_centered = size(centered, 1) # Number of rows in centered after potential trimming
102          N_original = size(original_timestamps, 1) # Number of timestamps in original data (per asset) after alignment
103  
104          if N_centered > 0 && N_original > 0
105               # Determine the index in original_timestamps corresponding to the first row of the potentially trimmed centered
106               first_ts_idx_in_original = N_original - N_centered + 2 # +1 for ratio, +1 for 1-based indexing
107  
108               # Determine the index in original_timestamps corresponding to the last row of the potentially trimmed centered
109               last_ts_idx_in_original = N_original # The last row of centered comes from the last data point in the original data
110  
111              if first_ts_idx_in_original > 0 && last_ts_idx_in_original <= N_original && first_ts_idx_in_original <= last_ts_idx_in_original
112                   @debug "crosscorr_assets: First timestamp used: $(original_timestamps[first_ts_idx_in_original]), Last timestamp used: $(original_timestamps[last_ts_idx_in_original])"
113               else
114                    @debug "crosscorr_assets: Could not determine valid timestamp range for logging."
115               end
116          else
117              @debug "crosscorr_assets: Centered data is empty, no timestamps to report."
118          end
119      else
120          @debug "crosscorr_assets: Centered data is empty, no timestamps to report."
121      end
122      # --- End added logging ---
123  
124  
125      assets = let vec = tickers(st.getexchange!(s.exchange), s.qc; min_vol=min_vol, as_vec=true)
126          [el for el in vec if el in names]
127      end
128      x_assets = assets[(end - x_num + 1):end]
129      y_assets = assets[begin:(end - x_num + 1)]
130      x_df = @view centered[:, x_assets]
131      y_df = @view centered[:, y_assets]
132      if !isnothing(lags)
133          args = (Matrix(x_df), Matrix(y_df), lags)
134          kwargs = (; demean)
135      else
136          args = (Matrix(x_df), Matrix(y_df))
137          kwargs = (; demean)
138      end
139      corr = crosscor(args...; kwargs...)
140      corr_dict = Dict()
141      for i in eachindex(lags)
142          m = @view corr[i, :, :]
143          df = DataFrame(m, y_assets)
144          # Add x_assets as the first column to match streaming version output
145          df.x_asset = x_assets
146          select!(df, vcat("x_asset", y_assets))
147          metadata!(df, "lag", lags[i]; style=:note)
148          metadata!(df, "x_assets", x_assets; style=:note)
149          corr_dict[lags[i]] = df
150      end
151      return corr_dict
152  end
153  
154  export crosscorr_assets