# portfolio_analyzer.py
1 """ 2 Portfolio Analyzer Module 3 4 AI-powered portfolio risk analysis using statistical and ML methods. 5 """ 6 7 import numpy as np 8 from typing import List, Dict, Any 9 from scipy import stats 10 from scipy.optimize import minimize 11 12 13 class PortfolioAnalyzer: 14 """Analyzes portfolio risk and provides optimization recommendations.""" 15 16 def __init__(self): 17 """Initialize the portfolio analyzer.""" 18 self.risk_free_rate = 0.05 # Annual risk-free rate (5%) 19 self.trading_days = 252 20 21 def analyze( 22 self, 23 symbols: List[str], 24 weights: np.ndarray, 25 horizon: int = 252, 26 confidence: float = 0.95 27 ) -> Dict[str, Any]: 28 """ 29 Analyze portfolio risk metrics. 30 31 Args: 32 symbols: List of stock symbols 33 weights: Portfolio weights 34 horizon: Investment horizon in trading days 35 confidence: Confidence level for VaR calculation 36 37 Returns: 38 Dictionary of risk metrics 39 """ 40 # Generate synthetic returns for demonstration 41 # In production, this would use real market data 42 returns = self._get_returns(symbols) 43 44 # Calculate portfolio returns 45 portfolio_returns = np.dot(returns, weights) 46 47 # Basic statistics 48 mean_return = np.mean(portfolio_returns) 49 volatility = np.std(portfolio_returns) 50 annualized_return = mean_return * self.trading_days 51 annualized_vol = volatility * np.sqrt(self.trading_days) 52 53 # Risk metrics 54 var_daily = self._calculate_var(portfolio_returns, confidence) 55 var_horizon = var_daily * np.sqrt(horizon) 56 cvar = self._calculate_cvar(portfolio_returns, confidence) 57 58 # Sharpe ratio 59 sharpe = (annualized_return - self.risk_free_rate) / annualized_vol 60 61 # Maximum drawdown 62 max_drawdown = self._calculate_max_drawdown(portfolio_returns) 63 64 # Beta (vs market) 65 market_returns = self._get_market_returns() 66 beta = self._calculate_beta(portfolio_returns, market_returns) 67 68 # Sortino ratio (downside risk) 69 sortino = self._calculate_sortino(portfolio_returns) 70 71 # 
Correlation matrix 72 correlation = self._calculate_correlation(returns, symbols) 73 74 return { 75 "return_metrics": { 76 "daily_mean_return": round(mean_return * 100, 4), 77 "annualized_return": round(annualized_return * 100, 2), 78 "cumulative_return_estimate": round( 79 (1 + mean_return) ** horizon - 1, 4 80 ) * 100 81 }, 82 "risk_metrics": { 83 "daily_volatility": round(volatility * 100, 4), 84 "annualized_volatility": round(annualized_vol * 100, 2), 85 "var_daily_95": round(var_daily * 100, 4), 86 "var_horizon": round(var_horizon * 100, 2), 87 "cvar_95": round(cvar * 100, 4), 88 "max_drawdown": round(max_drawdown * 100, 2) 89 }, 90 "performance_ratios": { 91 "sharpe_ratio": round(sharpe, 3), 92 "sortino_ratio": round(sortino, 3), 93 "beta": round(beta, 3), 94 "alpha": round(annualized_return - beta * 0.10, 4) # Assuming 10% market return 95 }, 96 "diversification": { 97 "effective_n": round(1 / np.sum(weights ** 2), 2), 98 "herfindahl_index": round(np.sum(weights ** 2), 4) 99 }, 100 "correlation_matrix": correlation, 101 "risk_contribution": self._calculate_risk_contribution( 102 returns, weights, symbols 103 ) 104 } 105 106 def optimize( 107 self, 108 symbols: List[str], 109 current_weights: np.ndarray, 110 horizon: int = 252 111 ) -> Dict[str, Any]: 112 """ 113 Generate portfolio optimization recommendations. 
114 115 Args: 116 symbols: List of stock symbols 117 current_weights: Current portfolio weights 118 horizon: Investment horizon 119 120 Returns: 121 Optimization recommendations 122 """ 123 returns = self._get_returns(symbols) 124 125 # Calculate expected returns and covariance 126 expected_returns = np.mean(returns, axis=0) * self.trading_days 127 cov_matrix = np.cov(returns.T) * self.trading_days 128 129 # Maximum Sharpe ratio portfolio 130 max_sharpe = self._optimize_sharpe(expected_returns, cov_matrix) 131 132 # Minimum variance portfolio 133 min_var = self._optimize_min_variance(cov_matrix) 134 135 # Risk parity portfolio 136 risk_parity = self._optimize_risk_parity(cov_matrix) 137 138 return { 139 "current_portfolio": { 140 "weights": {s: round(w, 4) for s, w in zip(symbols, current_weights)}, 141 "expected_return": round( 142 np.dot(expected_returns, current_weights) * 100, 2 143 ), 144 "volatility": round( 145 np.sqrt(np.dot(current_weights, np.dot(cov_matrix, current_weights))) * 100, 2 146 ) 147 }, 148 "max_sharpe_portfolio": { 149 "weights": {s: round(w, 4) for s, w in zip(symbols, max_sharpe)}, 150 "expected_return": round( 151 np.dot(expected_returns, max_sharpe) * 100, 2 152 ), 153 "volatility": round( 154 np.sqrt(np.dot(max_sharpe, np.dot(cov_matrix, max_sharpe))) * 100, 2 155 ), 156 "strategy": "Maximizes risk-adjusted returns (Sharpe ratio)" 157 }, 158 "min_variance_portfolio": { 159 "weights": {s: round(w, 4) for s, w in zip(symbols, min_var)}, 160 "expected_return": round( 161 np.dot(expected_returns, min_var) * 100, 2 162 ), 163 "volatility": round( 164 np.sqrt(np.dot(min_var, np.dot(cov_matrix, min_var))) * 100, 2 165 ), 166 "strategy": "Minimizes overall portfolio volatility" 167 }, 168 "risk_parity_portfolio": { 169 "weights": {s: round(w, 4) for s, w in zip(symbols, risk_parity)}, 170 "expected_return": round( 171 np.dot(expected_returns, risk_parity) * 100, 2 172 ), 173 "volatility": round( 174 np.sqrt(np.dot(risk_parity, np.dot(cov_matrix, 
risk_parity))) * 100, 2 175 ), 176 "strategy": "Equal risk contribution from each asset" 177 }, 178 "recommendation": self._generate_recommendation( 179 current_weights, max_sharpe, min_var, risk_parity, symbols 180 ) 181 } 182 183 def _get_returns(self, symbols: List[str]) -> np.ndarray: 184 """Generate synthetic returns for demonstration.""" 185 np.random.seed(42) # For reproducibility 186 n_days = 252 187 n_assets = len(symbols) 188 189 # Base parameters per asset type 190 params = { 191 "tech": (0.0008, 0.02), # Higher return, higher vol 192 "stable": (0.0003, 0.008), # Lower return, lower vol 193 "default": (0.0005, 0.015) # Moderate 194 } 195 196 returns = np.zeros((n_days, n_assets)) 197 for i, symbol in enumerate(symbols): 198 if symbol.upper() in ["AAPL", "GOOGL", "MSFT", "NVDA", "META"]: 199 mu, sigma = params["tech"] 200 elif symbol.upper() in ["JNJ", "PG", "KO", "PEP", "WMT"]: 201 mu, sigma = params["stable"] 202 else: 203 mu, sigma = params["default"] 204 205 returns[:, i] = np.random.normal(mu, sigma, n_days) 206 207 # Add some correlation 208 correlation_factor = 0.3 209 market_factor = np.random.normal(0.0004, 0.012, n_days) 210 for i in range(n_assets): 211 returns[:, i] = ( 212 correlation_factor * market_factor + 213 (1 - correlation_factor) * returns[:, i] 214 ) 215 216 return returns 217 218 def _get_market_returns(self) -> np.ndarray: 219 """Get market returns (S&P 500 proxy).""" 220 np.random.seed(123) 221 return np.random.normal(0.0004, 0.012, 252) 222 223 def _calculate_var(self, returns: np.ndarray, confidence: float) -> float: 224 """Calculate Value at Risk.""" 225 return -np.percentile(returns, (1 - confidence) * 100) 226 227 def _calculate_cvar(self, returns: np.ndarray, confidence: float) -> float: 228 """Calculate Conditional Value at Risk (Expected Shortfall).""" 229 var = self._calculate_var(returns, confidence) 230 return -np.mean(returns[returns <= -var]) 231 232 def _calculate_max_drawdown(self, returns: np.ndarray) -> float: 233 
"""Calculate maximum drawdown.""" 234 cumulative = np.cumprod(1 + returns) 235 running_max = np.maximum.accumulate(cumulative) 236 drawdowns = (running_max - cumulative) / running_max 237 return np.max(drawdowns) 238 239 def _calculate_beta( 240 self, portfolio_returns: np.ndarray, market_returns: np.ndarray 241 ) -> float: 242 """Calculate portfolio beta.""" 243 covariance = np.cov(portfolio_returns, market_returns)[0, 1] 244 market_variance = np.var(market_returns) 245 return covariance / market_variance if market_variance > 0 else 1.0 246 247 def _calculate_sortino(self, returns: np.ndarray) -> float: 248 """Calculate Sortino ratio.""" 249 mean_return = np.mean(returns) * self.trading_days 250 negative_returns = returns[returns < 0] 251 downside_std = np.std(negative_returns) * np.sqrt(self.trading_days) 252 if downside_std == 0: 253 return 0.0 254 return (mean_return - self.risk_free_rate) / downside_std 255 256 def _calculate_correlation( 257 self, returns: np.ndarray, symbols: List[str] 258 ) -> Dict[str, Dict[str, float]]: 259 """Calculate correlation matrix.""" 260 corr_matrix = np.corrcoef(returns.T) 261 result = {} 262 for i, s1 in enumerate(symbols): 263 result[s1] = {} 264 for j, s2 in enumerate(symbols): 265 result[s1][s2] = round(corr_matrix[i, j], 3) 266 return result 267 268 def _calculate_risk_contribution( 269 self, returns: np.ndarray, weights: np.ndarray, symbols: List[str] 270 ) -> Dict[str, float]: 271 """Calculate risk contribution of each asset.""" 272 cov_matrix = np.cov(returns.T) 273 portfolio_vol = np.sqrt(np.dot(weights, np.dot(cov_matrix, weights))) 274 marginal_contrib = np.dot(cov_matrix, weights) / portfolio_vol 275 risk_contrib = weights * marginal_contrib 276 risk_contrib_pct = risk_contrib / np.sum(risk_contrib) 277 return {s: round(rc * 100, 2) for s, rc in zip(symbols, risk_contrib_pct)} 278 279 def _optimize_sharpe( 280 self, expected_returns: np.ndarray, cov_matrix: np.ndarray 281 ) -> np.ndarray: 282 """Find maximum Sharpe 
ratio portfolio.""" 283 n = len(expected_returns) 284 285 def neg_sharpe(weights): 286 ret = np.dot(expected_returns, weights) 287 vol = np.sqrt(np.dot(weights, np.dot(cov_matrix, weights))) 288 return -(ret - self.risk_free_rate) / vol 289 290 constraints = [{"type": "eq", "fun": lambda x: np.sum(x) - 1}] 291 bounds = [(0, 1) for _ in range(n)] 292 init_weights = np.ones(n) / n 293 294 result = minimize( 295 neg_sharpe, init_weights, 296 method="SLSQP", bounds=bounds, constraints=constraints 297 ) 298 return result.x 299 300 def _optimize_min_variance(self, cov_matrix: np.ndarray) -> np.ndarray: 301 """Find minimum variance portfolio.""" 302 n = cov_matrix.shape[0] 303 304 def portfolio_variance(weights): 305 return np.dot(weights, np.dot(cov_matrix, weights)) 306 307 constraints = [{"type": "eq", "fun": lambda x: np.sum(x) - 1}] 308 bounds = [(0, 1) for _ in range(n)] 309 init_weights = np.ones(n) / n 310 311 result = minimize( 312 portfolio_variance, init_weights, 313 method="SLSQP", bounds=bounds, constraints=constraints 314 ) 315 return result.x 316 317 def _optimize_risk_parity(self, cov_matrix: np.ndarray) -> np.ndarray: 318 """Find risk parity portfolio.""" 319 n = cov_matrix.shape[0] 320 target_risk = 1 / n 321 322 def risk_parity_obj(weights): 323 portfolio_vol = np.sqrt(np.dot(weights, np.dot(cov_matrix, weights))) 324 marginal_contrib = np.dot(cov_matrix, weights) / portfolio_vol 325 risk_contrib = weights * marginal_contrib 326 risk_contrib_pct = risk_contrib / np.sum(risk_contrib) 327 return np.sum((risk_contrib_pct - target_risk) ** 2) 328 329 constraints = [{"type": "eq", "fun": lambda x: np.sum(x) - 1}] 330 bounds = [(0.01, 1) for _ in range(n)] 331 init_weights = np.ones(n) / n 332 333 result = minimize( 334 risk_parity_obj, init_weights, 335 method="SLSQP", bounds=bounds, constraints=constraints 336 ) 337 return result.x 338 339 def _generate_recommendation( 340 self, 341 current: np.ndarray, 342 max_sharpe: np.ndarray, 343 min_var: np.ndarray, 
344 risk_parity: np.ndarray, 345 symbols: List[str] 346 ) -> Dict[str, Any]: 347 """Generate portfolio rebalancing recommendation.""" 348 # Find biggest changes needed 349 changes_sharpe = max_sharpe - current 350 changes_min_var = min_var - current 351 352 largest_increase = symbols[np.argmax(changes_sharpe)] 353 largest_decrease = symbols[np.argmin(changes_sharpe)] 354 355 return { 356 "summary": ( 357 f"Consider increasing allocation to {largest_increase} " 358 f"and reducing {largest_decrease} for better risk-adjusted returns." 359 ), 360 "confidence": "medium", 361 "suggested_rebalancing": { 362 "increase": { 363 largest_increase: round(changes_sharpe[np.argmax(changes_sharpe)] * 100, 1) 364 }, 365 "decrease": { 366 largest_decrease: round(changes_sharpe[np.argmin(changes_sharpe)] * 100, 1) 367 } 368 }, 369 "notes": [ 370 "Recommendations based on historical data patterns", 371 "Past performance does not guarantee future results", 372 "Consider tax implications before rebalancing" 373 ] 374 }