# crosscorr.jl
using StatsBase: crosscor
using Strategies: Strategies as st
using .st.Misc: Option
using .st.Data: Data as da
using .da.DataFrames: DataFrame, innerjoin, outerjoin, metadata!, metadata, select!, names
using .st: TimeFrame, DFT, @tf_str
using .st.coll: _flatten_noempty!, raw
using .st.Exchanges: tickers
using Processing.Alignments: trim!, empty_unaligned!
using OnlineTechnicalIndicators: OnlineTechnicalIndicators as oti

# Import specific indicators from OnlineTechnicalIndicators
using .oti: SMA, StdDev, fit!

@doc """
    center_data(data::AbstractDict, tf=nothing; ratio_func=ratio!)

Build a matrix of per-asset ratio series from the `close` column of the frames stored
under one timeframe of `data`.

# Arguments
- `data`: dictionary whose keys are `TimeFrame`s and whose values are
  `Vector{DataFrame}` (both are asserted).
- `tf`: the timeframe to read; when `nothing`, the first key of `data` is used.

# Keywords
- `ratio_func`: called as `ratio_func(out, df.close; dims=1)`, where `out` is a buffer
  one element shorter than `df.close`. Its return value becomes one matrix column.

# Returns
A tuple `(this_data, m)`:
- `this_data`: a single-entry `Dict` holding copies of the input frames (the
  `"asset_instance"` metadata is re-attached with `:note` style) after they were passed
  through `trim!(...; tail=true)` and `empty_unaligned!`.
- `m`: the `hcat` of the ratio series of every frame that is still non-empty.

The input frames are copied first, so `data` itself is not modified by the alignment
calls.
"""
function center_data(data::AbstractDict, tf=nothing; ratio_func=ratio!)
    @assert keytype(data) <: TimeFrame keytype(data)
    @assert valtype(data) <: Vector{DataFrame} valtype(data)

    input_tf = isnothing(tf) ? first(keys(data)) : tf
    this_data = Dict(
        input_tf => [
            let this_df = copy(df)
                # Re-attach the asset handle explicitly with `:note` style so that it
                # is carried by the copy.
                metadata!(
                    this_df,
                    "asset_instance",
                    metadata(df, "asset_instance");
                    style=:note,
                )
                this_df
            end for df in data[input_tf]
        ],
    )
    trim!(this_data; tail=true)
    empty_unaligned!(this_data)
    # The output buffer is one row shorter than the input series.
    vecs = [
        ratio_func(similar(df.close, size(df, 1) - 1), df.close; dims=1) for
        df in this_data[input_tf] if !isempty(df)
    ]
    return this_data, reduce(hcat, vecs)
end

@doc """
    lagsbytf(tf::TimeFrame)

Return the preset vector of lags (in number of bars) for the timeframes `1m`, `1h`,
`8h` and `1d`, or `nothing` for any other timeframe.
"""
function lagsbytf(tf::TimeFrame)
    if tf == tf"1m"
        return [1, 5, 15, 60, 60 * 4, 60 * 8, 60 * 12]
    elseif tf == tf"1h"
        return [1, 4, 8, 12, 24]
    elseif tf == tf"8h"
        return [1, 2, 3, 6, 12]
    elseif tf == tf"1d"
        return [1, 2, 3, 5, 7]
    else
        return nothing
    end
end

# Lag range used when the caller supplies none. Mirrors the default documented for
# `StatsBase.crosscor`: the integers from `-l` to `l`, `l = min(n - 1, 10 * log10(n))`.
function _default_lags(nobs::Integer)
    nobs > 0 || throw(ArgumentError("cannot derive default lags from an empty series"))
    maxlag = min(nobs - 1, round(Int, 10 * log10(nobs)))
    return (-maxlag):maxlag
end

# Emit a debug record with the first and last timestamps covered by the last `nrows`
# rows of the ratio matrix. Row `i` of the full ratio matrix pairs with timestamp
# `i + 1`, so the last `nrows` rows pair with the last `nrows` timestamps.
function _log_timestamp_range(timestamps, nrows::Integer)
    n_ts = length(timestamps)
    if nrows <= 0 || n_ts == 0
        @debug "crosscorr_assets: Centered data is empty, no timestamps to report."
        return nothing
    end
    first_idx = n_ts - nrows + 1
    last_idx = n_ts
    if first_idx > 0 && first_idx <= last_idx
        @debug "crosscorr_assets: First timestamp used: $(timestamps[first_idx]), Last timestamp used: $(timestamps[last_idx])"
    else
        @debug "crosscorr_assets: Could not determine valid timestamp range for logging."
    end
    return nothing
