/ FeatureSelection / src / beta.jl
beta.jl
  1  using StatsBase: cov, var
  2  using GLM
  3  using Statistics: mean
  4  using Strategies: Strategies as st
  5  using .st.Misc: Option
  6  using .st.Data: Data as da, DataFrame
  7  using .da.DataFrames: DataFrame, metadata!, deletemetadata!, names, metadata
  8  using .st: TimeFrame, DFT, @tf_str
  9  using .st.coll: _flatten_noempty!, raw, flatten
 10  using .st.Exchanges: tickers
 11  using Processing.Alignments: trim!, empty_unaligned!
 12  using GLM: @formula
 13  using LinearAlgebra: diag
 14  
 15  @doc """
 16      calculate_beta_covariance(stock_returns, market_returns)
 17  
 18  Calculate the Beta of a stock/asset relative to a market benchmark using the covariance and variance method.
 19  
 20  Beta = Covariance(stock returns, market returns) / Variance(market returns)
 21  
 22  # Arguments
 23  - `stock_returns::Vector{DFT}`: Vector of historical returns for the stock/asset.
 24  - `market_returns::Vector{DFT}`: Vector of historical returns for the market benchmark.
 25  
 26  # Returns
 27  - `DFT`: The calculated Beta value.
 28  
 29  # Throws
 30  - `Error`: If the input vectors have different lengths, insufficient data, or market variance is zero.
 31  """
 32  function calculate_beta_covariance(stock_returns::Vector{DFT}, market_returns::Vector{DFT})::DFT
 33      if length(stock_returns) != length(market_returns)
 34          error("Input return series must have the same length")
 35      end
 36  
 37      if length(stock_returns) < 2 # Need at least 2 data points for covariance/variance
 38           error("Not enough data points to calculate beta")
 39      end
 40  
 41      cov_returns = cov(stock_returns, market_returns)
 42      var_market = var(market_returns)
 43  
 44      if abs(var_market) < eps(DFT) # Check if variance is close to zero
 45          error("Cannot calculate beta: Market variance is zero or near zero")
 46      end
 47  
 48      beta = cov_returns / var_market
 49  
 50      return beta
 51  end
 52  
 53  @doc """
 54      calculate_beta_regression(stock_returns, market_returns)
 55  
 56  Calculate the Beta of a stock/asset relative to a market benchmark using linear regression.
 57  
 58  Beta is the slope coefficient (β) from the linear regression model:
 59  stock_returns = α + β * market_returns + ε
 60  
 61  # Arguments
 62  - `stock_returns::Vector{DFT}`: Vector of historical returns for the stock/asset.
 63  - `market_returns::Vector{DFT}`: Vector of historical returns for the market benchmark.
 64  
 65  # Returns
 66  - `DFT`: The calculated Beta value (the slope coefficient).
 67  
 68  # Throws
 69  - `Error`: If the input vectors have different lengths or insufficient data.
 70  """
 71  function calculate_beta_regression(stock_returns::Vector{DFT}, market_returns::Vector{DFT})::DFT
 72      if length(stock_returns) != length(market_returns)
 73          error("Input return series must have the same length")
 74      end
 75  
 76       if length(stock_returns) < 2 # Need at least 2 data points for regression
 77            error("Not enough data points to calculate beta")
 78       end
 79  
 80      # Create a DataFrame for GLM
 81      data = DataFrame(stock_returns=stock_returns, market_returns=market_returns)
 82  
 83      # Perform linear regression: stock_returns ~ market_returns
 84      model = lm(@formula(stock_returns ~ market_returns), data)
 85  
 86      # Extract the coefficient for market_returns (which is Beta)
 87      # The coefficients are in the order of the formula: intercept, market_returns
 88      beta = GLM.coef(model)[2]
 89  
 90      return beta
 91  end
 92  
 93  @doc """
 94      beta_indicator(s::st.Strategy, tf=s.timeframe; benchmark = :top_asset, min_vol = 1e6, method = :both)
 95  
 96  Calculate the Beta for all assets in a strategy's universe relative to a specified benchmark.
 97  
 98  The benchmark can be:
 99  - A Symbol: `:top_asset` (single asset with highest volume) or `:top_5_percent` (aggregate of top 5% by volume).
100  - A String: The name of a specific asset from the strategy's universe.
101  - A DataFrame: An external DataFrame with a timestamp column (first) and numerical return column (second).
102  
103  # Arguments
104  - `s::st.Strategy`: The strategy object containing the universe of assets.
105  - `tf::TimeFrame`: The timeframe for the asset data (defaults to strategy's timeframe).
106  - `benchmark::Union{Symbol, String, DataFrame}`: Specifies the benchmark. See description above. Defaults to `:top_asset`.
107  - `min_vol::DFT`: The minimum volume required for an asset to be considered for `:top_asset` or `:top_5_percent` benchmarks. Defaults to 1e6.
108  - `method::Symbol`: The method to use for Beta calculation. Accepts `:covariance`, `:regression`, or `:both`. Defaults to `:both`. Defaults to `:covariance`.
109  - `tail::Option{Int}`: The number of data points to use for the Beta calculation. If `nothing`, all data points are used. If a positive integer, the last `tail` data points are used.
110  
111  # Returns
112  - `DataFrame`: A DataFrame with columns `:Asset`, and either `:Beta_Covariance`, `:Beta_Regression`, or both, depending on the `method` argument.
113               Returns an empty DataFrame if insufficient data or assets are available, or if the benchmark cannot be determined.
114  """
115  function beta_indicator(s::st.Strategy, tf=s.timeframe; benchmark::Union{Symbol, String, DataFrame} = :top_asset, min_vol::DFT=1e6, method::Symbol = :covariance, tail::Option{Int} = nothing)::DataFrame
116      # Get universe data
117      universe_data = st.universe(s)
118  
119      local benchmark_returns::Vector{DFT}
120      local benchmark_name = "benchmark"
121      local centered_df::DataFrame
122      local asset_names::Vector{String}
123  
124      # Handle external DataFrame benchmark before flattening
125      if typeof(benchmark) <: DataFrame
126          external_benchmark_df = copy(benchmark) # necessary to ensure metadata uniqueness
127          if size(external_benchmark_df, 2) < 2
128              error("External benchmark DataFrame must have at least two columns.")
129          end
130          # Ensure the external benchmark DataFrame has the correct timeframe metadata
131          da.timeframe!(external_benchmark_df, tf)
132  
133          # Add "asset_instance" metadata if not present. Use a unique temporary key.
134          metadata!(external_benchmark_df, "asset_instance", benchmark_name) # Store unique key
135  
136          # Flatten the universe data
137          flattened_data = st.coll.flatten(universe_data; noempty=true)
138  
139          # Get initial list of DataFrames before adding benchmark
140          local initial_dfs = get(flattened_data, tf, DataFrame[])
141  
142          push!(@lget!(flattened_data, tf, DataFrame[]), external_benchmark_df)
143  
144          # Now, apply center_data to the combined flattened data
145          try
146              # center_data will handle the alignment internally
147              (trimmed_data, v) = center_data(flattened_data, tf; ratio_func=ratio!)
148  
149              local benchmark_df_trimmed::Option{DataFrame} = nothing
150              local benchmark_v_column_idx::Option{Int} = nothing # Store the column index in v for the benchmark
151              local asset_names_and_v_indices = Tuple{String, Int}[] # To store (asset_name, v_column_index) pairs for assets
152  
153              local current_v_col_idx = 1 # Counter for the current column index in the v matrix
154  
155              for df_in_trimmed in trimmed_data[tf]
156                  if !isempty(df_in_trimmed)
157                      asset_meta_name = raw(metadata(df_in_trimmed, "asset_instance"))
158                      if asset_meta_name == benchmark_name
159                         benchmark_df_trimmed = df_in_trimmed
160                         benchmark_v_column_idx = current_v_col_idx # Record the column index in v
161                      else
162                         # This is an asset DataFrame, store its name and its column index in v
163                         push!(asset_names_and_v_indices, (asset_meta_name, current_v_col_idx))
164                      end
165                      current_v_col_idx += 1 # Increment for the next non-empty DataFrame
166                  end
167              end
168  
169              if isnothing(benchmark_df_trimmed) || isnothing(benchmark_v_column_idx)
170                   @warn "External benchmark DataFrame (with metadata key '$benchmark_name') not found in trimmed data after centering." asset_metadata=[(raw(metadata(d, "asset_instance", "<missing>")), names(d)) for d in trimmed_data[tf] if !isempty(d)]
171                   return DataFrame()
172              end
173  
174              # benchmark_v_column_idx now correctly points to the benchmark's column in v
175              benchmark_returns = v[:, benchmark_v_column_idx]
176              @debug "Using aligned external DataFrame (key: '$benchmark_name') as benchmark."
177  
178              # Separate the collected asset names and indices
179              asset_names = [pair[1] for pair in asset_names_and_v_indices]
180              cols_to_keep_v_indices = [pair[2] for pair in asset_names_and_v_indices]
181  
182              if isempty(asset_names) || size(v, 1) < 2 || isempty(cols_to_keep_v_indices)
183                   @warn "Not enough aligned asset data after centering and excluding external benchmark, or no assets left."
184                   return DataFrame()
185              end
186  
187              # Ensure cols_to_keep_v_indices are valid for v's dimensions
188              if any(idx -> idx < 1 || idx > size(v, 2), cols_to_keep_v_indices)
189                   @error "Calculated column indices for assets are out of bounds for the centered matrix." indices=cols_to_keep_v_indices matrix_size=size(v) asset_info=asset_names_and_v_indices benchmark_name=benchmark_name
190                   return DataFrame()
191              end
192  
193              centered_v_assets = v[:, cols_to_keep_v_indices]
194              centered_df = DataFrame(centered_v_assets, asset_names) # Create DataFrame from centered asset data
195  
196          catch e
197              @warn "Centering combined data with external benchmark failed." exception = e
198              return DataFrame() # Return empty DataFrame on centering failure
199          end
200  
201      else
202          # No external DataFrame, proceed with existing logic using flattened universe data
203          # Get flattened data
204          flattened_data = st.coll.flatten(universe_data; noempty=true)
205  
206          # Apply center_data to the flattened universe data
207          try
208              (trimmed_data, v) = center_data(flattened_data, tf; ratio_func=ratio!) # Using ratio! directly, assuming it's in scope
209              asset_names = [raw(metadata(df, "asset_instance")) for df in trimmed_data[tf] if !isempty(df)]
210              centered_df = DataFrame(v, asset_names)
211  
212              if isempty(asset_names) || size(centered_df, 1) < 2
213                   @warn "Not enough data or assets available to calculate beta after centering."
214                   return DataFrame()
215              end
216  
217          catch e
218              @warn "Centering universe data failed." exception = e
219              return DataFrame()
220          end
221  
222          # Determine benchmark returns based on symbol or string from the centered asset data
223  
224          # Get volume-sorted asset names that are also in the centered data
225          all_volume_sorted_assets = tickers(st.getexchange!(s.exchange), s.qc; min_vol=min_vol, as_vec=true)
226          volume_sorted_assets = [asset for asset in all_volume_sorted_assets if asset in asset_names]
227  
228          if typeof(benchmark) <: String
229              # Use specific asset as benchmark
230               if !(benchmark in asset_names)
231                    @warn "Specified benchmark asset \"$(benchmark)\" is not in the strategy's universe or centered data."
232                    return DataFrame()
233               end
234               benchmark_name = benchmark
235               benchmark_returns = centered_df[:, benchmark_name]
236               @debug "Using specific asset \"$(benchmark)\" as benchmark."
237  
238          elseif typeof(benchmark) <: Symbol
239              if benchmark == :top_asset || benchmark == :top_5_percent
240  
241                  if isempty(volume_sorted_assets)
242                      @warn "No assets meet the minimum volume requirement or are not in the data to determine $(benchmark) benchmark."
243                      return DataFrame()
244                  end
245  
246                  if benchmark == :top_asset
247                      benchmark_name = last(volume_sorted_assets)
248                      if !(benchmark_name in names(centered_df))
249                           @warn "Top asset \"$(benchmark_name)\" not in centered data columns."
250                           return DataFrame()
251                      end
252                      benchmark_returns = centered_df[:, benchmark_name]
253                      @debug "Using top asset \"$(benchmark_name)\" as benchmark."
254                  elseif benchmark == :top_5_percent
255                      # Select top 5% assets (at least one)
256                      num_top_5_percent = max(1, floor(Int, length(volume_sorted_assets) * 0.05))
257                      benchmark_assets = volume_sorted_assets[1:min(num_top_5_percent, end)]
258  
259                      if isempty(benchmark_assets)
260                          @warn "Could not select top 5% assets for benchmark."
261                          return DataFrame()
262                      end
263  
264                      # Calculate aggregate returns (mean of returns across selected assets)
265                      # Ensure all benchmark_assets are in centered_df columns
266                      valid_benchmark_assets = [asset for asset in benchmark_assets if asset in names(centered_df)]
267                      if isempty(valid_benchmark_assets)
268                           @warn "Selected top 5% assets not found in centered data columns."
269                           return DataFrame()
270                      end
271  
272                      benchmark_returns = mean(Matrix(@view centered_df[:, valid_benchmark_assets]); dims=2)[:, 1]
273                      benchmark_name = "Top $(length(valid_benchmark_assets)) Assets Aggregate"
274                      @debug "Using aggregate of top $(length(valid_benchmark_assets)) assets as benchmark: $(valid_benchmark_assets)"
275                  end
276              else
277                  error("Invalid benchmark symbol: $(benchmark). Must be :top_asset or :top_5_percent.")
278              end
279          else
280              # This case should not be reached due to the initial type check,
281              # but included for completeness.
282               error("Invalid benchmark type after initial check: $(typeof(benchmark)).")
283          end
284      end # end of if/else block handling benchmark types
285  
286      # Check if benchmark_returns were successfully determined and match the length of centered data
287      if !(@isdefined benchmark_returns) || isempty(benchmark_returns)
288           @warn "Benchmark returns could not be determined."
289           return DataFrame()
290      end
291  
292       if length(benchmark_returns) != size(centered_df, 1)
293            @warn "Aligned benchmark data length ($(length(benchmark_returns))) does not match aligned asset data length ($(size(centered_df, 1))). This indicates an issue with alignment or data processing."
294            return DataFrame()
295       end
296  
297      # --- New code for handling 'tail' argument ---
298      local data_length = size(centered_df, 1)
299      local returns_to_use_df::DataFrame
300      local benchmark_returns_to_use::Vector{DFT}
301  
302      if tail !== nothing
303          if tail <= 0
304              @warn "Invalid 'tail' value provided. Must be a positive integer." tail=tail
305              return DataFrame()
306          end
307          if tail > data_length
308              @warn "'tail' value ($(tail)) is greater than the available data length ($(data_length)). Using all available data." tail=tail data_length=data_length
309              returns_to_use_df = centered_df
310              benchmark_returns_to_use = benchmark_returns
311          else
312              @debug "Using last $(tail) data points for Beta calculation."
313              returns_to_use_df = centered_df[end-tail+1:end, :]
314              benchmark_returns_to_use = benchmark_returns[end-tail+1:end]
315          end
316      else
317          # If tail is not provided, use all available data
318          returns_to_use_df = centered_df
319          benchmark_returns_to_use = benchmark_returns
320      end
321  
322      # Ensure sufficient data points are available AFTER applying the tail (if any)
323      if size(returns_to_use_df, 1) < 2
324           @warn "Not enough data points to calculate beta after applying 'tail' or centering." data_points=size(returns_to_use_df, 1) minimum_required=2
325           return DataFrame()
326      end
327      # --- End of new code for handling 'tail' argument ---
328  
329      # Determine the columns for the results DataFrame
330      local result_cols::Vector{Symbol}
331      if method == :covariance
332          result_cols = [:Asset, :Beta_Covariance]
333      elseif method == :regression
334          result_cols = [:Asset, :Beta_Regression]
335      elseif method == :both
336          result_cols = [:Asset, :Beta_Covariance, :Beta_Regression]
337      else
338          error("Invalid method: $(method). Must be :covariance, :regression, or :both.")
339      end
340  
341      # Prepare to collect results
342      results_data = []
343  
344      # Convert centered_df to matrix for efficient calculations - NOW using returns_to_use_df
345      centered_matrix = Matrix(returns_to_use_df)
346  
347      # Initialize result variables - ensure type can handle missing
348      local beta_cov_results::Union{Vector{<:Union{DFT, Missing}}, Nothing} = nothing
349      local beta_reg_results::Union{Vector{<:Union{DFT, Missing}}, Nothing} = nothing
350  
351      # Calculate betas based on the specified method using vectorized operations
352      if method == :covariance || method == :both
353          # Calculate covariance vector: cov(asset_returns, benchmark_returns) for all assets
354          # NOW using benchmark_returns_to_use
355          cov_vector = cov(centered_matrix, benchmark_returns_to_use)
356  
357          # Calculate variance of benchmark returns - NOW using benchmark_returns_to_use
358          var_market = var(benchmark_returns_to_use)
359  
360          if abs(var_market) < eps(DFT)
361              @warn "Cannot calculate Beta by Covariance: Market variance is zero or near zero."
362              # Fill with missing, ensuring the type can handle it
363              beta_cov_results = Vector{Union{DFT, Missing}}(fill(missing, length(asset_names)))
364          else
365              # Beta_Covariance = Covariance(asset returns, benchmark returns) / Variance(market returns)
366              # Ensure the result vector can hold missing in case of Inf/NaN
367              cov_vector_result = vec(cov_vector ./ var_market)
368              beta_cov_results = Vector{Union{DFT, Missing}}(cov_vector_result)
369              replace!(beta_cov_results, NaN=>missing, Inf=>missing, -Inf=>missing)
370          end
371      end
372  
373      if method == :regression || method == :both
374          # Perform linear regression for all assets against the benchmark
375          # Model: centered_matrix (asset returns) = intercept + beta * benchmark_returns + error
376  
377          # Prepare data for regression: benchmark_returns as the predictor (X) and centered_matrix as the response (Y)
378          # NOW using benchmark_returns_to_use
379          # Add intercept column to benchmark_returns vector - NOW using benchmark_returns_to_use
380          X = hcat(fill(one(DFT), size(returns_to_use_df, 1)), benchmark_returns_to_use)
381          Y = centered_matrix
382  
383          # Perform linear regression using the backslash operator for least squares
384          try
385              beta_matrix = X \ Y
386              # Extract Beta coefficients (second row of the coefficients matrix)
387              # Ensure the result can hold missing if any calculation resulted in NaN/Inf
388              beta_reg_results_temp_vec = vec(beta_matrix[2, :])
389              # Convert to a vector that can hold missing and replace NaNs/Infs
390              beta_reg_results = Vector{Union{DFT, Missing}}(beta_reg_results_temp_vec)
391              replace!(beta_reg_results, NaN=>missing, Inf=>missing, -Inf=>missing)
392  
393          catch e
394              @warn "Could not calculate Beta by Regression" exception = e
395               # Fill with missing, ensuring the type can handle it
396              beta_reg_results = Vector{Union{DFT, Missing}}(fill(missing, length(asset_names)))
397          end
398      end
399  
400      # Collect results
401      for (i, asset_name) in enumerate(asset_names)
402          row_data = (Asset = asset_name,)
403          if method == :covariance || method == :both
404              # Check if beta_cov_results was calculated (i.e., method includes covariance)
405              if beta_cov_results !== nothing
406                  # Get the value - it can be Missing
407                  beta_cov_val = beta_cov_results[i]
408                  row_data = merge(row_data, (Beta_Covariance = beta_cov_val,))
409              else
410                   # This case should ideally not be reached if initialization and fill(missing) are correct,
411                   # but kept for safety.
412                   row_data = merge(row_data, (Beta_Covariance = missing,))
413              end
414          end
415          if method == :regression || method == :both
416              # Check if beta_reg_results was calculated (i.e., method includes regression)
417              if beta_reg_results !== nothing
418                  # Get the value - it can be Missing
419                  beta_reg_val = beta_reg_results[i]
420                  row_data = merge(row_data, (Beta_Regression = beta_reg_val,))
421              else
422                   # This case should ideally not be reached if initialization and fill(missing) are correct,
423                   # but kept for safety.
424                   row_data = merge(row_data, (Beta_Regression = missing,))
425              end
426          end
427          push!(results_data, row_data)
428      end
429  
430      # Construct the final DataFrame with only relevant columns
431      results_df = DataFrame(results_data, result_cols)
432  
433      return results_df
434  end
435  
436  export beta_indicator
437  
438  
439