# beta.jl
using StatsBase: cov, var
using GLM
using Statistics: mean
using Strategies: Strategies as st
using .st.Misc: Option
using .st.Data: Data as da, DataFrame
using .da.DataFrames: DataFrame, metadata!, deletemetadata!, names, metadata
using .st: TimeFrame, DFT, @tf_str
using .st.coll: _flatten_noempty!, raw, flatten
using .st.Exchanges: tickers
using Processing.Alignments: trim!, empty_unaligned!
using GLM: @formula
using LinearAlgebra: diag

# Shared input validation for the two scalar beta estimators below.
function _check_return_series(stock_returns, market_returns)
    if length(stock_returns) != length(market_returns)
        error("Input return series must have the same length")
    end
    # Covariance, variance and a two-parameter regression all need >= 2 observations.
    if length(stock_returns) < 2
        error("Not enough data points to calculate beta")
    end
    return nothing
end

@doc """
    calculate_beta_covariance(stock_returns, market_returns)

Calculate the Beta of a stock/asset relative to a market benchmark using the covariance
and variance method.

    Beta = Covariance(stock returns, market returns) / Variance(market returns)

# Arguments
- `stock_returns::AbstractVector{<:Real}`: Historical returns for the stock/asset.
- `market_returns::AbstractVector{<:Real}`: Historical returns for the market benchmark.

# Returns
- `DFT`: The calculated Beta value.

# Throws
- `ErrorException`: If the input vectors have different lengths, fewer than two
  observations, or the market variance is (near) zero.
"""
function calculate_beta_covariance(
    stock_returns::AbstractVector{<:Real}, market_returns::AbstractVector{<:Real}
)::DFT
    _check_return_series(stock_returns, market_returns)

    var_market = var(market_returns)
    # A (near) zero denominator would make the ratio meaningless.
    if abs(var_market) < eps(DFT)
        error("Cannot calculate beta: Market variance is zero or near zero")
    end

    return cov(stock_returns, market_returns) / var_market
end

@doc """
    calculate_beta_regression(stock_returns, market_returns)

Calculate the Beta of a stock/asset relative to a market benchmark using linear regression.

Beta is the slope coefficient (β) from the linear regression model:

    stock_returns = α + β * market_returns + ε

# Arguments
- `stock_returns::AbstractVector{<:Real}`: Historical returns for the stock/asset.
- `market_returns::AbstractVector{<:Real}`: Historical returns for the market benchmark.

# Returns
- `DFT`: The calculated Beta value (the slope coefficient).

# Throws
- `ErrorException`: If the input vectors have different lengths or fewer than two
  observations.
"""
function calculate_beta_regression(
    stock_returns::AbstractVector{<:Real}, market_returns::AbstractVector{<:Real}
)::DFT
    _check_return_series(stock_returns, market_returns)

    # GLM's formula interface works on tabular data.
    data = DataFrame(; stock_returns=stock_returns, market_returns=market_returns)
    model = lm(@formula(stock_returns ~ market_returns), data)

    # Coefficients follow the formula order: (intercept, market_returns).
    return GLM.coef(model)[2]
end

# Replace NaN/±Inf entries by `missing`. `map` narrows the element type, so a vector
# without any non-finite entry keeps its plain floating point element type.
_finite_or_missing(values) = map(x -> isfinite(x) ? x : missing, values)

# Pick the benchmark return series when the benchmark is a named asset of the universe.
# Returns `nothing` (after a warning) when the asset is not part of the centered data.
function _benchmark_returns(s, centered_df, asset_names, benchmark::String, min_vol)
    if !(benchmark in asset_names)
        @warn "Specified benchmark asset \"$(benchmark)\" is not in the strategy's universe or centered data."
        return nothing
    end
    @debug "Using specific asset \"$(benchmark)\" as benchmark."
    return convert(Vector{DFT}, centered_df[:, benchmark])
end

