/ portfolio_analyzer.py
portfolio_analyzer.py
  1  """
  2  Portfolio Analyzer Module
  3  
  4  AI-powered portfolio risk analysis using statistical and ML methods.
  5  """
  6  
  7  import numpy as np
  8  from typing import List, Dict, Any
  9  from scipy import stats
 10  from scipy.optimize import minimize
 11  
 12  
 13  class PortfolioAnalyzer:
 14      """Analyzes portfolio risk and provides optimization recommendations."""
 15  
 16      def __init__(self):
 17          """Initialize the portfolio analyzer."""
 18          self.risk_free_rate = 0.05  # Annual risk-free rate (5%)
 19          self.trading_days = 252
 20  
 21      def analyze(
 22          self,
 23          symbols: List[str],
 24          weights: np.ndarray,
 25          horizon: int = 252,
 26          confidence: float = 0.95
 27      ) -> Dict[str, Any]:
 28          """
 29          Analyze portfolio risk metrics.
 30  
 31          Args:
 32              symbols: List of stock symbols
 33              weights: Portfolio weights
 34              horizon: Investment horizon in trading days
 35              confidence: Confidence level for VaR calculation
 36  
 37          Returns:
 38              Dictionary of risk metrics
 39          """
 40          # Generate synthetic returns for demonstration
 41          # In production, this would use real market data
 42          returns = self._get_returns(symbols)
 43  
 44          # Calculate portfolio returns
 45          portfolio_returns = np.dot(returns, weights)
 46  
 47          # Basic statistics
 48          mean_return = np.mean(portfolio_returns)
 49          volatility = np.std(portfolio_returns)
 50          annualized_return = mean_return * self.trading_days
 51          annualized_vol = volatility * np.sqrt(self.trading_days)
 52  
 53          # Risk metrics
 54          var_daily = self._calculate_var(portfolio_returns, confidence)
 55          var_horizon = var_daily * np.sqrt(horizon)
 56          cvar = self._calculate_cvar(portfolio_returns, confidence)
 57  
 58          # Sharpe ratio
 59          sharpe = (annualized_return - self.risk_free_rate) / annualized_vol
 60  
 61          # Maximum drawdown
 62          max_drawdown = self._calculate_max_drawdown(portfolio_returns)
 63  
 64          # Beta (vs market)
 65          market_returns = self._get_market_returns()
 66          beta = self._calculate_beta(portfolio_returns, market_returns)
 67  
 68          # Sortino ratio (downside risk)
 69          sortino = self._calculate_sortino(portfolio_returns)
 70  
 71          # Correlation matrix
 72          correlation = self._calculate_correlation(returns, symbols)
 73  
 74          return {
 75              "return_metrics": {
 76                  "daily_mean_return": round(mean_return * 100, 4),
 77                  "annualized_return": round(annualized_return * 100, 2),
 78                  "cumulative_return_estimate": round(
 79                      (1 + mean_return) ** horizon - 1, 4
 80                  ) * 100
 81              },
 82              "risk_metrics": {
 83                  "daily_volatility": round(volatility * 100, 4),
 84                  "annualized_volatility": round(annualized_vol * 100, 2),
 85                  "var_daily_95": round(var_daily * 100, 4),
 86                  "var_horizon": round(var_horizon * 100, 2),
 87                  "cvar_95": round(cvar * 100, 4),
 88                  "max_drawdown": round(max_drawdown * 100, 2)
 89              },
 90              "performance_ratios": {
 91                  "sharpe_ratio": round(sharpe, 3),
 92                  "sortino_ratio": round(sortino, 3),
 93                  "beta": round(beta, 3),
 94                  "alpha": round(annualized_return - beta * 0.10, 4)  # Assuming 10% market return
 95              },
 96              "diversification": {
 97                  "effective_n": round(1 / np.sum(weights ** 2), 2),
 98                  "herfindahl_index": round(np.sum(weights ** 2), 4)
 99              },
100              "correlation_matrix": correlation,
101              "risk_contribution": self._calculate_risk_contribution(
102                  returns, weights, symbols
103              )
104          }
105  
106      def optimize(
107          self,
108          symbols: List[str],
109          current_weights: np.ndarray,
110          horizon: int = 252
111      ) -> Dict[str, Any]:
112          """
113          Generate portfolio optimization recommendations.
114  
115          Args:
116              symbols: List of stock symbols
117              current_weights: Current portfolio weights
118              horizon: Investment horizon
119  
120          Returns:
121              Optimization recommendations
122          """
123          returns = self._get_returns(symbols)
124  
125          # Calculate expected returns and covariance
126          expected_returns = np.mean(returns, axis=0) * self.trading_days
127          cov_matrix = np.cov(returns.T) * self.trading_days
128  
129          # Maximum Sharpe ratio portfolio
130          max_sharpe = self._optimize_sharpe(expected_returns, cov_matrix)
131  
132          # Minimum variance portfolio
133          min_var = self._optimize_min_variance(cov_matrix)
134  
135          # Risk parity portfolio
136          risk_parity = self._optimize_risk_parity(cov_matrix)
137  
138          return {
139              "current_portfolio": {
140                  "weights": {s: round(w, 4) for s, w in zip(symbols, current_weights)},
141                  "expected_return": round(
142                      np.dot(expected_returns, current_weights) * 100, 2
143                  ),
144                  "volatility": round(
145                      np.sqrt(np.dot(current_weights, np.dot(cov_matrix, current_weights))) * 100, 2
146                  )
147              },
148              "max_sharpe_portfolio": {
149                  "weights": {s: round(w, 4) for s, w in zip(symbols, max_sharpe)},
150                  "expected_return": round(
151                      np.dot(expected_returns, max_sharpe) * 100, 2
152                  ),
153                  "volatility": round(
154                      np.sqrt(np.dot(max_sharpe, np.dot(cov_matrix, max_sharpe))) * 100, 2
155                  ),
156                  "strategy": "Maximizes risk-adjusted returns (Sharpe ratio)"
157              },
158              "min_variance_portfolio": {
159                  "weights": {s: round(w, 4) for s, w in zip(symbols, min_var)},
160                  "expected_return": round(
161                      np.dot(expected_returns, min_var) * 100, 2
162                  ),
163                  "volatility": round(
164                      np.sqrt(np.dot(min_var, np.dot(cov_matrix, min_var))) * 100, 2
165                  ),
166                  "strategy": "Minimizes overall portfolio volatility"
167              },
168              "risk_parity_portfolio": {
169                  "weights": {s: round(w, 4) for s, w in zip(symbols, risk_parity)},
170                  "expected_return": round(
171                      np.dot(expected_returns, risk_parity) * 100, 2
172                  ),
173                  "volatility": round(
174                      np.sqrt(np.dot(risk_parity, np.dot(cov_matrix, risk_parity))) * 100, 2
175                  ),
176                  "strategy": "Equal risk contribution from each asset"
177              },
178              "recommendation": self._generate_recommendation(
179                  current_weights, max_sharpe, min_var, risk_parity, symbols
180              )
181          }
182  
183      def _get_returns(self, symbols: List[str]) -> np.ndarray:
184          """Generate synthetic returns for demonstration."""
185          np.random.seed(42)  # For reproducibility
186          n_days = 252
187          n_assets = len(symbols)
188  
189          # Base parameters per asset type
190          params = {
191              "tech": (0.0008, 0.02),  # Higher return, higher vol
192              "stable": (0.0003, 0.008),  # Lower return, lower vol
193              "default": (0.0005, 0.015)  # Moderate
194          }
195  
196          returns = np.zeros((n_days, n_assets))
197          for i, symbol in enumerate(symbols):
198              if symbol.upper() in ["AAPL", "GOOGL", "MSFT", "NVDA", "META"]:
199                  mu, sigma = params["tech"]
200              elif symbol.upper() in ["JNJ", "PG", "KO", "PEP", "WMT"]:
201                  mu, sigma = params["stable"]
202              else:
203                  mu, sigma = params["default"]
204  
205              returns[:, i] = np.random.normal(mu, sigma, n_days)
206  
207          # Add some correlation
208          correlation_factor = 0.3
209          market_factor = np.random.normal(0.0004, 0.012, n_days)
210          for i in range(n_assets):
211              returns[:, i] = (
212                  correlation_factor * market_factor +
213                  (1 - correlation_factor) * returns[:, i]
214              )
215  
216          return returns
217  
218      def _get_market_returns(self) -> np.ndarray:
219          """Get market returns (S&P 500 proxy)."""
220          np.random.seed(123)
221          return np.random.normal(0.0004, 0.012, 252)
222  
223      def _calculate_var(self, returns: np.ndarray, confidence: float) -> float:
224          """Calculate Value at Risk."""
225          return -np.percentile(returns, (1 - confidence) * 100)
226  
227      def _calculate_cvar(self, returns: np.ndarray, confidence: float) -> float:
228          """Calculate Conditional Value at Risk (Expected Shortfall)."""
229          var = self._calculate_var(returns, confidence)
230          return -np.mean(returns[returns <= -var])
231  
232      def _calculate_max_drawdown(self, returns: np.ndarray) -> float:
233          """Calculate maximum drawdown."""
234          cumulative = np.cumprod(1 + returns)
235          running_max = np.maximum.accumulate(cumulative)
236          drawdowns = (running_max - cumulative) / running_max
237          return np.max(drawdowns)
238  
239      def _calculate_beta(
240          self, portfolio_returns: np.ndarray, market_returns: np.ndarray
241      ) -> float:
242          """Calculate portfolio beta."""
243          covariance = np.cov(portfolio_returns, market_returns)[0, 1]
244          market_variance = np.var(market_returns)
245          return covariance / market_variance if market_variance > 0 else 1.0
246  
247      def _calculate_sortino(self, returns: np.ndarray) -> float:
248          """Calculate Sortino ratio."""
249          mean_return = np.mean(returns) * self.trading_days
250          negative_returns = returns[returns < 0]
251          downside_std = np.std(negative_returns) * np.sqrt(self.trading_days)
252          if downside_std == 0:
253              return 0.0
254          return (mean_return - self.risk_free_rate) / downside_std
255  
256      def _calculate_correlation(
257          self, returns: np.ndarray, symbols: List[str]
258      ) -> Dict[str, Dict[str, float]]:
259          """Calculate correlation matrix."""
260          corr_matrix = np.corrcoef(returns.T)
261          result = {}
262          for i, s1 in enumerate(symbols):
263              result[s1] = {}
264              for j, s2 in enumerate(symbols):
265                  result[s1][s2] = round(corr_matrix[i, j], 3)
266          return result
267  
268      def _calculate_risk_contribution(
269          self, returns: np.ndarray, weights: np.ndarray, symbols: List[str]
270      ) -> Dict[str, float]:
271          """Calculate risk contribution of each asset."""
272          cov_matrix = np.cov(returns.T)
273          portfolio_vol = np.sqrt(np.dot(weights, np.dot(cov_matrix, weights)))
274          marginal_contrib = np.dot(cov_matrix, weights) / portfolio_vol
275          risk_contrib = weights * marginal_contrib
276          risk_contrib_pct = risk_contrib / np.sum(risk_contrib)
277          return {s: round(rc * 100, 2) for s, rc in zip(symbols, risk_contrib_pct)}
278  
279      def _optimize_sharpe(
280          self, expected_returns: np.ndarray, cov_matrix: np.ndarray
281      ) -> np.ndarray:
282          """Find maximum Sharpe ratio portfolio."""
283          n = len(expected_returns)
284  
285          def neg_sharpe(weights):
286              ret = np.dot(expected_returns, weights)
287              vol = np.sqrt(np.dot(weights, np.dot(cov_matrix, weights)))
288              return -(ret - self.risk_free_rate) / vol
289  
290          constraints = [{"type": "eq", "fun": lambda x: np.sum(x) - 1}]
291          bounds = [(0, 1) for _ in range(n)]
292          init_weights = np.ones(n) / n
293  
294          result = minimize(
295              neg_sharpe, init_weights,
296              method="SLSQP", bounds=bounds, constraints=constraints
297          )
298          return result.x
299  
300      def _optimize_min_variance(self, cov_matrix: np.ndarray) -> np.ndarray:
301          """Find minimum variance portfolio."""
302          n = cov_matrix.shape[0]
303  
304          def portfolio_variance(weights):
305              return np.dot(weights, np.dot(cov_matrix, weights))
306  
307          constraints = [{"type": "eq", "fun": lambda x: np.sum(x) - 1}]
308          bounds = [(0, 1) for _ in range(n)]
309          init_weights = np.ones(n) / n
310  
311          result = minimize(
312              portfolio_variance, init_weights,
313              method="SLSQP", bounds=bounds, constraints=constraints
314          )
315          return result.x
316  
317      def _optimize_risk_parity(self, cov_matrix: np.ndarray) -> np.ndarray:
318          """Find risk parity portfolio."""
319          n = cov_matrix.shape[0]
320          target_risk = 1 / n
321  
322          def risk_parity_obj(weights):
323              portfolio_vol = np.sqrt(np.dot(weights, np.dot(cov_matrix, weights)))
324              marginal_contrib = np.dot(cov_matrix, weights) / portfolio_vol
325              risk_contrib = weights * marginal_contrib
326              risk_contrib_pct = risk_contrib / np.sum(risk_contrib)
327              return np.sum((risk_contrib_pct - target_risk) ** 2)
328  
329          constraints = [{"type": "eq", "fun": lambda x: np.sum(x) - 1}]
330          bounds = [(0.01, 1) for _ in range(n)]
331          init_weights = np.ones(n) / n
332  
333          result = minimize(
334              risk_parity_obj, init_weights,
335              method="SLSQP", bounds=bounds, constraints=constraints
336          )
337          return result.x
338  
339      def _generate_recommendation(
340          self,
341          current: np.ndarray,
342          max_sharpe: np.ndarray,
343          min_var: np.ndarray,
344          risk_parity: np.ndarray,
345          symbols: List[str]
346      ) -> Dict[str, Any]:
347          """Generate portfolio rebalancing recommendation."""
348          # Find biggest changes needed
349          changes_sharpe = max_sharpe - current
350          changes_min_var = min_var - current
351  
352          largest_increase = symbols[np.argmax(changes_sharpe)]
353          largest_decrease = symbols[np.argmin(changes_sharpe)]
354  
355          return {
356              "summary": (
357                  f"Consider increasing allocation to {largest_increase} "
358                  f"and reducing {largest_decrease} for better risk-adjusted returns."
359              ),
360              "confidence": "medium",
361              "suggested_rebalancing": {
362                  "increase": {
363                      largest_increase: round(changes_sharpe[np.argmax(changes_sharpe)] * 100, 1)
364                  },
365                  "decrease": {
366                      largest_decrease: round(changes_sharpe[np.argmin(changes_sharpe)] * 100, 1)
367                  }
368              },
369              "notes": [
370                  "Recommendations based on historical data patterns",
371                  "Past performance does not guarantee future results",
372                  "Consider tax implications before rebalancing"
373              ]
374          }