end

@doc """
    crosscorr_assets(s::st.Strategy, tf=s.timeframe; ratio_func=ratio!, min_vol=1e6, x_num=5, demean=false, lags=lagsbytf(tf), tail=nothing)

Compute lagged cross-correlations between two groups of assets of the strategy universe.

The universe is flattened, passed through [`center_data`](@ref) and the resulting ratio
series are split into an `x` group (the last `x_num` entries of the list returned by
`tickers(...; as_vec=true)`, restricted to assets present in the data) and a `y` group
(the leading entries of the same list). `StatsBase.crosscor` is then evaluated between
the two groups.

# Keywords
- `ratio_func`: forwarded to `center_data`.
- `min_vol`: forwarded to `tickers`.
- `x_num`: number of assets placed in the `x` group.
- `demean`: forwarded to `crosscor`.
- `lags`: a Vector of Integers. Defaults to `lagsbytf(tf)`; when it is `nothing` (for
  instance because `tf` has no preset) the default symmetric lag range documented for
  `StatsBase.crosscor` is used.
- `tail`: when positive and smaller than the number of available rows, only the last
  `tail` rows are used. Otherwise a warning is logged and all rows are used.

# Returns
A `Dict` mapping each lag to a `DataFrame` whose first column is `x_asset` followed by
one column per `y` asset. Each frame carries `"lag"` and `"x_assets"` metadata.
"""
function crosscorr_assets(
    s::st.Strategy,
    tf=s.timeframe;
    ratio_func=ratio!,
    min_vol=1e6,
    x_num=5,
    demean=false,
    lags=lagsbytf(tf),
    tail::Option{Int}=nothing,
)
    data = st.coll.flatten(st.universe(s); noempty=true)
    (trimmed_data, v) = center_data(data, tf; ratio_func)
    frames = trimmed_data[tf]
    # Same filter as in `center_data`, so names line up with the matrix columns.
    asset_names = [raw(metadata(df, "asset_instance")) for df in frames if !isempty(df)]
    centered = DataFrame(v, asset_names)

    # Apply tail lookback if specified
    if !isnothing(tail) && tail > 0
        if size(centered, 1) > tail
            centered = @view centered[(end - tail + 1):end, :]
        else
            @warn "Tail lookback ($tail) is greater than or equal to the number of data points ($(size(centered, 1))). Using all data."
        end
    end

    # Report the time span that is actually analysed. Timestamps are read from the
    # first non-empty frame, since frames at the head of the list may be empty.
    ref = findfirst(!isempty, frames)
    if size(centered, 1) > 0 && !isnothing(ref)
        _log_timestamp_range(frames[ref].timestamp, size(centered, 1))
    else
        @debug "crosscorr_assets: Centered data is empty, no timestamps to report."
    end

    # NOTE: as_vec=true is required to sort by volume (lower volumes first)
    ranked = tickers(st.getexchange!(s.exchange), s.qc; min_vol=min_vol, as_vec=true)
    assets = [el for el in ranked if el in asset_names]
    x_assets = assets[(end - x_num + 1):end]
    # NOTE(review): the asset at index `end - x_num + 1` lands in both groups. This
    # looks like an off-by-one, but it is kept as is because the output is meant to
    # match the streaming version -- confirm before changing.
    y_assets = assets[begin:(end - x_num + 1)]
    x_df = @view centered[:, x_assets]
    y_df = @view centered[:, y_assets]

    # Resolve the lags up front so that the result loop below always has a concrete
    # collection to iterate, also when the caller (or `lagsbytf`) supplied `nothing`.
    used_lags = isnothing(lags) ? _default_lags(size(centered, 1)) : lags
    corr = crosscor(Matrix(x_df), Matrix(y_df), used_lags; demean=demean)

    corr_dict = Dict()
    # The first dimension of `corr` follows the order of `used_lags`.
    for (i, lag) in enumerate(used_lags)
        m = @view corr[i, :, :]
        df = DataFrame(m, y_assets)
        # Add x_assets as the first column to match streaming version output
        df.x_asset = x_assets
        select!(df, vcat("x_asset", y_assets))
        metadata!(df, "lag", lag; style=:note)
        metadata!(df, "x_assets", x_assets; style=:note)
        corr_dict[lag] = df
    end
    return corr_dict
end

export crosscorr_assets