# Pick the benchmark return series for the volume based benchmarks (`:top_asset`,
# `:top_5_percent`). Returns `nothing` (after a warning) when no asset qualifies.
function _benchmark_returns(s, centered_df, asset_names, benchmark::Symbol, min_vol)
    if benchmark != :top_asset && benchmark != :top_5_percent
        error("Invalid benchmark symbol: $(benchmark). Must be :top_asset or :top_5_percent.")
    end

    # Only assets that survived centering can serve as benchmark; every entry of
    # `volume_sorted_assets` is therefore guaranteed to be a column of `centered_df`.
    all_volume_sorted_assets = tickers(
        st.getexchange!(s.exchange), s.qc; min_vol=min_vol, as_vec=true
    )
    volume_sorted_assets = [a for a in all_volume_sorted_assets if a in asset_names]

    if isempty(volume_sorted_assets)
        @warn "No assets meet the minimum volume requirement or are not in the data to determine $(benchmark) benchmark."
        return nothing
    end

    # NOTE(review): `:top_asset` takes the LAST entry of the list while `:top_5_percent`
    # takes the FIRST entries. Only one of them can match the sort direction of
    # `tickers` -- confirm against the exchange helper and align the two branches.
    if benchmark == :top_asset
        benchmark_name = last(volume_sorted_assets)
        @debug "Using top asset \"$(benchmark_name)\" as benchmark."
        return convert(Vector{DFT}, centered_df[:, benchmark_name])
    end

    # :top_5_percent -- aggregate (row-wise mean) of the top 5% assets, at least one.
    num_top_5_percent = max(1, floor(Int, length(volume_sorted_assets) * 0.05))
    benchmark_assets = first(volume_sorted_assets, num_top_5_percent)
    @debug "Using aggregate of top $(length(benchmark_assets)) assets as benchmark: $(benchmark_assets)"
    aggregate = mean(Matrix(@view centered_df[:, benchmark_assets]); dims=2)
    return convert(Vector{DFT}, vec(aggregate))
end

# Build `(centered_df, asset_names, benchmark_returns)` for an external benchmark
# DataFrame: the benchmark is appended to the flattened universe so that `center_data`
# aligns it together with the assets, then its column is split off the centered matrix.
# Returns `nothing` (after a warning) when the inputs cannot be prepared.
function _beta_inputs(s, universe_data, tf, benchmark::DataFrame, min_vol)
    if size(benchmark, 2) < 2
        error("External benchmark DataFrame must have at least two columns.")
    end
    benchmark_name = "benchmark"
    # Work on a copy so the metadata written below never leaks into the caller's frame.
    external_benchmark_df = copy(benchmark)
    da.timeframe!(external_benchmark_df, tf)
    # Tag the frame so it can be recognized again after centering.
    metadata!(external_benchmark_df, "asset_instance", benchmark_name)

    flattened_data = st.coll.flatten(universe_data; noempty=true)
    tf_frames = get(flattened_data, tf, nothing)
    if isnothing(tf_frames)
        tf_frames = flattened_data[tf] = DataFrame[]
    end
    push!(tf_frames, external_benchmark_df)

    try
        # `center_data` handles the alignment internally.
        (trimmed_data, v) = center_data(flattened_data, tf; ratio_func=ratio!)

        # Column `k` of `v` is taken to belong to the k-th non-empty frame.
        benchmark_col = nothing
        asset_names = String[]
        asset_cols = Int[]
        col = 0
        for df in trimmed_data[tf]
            isempty(df) && continue
            col += 1
            name = raw(metadata(df, "asset_instance"))
            if name == benchmark_name
                benchmark_col = col
            else
                push!(asset_names, name)
                push!(asset_cols, col)
            end
        end

        if isnothing(benchmark_col)
            @warn "External benchmark DataFrame (with metadata key '$benchmark_name') not found in trimmed data after centering." asset_metadata = [
                (raw(metadata(d, "asset_instance", "<missing>")), names(d)) for
                d in trimmed_data[tf] if !isempty(d)
            ]
            return nothing
        end

        benchmark_returns = convert(Vector{DFT}, v[:, benchmark_col])
        @debug "Using aligned external DataFrame (key: '$benchmark_name') as benchmark."

        if isempty(asset_names) || size(v, 1) < 2
            @warn "Not enough aligned asset data after centering and excluding external benchmark, or no assets left."
            return nothing
        end

        if !all(in(axes(v, 2)), asset_cols)
            @error "Calculated column indices for assets are out of bounds for the centered matrix." indices =
                asset_cols matrix_size = size(v) asset_info = collect(
                zip(asset_names, asset_cols)
            ) benchmark_name = benchmark_name
            return nothing
        end

        centered_df = DataFrame(v[:, asset_cols], asset_names)
        return (centered_df, asset_names, benchmark_returns)
    catch e
        # Deliberate best-effort: any failure while centering yields an empty result.
        @warn "Centering combined data with external benchmark failed." exception = e
        return nothing
    end
end

# Build `(centered_df, asset_names, benchmark_returns)` when the benchmark is derived
# from the universe itself (a named asset or a volume based selection).
# Returns `nothing` (after a warning) when the inputs cannot be prepared.
function _beta_inputs(s, universe_data, tf, benchmark::Union{Symbol,String}, min_vol)
    flattened_data = st.coll.flatten(universe_data; noempty=true)

    local centered_df::DataFrame
    local asset_names::Vector{String}
    try
        (trimmed_data, v) = center_data(flattened_data, tf; ratio_func=ratio!)
        asset_names = String[
            raw(metadata(df, "asset_instance")) for df in trimmed_data[tf] if !isempty(df)
        ]
        centered_df = DataFrame(v, asset_names)

        if isempty(asset_names) || size(centered_df, 1) < 2
            @warn "Not enough data or assets available to calculate beta after centering."
            return nothing
        end
    catch e
        # Deliberate best-effort: any failure while centering yields an empty result.
        @warn "Centering universe data failed." exception = e
        return nothing
    end

    benchmark_returns = _benchmark_returns(s, centered_df, asset_names, benchmark, min_vol)
    isnothing(benchmark_returns) && return nothing
    return (centered_df, asset_names, benchmark_returns)
end

@doc """
    beta_indicator(s::st.Strategy, tf=s.timeframe; benchmark=:top_asset, min_vol=1e6, method=:covariance, tail=nothing)

Calculate the Beta for all assets in a strategy's universe relative to a specified benchmark.

The benchmark can be:
- A Symbol: `:top_asset` (single asset selected by volume) or `:top_5_percent`
  (aggregate of the top 5% by volume, at least one asset).
- A String: The name of a specific asset from the strategy's universe.
- A DataFrame: An external DataFrame with a timestamp column (first) and numerical
  return column (second).

# Arguments
- `s::st.Strategy`: The strategy object containing the universe of assets.
- `tf::TimeFrame`: The timeframe for the asset data (defaults to strategy's timeframe).
- `benchmark::Union{Symbol, String, DataFrame}`: Specifies the benchmark. See description
  above. Defaults to `:top_asset`.
- `min_vol::DFT`: The minimum volume required for an asset to be considered for the
  `:top_asset` or `:top_5_percent` benchmarks. Defaults to 1e6.
- `method::Symbol`: The method to use for Beta calculation. Accepts `:covariance`,
  `:regression`, or `:both`. Defaults to `:covariance`.
- `tail::Option{Int}`: The number of data points to use for the Beta calculation. If
  `nothing`, all data points are used. If a positive integer, the last `tail` data points
  are used (all of them when `tail` exceeds the available length).

# Returns
- `DataFrame`: A DataFrame with columns `:Asset`, and either `:Beta_Covariance`,
  `:Beta_Regression`, or both, depending on the `method` argument. Betas that cannot be
  computed (non-finite result, benchmark variance near zero) are `missing`.
  Returns an empty DataFrame if insufficient data or assets are available, if `tail` is
  not positive, or if the benchmark cannot be determined.

# Throws
- `ErrorException`: If `method` or a Symbol `benchmark` is not one of the accepted values,
  or if an external benchmark DataFrame has fewer than two columns.
"""
function beta_indicator(
    s::st.Strategy,
    tf=s.timeframe;
    benchmark::Union{Symbol,String,DataFrame}=:top_asset,
    min_vol::DFT=1e6,
    method::Symbol=:covariance,
    tail::Option{Int}=nothing,
)::DataFrame
    inputs = _beta_inputs(s, st.universe(s), tf, benchmark, min_vol)
    isnothing(inputs) && return DataFrame()
    centered_df, asset_names, benchmark_returns = inputs

    if isempty(benchmark_returns)
        @warn "Benchmark returns could not be determined."
        return DataFrame()
    end

    data_length = size(centered_df, 1)
    if length(benchmark_returns) != data_length
        @warn "Aligned benchmark data length ($(length(benchmark_returns))) does not match aligned asset data length ($(size(centered_df, 1))). This indicates an issue with alignment or data processing."
        return DataFrame()
    end

    # Select the observation window: everything, or the last `tail` rows.
    rows = 1:data_length
    if !isnothing(tail)
        if tail <= 0
            @warn "Invalid 'tail' value provided. Must be a positive integer." tail = tail
            return DataFrame()
        elseif tail > data_length
            @warn "'tail' value ($(tail)) is greater than the available data length ($(data_length)). Using all available data." tail =
                tail data_length = data_length
        else
            @debug "Using last $(tail) data points for Beta calculation."
            rows = (data_length - tail + 1):data_length
        end
    end

    if length(rows) < 2
        @warn "Not enough data points to calculate beta after applying 'tail' or centering." data_points = length(
            rows
        ) minimum_required = 2
        return DataFrame()
    end

    if method ∉ (:covariance, :regression, :both)
        error("Invalid method: $(method). Must be :covariance, :regression, or :both.")
    end

    # One column per asset, one row per observation of the selected window.
    asset_matrix = Matrix(@view centered_df[rows, :])
    window_returns = benchmark_returns[rows]
    n_assets = length(asset_names)

    # With a (near) constant benchmark the slope is undefined for BOTH estimators:
    # the covariance ratio divides by ~0 and the regression design matrix is rank
    # deficient (the least squares solve would return an arbitrary minimum-norm slope).
    degenerate_benchmark = abs(var(window_returns)) < eps(DFT)

    results_df = DataFrame(; Asset=asset_names)

    if method == :covariance || method == :both
        if degenerate_benchmark
            @warn "Cannot calculate Beta by Covariance: Market variance is zero or near zero."
            results_df[!, :Beta_Covariance] = fill(missing, n_assets)
        else
            # cov(matrix, vector) yields the covariance of every asset column with the
            # benchmark in one call.
            cov_vector = vec(cov(asset_matrix, window_returns))
            results_df[!, :Beta_Covariance] = _finite_or_missing(
                cov_vector ./ var(window_returns)
            )
        end
    end

    if method == :regression || method == :both
        if degenerate_benchmark
            @warn "Cannot calculate Beta by Regression: Market variance is zero or near zero."
            results_df[!, :Beta_Regression] = fill(missing, n_assets)
        else
            # Model per asset: returns = intercept + beta * benchmark + error, solved
            # for all assets at once by least squares.
            X = hcat(fill(one(DFT), length(window_returns)), window_returns)
            results_df[!, :Beta_Regression] = try
                coefficients = X \ asset_matrix
                # Row 1 holds the intercepts, row 2 the slopes (betas).
                _finite_or_missing(coefficients[2, :])
            catch e
                @warn "Could not calculate Beta by Regression" exception = e
                fill(missing, n_assets)
            end
        end
    end

    return results_df
end

export beta_indicator