bimodality_tests.py
1 # EXTENDS: DES-c386de6567f0 2 #!/usr/bin/env python3 3 """ 4 Formal Bimodality Testing for D1 Distribution (RDM-S4) 5 ====================================================== 6 7 Tests whether the D1 (state externalization) distribution across 44 models 8 is formally bimodal rather than merely visually bimodal. Uses: 9 1. Hartigan's dip test (frequentist) 10 2. Beta mixture model comparison: 1-component vs 2-component (BIC + Bayes factor) 11 3. Cluster membership probabilities per model 12 4. Figure: D1 histogram with mixture components overlaid 13 14 Usage: 15 .venv/bin/python3 research/scripts/bimodality_tests.py 16 """ 17 18 from __future__ import annotations 19 20 import json 21 import math 22 import sys 23 from pathlib import Path 24 25 import diptest 26 import numpy as np 27 from scipy import stats 28 from scipy.optimize import minimize 29 from scipy.special import betaln, gammaln 30 31 # ---- Paths ---------------------------------------------------------------- 32 33 REPO_ROOT = Path(__file__).resolve().parent.parent.parent 34 N30_SUMMARY = REPO_ROOT / "research" / "results" / "n30_analysis" / "n30_summary.json" 35 OUTPUT_DIR = REPO_ROOT / "research" / "results" / "bimodality" 36 OUTPUT_JSON = OUTPUT_DIR / "bimodality_results.json" 37 OUTPUT_MD = OUTPUT_DIR / "bimodality_report.md" 38 OUTPUT_FIG = OUTPUT_DIR / "d1_bimodality_mixture.pdf" 39 40 41 def load_d1_values() -> tuple[list[float], list[dict]]: 42 """Load per-model D1 scores from N=30 analysis. 43 44 Uses native_api D1 as primary, falls back to text_xml. 45 Returns (d1_values, model_records). 46 """ 47 data = json.loads(N30_SUMMARY.read_text()) 48 models = data["models"] 49 50 d1_values = [] 51 records = [] 52 for m in models: 53 # Use native_api D1 first, then text_xml 54 d1 = m.get("d1_native_api") 55 fmt_used = "native_api" 56 n = m.get("n_native_api", 0) 57 if d1 is None or n < 3: 58 d1 = m.get("d1_text_xml") 59 fmt_used = "text_xml" 60 n = m.get("n_text_xml", 0) 61 if d1 is None or n < 3: 62 continue 63 64 d1_values.append(d1) 65 records.append({ 66 "model_key": m["model_key"], 67 "model_display": m["model_display"], 68 "lab": m["lab"], 69 "d1": d1, 70 "format_used": fmt_used, 71 "n": n, 72 "cluster": m["cluster"], 73 }) 74 75 return d1_values, records 76 77 78 # ---- Hartigan's Dip Test -------------------------------------------------- 79 80 def run_dip_test(values: list[float]) -> dict: 81 """Run Hartigan's dip test for unimodality.""" 82 x = np.array(values) 83 dip_stat, p_value = diptest.diptest(x) 84 return { 85 "dip_statistic": float(dip_stat), 86 "p_value": float(p_value), 87 "n": len(values), 88 "reject_unimodality": p_value < 0.05, 89 "interpretation": ( 90 f"Dip={dip_stat:.4f}, p={p_value:.4f}. " 91 + ("Reject unimodality at alpha=0.05." if p_value < 0.05 92 else "Cannot reject unimodality at alpha=0.05.") 93 ), 94 } 95 96 97 # ---- Beta Mixture Model -------------------------------------------------- 98 99 def beta_log_likelihood(params: np.ndarray, x: np.ndarray, k: int) -> float: 100 """Negative log-likelihood for k-component beta mixture. 101 102 params layout for k components: 103 [weight_1, ..., weight_{k-1}, alpha_1, beta_1, ..., alpha_k, beta_k] 104 """ 105 n = len(x) 106 # Clamp x to avoid log(0) at boundaries 107 x_safe = np.clip(x, 1e-6, 1 - 1e-6) 108 109 # Extract weights 110 if k == 1: 111 weights = np.array([1.0]) 112 ab_start = 0 113 else: 114 raw_weights = params[:k - 1] 115 # Softmax for weights 116 exp_w = np.exp(raw_weights - np.max(raw_weights)) 117 last = 1.0 118 w_list = [] 119 for rw in exp_w: 120 w_list.append(rw) 121 total = sum(w_list) + last 122 weights = np.array(w_list + [last]) / total 123 ab_start = k - 1 124 125 # Extract alpha, beta for each component (use exp to keep positive) 126 total_ll = 0.0 127 for i in range(k): 128 a = np.exp(params[ab_start + 2 * i]) 129 b = np.exp(params[ab_start + 2 * i + 1]) 130 # Clamp to avoid numerical issues 131 a = np.clip(a, 0.01, 100) 132 b = np.clip(b, 0.01, 100) 133 log_pdf = stats.beta.logpdf(x_safe, a, b) 134 if i == 0: 135 ll_matrix = np.log(weights[i] + 1e-300) + log_pdf 136 else: 137 ll_matrix = np.logaddexp(ll_matrix, np.log(weights[i] + 1e-300) + log_pdf) 138 139 return -np.sum(ll_matrix) 140 141 142 def fit_beta_mixture(x: np.ndarray, k: int, n_restarts: int = 10) -> dict: 143 """Fit k-component beta mixture model via MLE.""" 144 best_nll = float("inf") 145 best_params = None 146 rng = np.random.RandomState(42) 147 148 for _ in range(n_restarts): 149 if k == 1: 150 # Single beta: just alpha, beta 151 init = rng.randn(2) * 0.5 152 else: 153 # k-1 weight params + 2*k shape params 154 init = rng.randn(k - 1 + 2 * k) * 0.5 155 156 try: 157 res = minimize( 158 beta_log_likelihood, init, args=(x, k), 159 method="Nelder-Mead", 160 options={"maxiter": 5000, "xatol": 1e-8, "fatol": 1e-8}, 161 ) 162 if res.fun < best_nll: 163 best_nll = res.fun 164 best_params = res.x 165 except Exception: 166 continue 167 168 if best_params is None: 169 return {"nll": float("inf"), "bic": float("inf"), "aic": float("inf")} 170 171 n = len(x) 172 n_params = len(best_params) 173 bic = 2 * best_nll + n_params * np.log(n) 174 aic = 2 * best_nll + 2 * n_params 175 176 # Extract readable parameters 177 if k == 1: 178 weights = [1.0] 179 alphas = [float(np.exp(np.clip(best_params[0], -10, 10)))] 180 betas = [float(np.exp(np.clip(best_params[1], -10, 10)))] 181 else: 182 raw_w = best_params[:k - 1] 183 exp_w = np.exp(raw_w - np.max(raw_w)) 184 all_w = list(exp_w) + [1.0] 185 total = sum(all_w) 186 weights = [w / total for w in all_w] 187 alphas = [] 188 betas_list = [] 189 ab_start = k - 1 190 for i in range(k): 191 alphas.append(float(np.exp(np.clip(best_params[ab_start + 2 * i], -10, 10)))) 192 betas_list.append(float(np.exp(np.clip(best_params[ab_start + 2 * i + 1], -10, 10)))) 193 betas = betas_list 194 195 return { 196 "nll": float(best_nll), 197 "bic": float(bic), 198 "aic": float(aic), 199 "n_params": n_params, 200 "weights": weights, 201 "alphas": alphas, 202 "betas": betas, 203 "k": k, 204 } 205 206 207 def compute_membership_probabilities( 208 x: np.ndarray, mixture: dict 209 ) -> list[list[float]]: 210 """Compute posterior membership probabilities for each data point.""" 211 k = mixture["k"] 212 weights = mixture["weights"] 213 alphas = mixture["alphas"] 214 betas = mixture["betas"] 215 216 x_safe = np.clip(x, 1e-6, 1 - 1e-6) 217 n = len(x_safe) 218 probs = np.zeros((n, k)) 219 220 for j in range(k): 221 probs[:, j] = weights[j] * stats.beta.pdf(x_safe, alphas[j], betas[j]) 222 223 # Normalize 224 row_sums = probs.sum(axis=1, keepdims=True) 225 row_sums = np.maximum(row_sums, 1e-300) 226 probs = probs / row_sums 227 228 return probs.tolist() 229 230 231 # ---- Figure generation ---------------------------------------------------- 232 233 def generate_figure( 234 d1_values: list[float], 235 records: list[dict], 236 mixture_1: dict, 237 mixture_2: dict, 238 output_path: Path, 239 ) -> None: 240 """Generate D1 histogram with mixture model overlay.""" 241 try: 242 import matplotlib 243 matplotlib.use("Agg") 244 import matplotlib.pyplot as plt 245 except ImportError: 246 print(" matplotlib not available, skipping figure") 247 return 248 249 fig, axes = plt.subplots(1, 2, figsize=(12, 5)) 250 x = np.array(d1_values) 251 x_grid = np.linspace(0.001, 0.999, 500) 252 253 # Left panel: histogram with 2-component mixture 254 ax = axes[0] 255 ax.hist(x, bins=20, density=True, alpha=0.5, color="#4C72B0", edgecolor="white", 256 label="Observed D1") 257 258 # Plot individual components 259 colors = ["#DD8452", "#55A868"] 260 for j in range(mixture_2["k"]): 261 w = mixture_2["weights"][j] 262 a = mixture_2["alphas"][j] 263 b = mixture_2["betas"][j] 264 component_pdf = w * stats.beta.pdf(x_grid, a, b) 265 mean_val = a / (a + b) 266 ax.plot(x_grid, component_pdf, "--", color=colors[j], linewidth=1.5, 267 label=f"Component {j+1} (w={w:.2f}, mean={mean_val:.2f})") 268 269 # Plot total mixture 270 total_pdf = np.zeros_like(x_grid) 271 for j in range(mixture_2["k"]): 272 total_pdf += mixture_2["weights"][j] * stats.beta.pdf( 273 x_grid, mixture_2["alphas"][j], mixture_2["betas"][j] 274 ) 275 ax.plot(x_grid, total_pdf, "-", color="black", linewidth=2, label="2-component mixture") 276 277 ax.set_xlabel("D1 Score (State Externalization)", fontsize=11) 278 ax.set_ylabel("Density", fontsize=11) 279 ax.set_title("D1 Distribution with Beta Mixture Model", fontsize=12) 280 ax.legend(fontsize=8, loc="upper center") 281 ax.set_xlim(-0.05, 1.05) 282 283 # Right panel: 1-component vs 2-component comparison 284 ax2 = axes[1] 285 ax2.hist(x, bins=20, density=True, alpha=0.3, color="#4C72B0", edgecolor="white") 286 287 # 1-component 288 pdf_1 = stats.beta.pdf(x_grid, mixture_1["alphas"][0], mixture_1["betas"][0]) 289 ax2.plot(x_grid, pdf_1, "-", color="#C44E52", linewidth=2, 290 label=f"1-component (BIC={mixture_1['bic']:.1f})") 291 292 # 2-component total 293 ax2.plot(x_grid, total_pdf, "-", color="#4C72B0", linewidth=2, 294 label=f"2-component (BIC={mixture_2['bic']:.1f})") 295 296 delta_bic = mixture_1["bic"] - mixture_2["bic"] 297 ax2.set_title(f"Model Comparison (ΔBIC={delta_bic:.1f})", fontsize=12) 298 ax2.set_xlabel("D1 Score", fontsize=11) 299 ax2.set_ylabel("Density", fontsize=11) 300 ax2.legend(fontsize=9) 301 ax2.set_xlim(-0.05, 1.05) 302 303 plt.tight_layout() 304 plt.savefig(output_path, dpi=150, bbox_inches="tight") 305 plt.close() 306 print(f" Figure saved: {output_path}") 307 308 309 # ---- Report generation --------------------------------------------------- 310 311 def generate_report( 312 d1_values: list[float], 313 records: list[dict], 314 dip_result: dict, 315 mixture_1: dict, 316 mixture_2: dict, 317 membership: list[list[float]], 318 ) -> str: 319 """Generate markdown report.""" 320 delta_bic = mixture_1["bic"] - mixture_2["bic"] 321 delta_aic = mixture_1["aic"] - mixture_2["aic"] 322 323 # Approximate Bayes factor from BIC difference 324 log_bf = delta_bic / 2 325 bf = np.exp(min(log_bf, 500)) # clamp to avoid overflow 326 327 lines = [ 328 "# Formal Bimodality Testing for D1 Distribution (RDM-S4)", 329 "", 330 f"*N={len(d1_values)} models, generated from N=30 campaign data*", 331 "", 332 "## 1. Hartigan's Dip Test", 333 "", 334 f"- **Dip statistic:** {dip_result['dip_statistic']:.4f}", 335 f"- **p-value:** {dip_result['p_value']:.4f}", 336 f"- **Conclusion:** {'Reject unimodality' if dip_result['reject_unimodality'] else 'Cannot reject unimodality'} at alpha=0.05", 337 "", 338 "The dip test measures the maximum difference between the empirical CDF", 339 "and the closest unimodal CDF. Small p-values indicate multimodality.", 340 "", 341 "## 2. Beta Mixture Model Comparison", 342 "", 343 "### 1-Component Beta", 344 "", 345 f"- alpha={mixture_1['alphas'][0]:.3f}, beta={mixture_1['betas'][0]:.3f}", 346 f"- Mean={mixture_1['alphas'][0]/(mixture_1['alphas'][0]+mixture_1['betas'][0]):.3f}", 347 f"- NLL={mixture_1['nll']:.2f}, BIC={mixture_1['bic']:.2f}, AIC={mixture_1['aic']:.2f}", 348 "", 349 "### 2-Component Beta Mixture", 350 "", 351 ] 352 353 for j in range(mixture_2["k"]): 354 w = mixture_2["weights"][j] 355 a = mixture_2["alphas"][j] 356 b = mixture_2["betas"][j] 357 mu = a / (a + b) 358 lines.append(f"- **Component {j+1}:** weight={w:.3f}, alpha={a:.3f}, beta={b:.3f}, mean={mu:.3f}") 359 360 lines.extend([ 361 f"- NLL={mixture_2['nll']:.2f}, BIC={mixture_2['bic']:.2f}, AIC={mixture_2['aic']:.2f}", 362 "", 363 "### Model Selection", 364 "", 365 f"- **ΔBIC** (1-component minus 2-component): {delta_bic:.2f}", 366 f"- **ΔAIC**: {delta_aic:.2f}", 367 f"- **Approximate Bayes Factor** (2-component over 1-component): {bf:.1f}", 368 "", 369 ]) 370 371 if delta_bic > 10: 372 bic_interp = "Very strong evidence for bimodality (ΔBIC > 10)" 373 elif delta_bic > 6: 374 bic_interp = "Strong evidence for bimodality (ΔBIC > 6)" 375 elif delta_bic > 2: 376 bic_interp = "Positive evidence for bimodality (ΔBIC > 2)" 377 elif delta_bic > 0: 378 bic_interp = "Weak evidence for bimodality (ΔBIC > 0)" 379 else: 380 bic_interp = "Evidence favors unimodality (ΔBIC < 0)" 381 382 lines.append(f"**Interpretation:** {bic_interp}") 383 384 if bf > 100: 385 bf_interp = "Decisive evidence for bimodality (BF > 100)" 386 elif bf > 30: 387 bf_interp = "Very strong evidence (BF > 30)" 388 elif bf > 10: 389 bf_interp = "Strong evidence (BF > 10)" 390 elif bf > 3: 391 bf_interp = "Moderate evidence (BF > 3)" 392 else: 393 bf_interp = f"Weak evidence (BF = {bf:.1f})" 394 395 lines.append(f"**Bayes Factor interpretation:** {bf_interp}") 396 397 # Membership probabilities table 398 lines.extend([ 399 "", 400 "## 3. Cluster Membership Probabilities", 401 "", 402 "Per-model posterior probability of belonging to each component.", 403 "", 404 "| Model | D1 | Cluster | P(Low-D1) | P(High-D1) | Assignment |", 405 "|-------|-----|---------|-----------|------------|------------|", 406 ]) 407 408 for i, rec in enumerate(records): 409 p_low = membership[i][0] 410 p_high = membership[i][1] if len(membership[i]) > 1 else 1 - p_low 411 assignment = "Low-D1" if p_low > p_high else "High-D1" 412 confidence = max(p_low, p_high) 413 conf_str = "certain" if confidence > 0.95 else "strong" if confidence > 0.8 else "weak" 414 lines.append( 415 f"| {rec['model_display']} | {rec['d1']:.3f} | {rec['cluster']} " 416 f"| {p_low:.3f} | {p_high:.3f} | {assignment} ({conf_str}) |" 417 ) 418 419 # Boundary models (uncertain membership) 420 boundary = [(i, records[i], membership[i]) for i in range(len(records)) 421 if len(membership[i]) > 1 and max(membership[i]) < 0.8] 422 if boundary: 423 lines.extend([ 424 "", 425 "## 4. Boundary Models (membership < 0.8)", 426 "", 427 "These models have uncertain cluster membership, sitting near the", 428 "bimodal boundary. They are the most informative for understanding", 429 "the externalization transition.", 430 "", 431 ]) 432 for i, rec, mem in sorted(boundary, key=lambda x: x[1]["d1"]): 433 lines.append(f"- **{rec['model_display']}** (D1={rec['d1']:.3f}): " 434 f"P(Low)={mem[0]:.3f}, P(High)={mem[1]:.3f}") 435 436 # Summary statistics 437 low_d1 = [rec["d1"] for i, rec in enumerate(records) 438 if len(membership[i]) > 1 and membership[i][0] > 0.5] 439 high_d1 = [rec["d1"] for i, rec in enumerate(records) 440 if len(membership[i]) > 1 and membership[i][1] > 0.5] 441 442 lines.extend([ 443 "", 444 "## 5. Summary Statistics by Cluster", 445 "", 446 f"- **Low-D1 cluster:** {len(low_d1)} models, mean D1={np.mean(low_d1):.3f} " 447 f"(SD={np.std(low_d1):.3f})" if low_d1 else "- **Low-D1 cluster:** empty", 448 f"- **High-D1 cluster:** {len(high_d1)} models, mean D1={np.mean(high_d1):.3f} " 449 f"(SD={np.std(high_d1):.3f})" if high_d1 else "- **High-D1 cluster:** empty", 450 "", 451 "## 6. Paper-Ready Summary", 452 "", 453 f"Hartigan's dip test ({'rejects' if dip_result['reject_unimodality'] else 'does not reject'} " 454 f"unimodality, D={dip_result['dip_statistic']:.4f}, p={dip_result['p_value']:.4f}, " 455 f"N={len(d1_values)}). " 456 f"Beta mixture model comparison {'strongly favors' if delta_bic > 6 else 'favors' if delta_bic > 2 else 'weakly favors' if delta_bic > 0 else 'does not favor'} " 457 f"a two-component model (ΔBIC={delta_bic:.1f}, approximate BF={bf:.1f}).", 458 "", 459 "---", 460 "*Script: research/scripts/bimodality_tests.py*", 461 ]) 462 463 return "\n".join(lines) 464 465 466 # ---- Main ----------------------------------------------------------------- 467 468 def main(): 469 print("Loading D1 values from N=30 analysis...") 470 d1_values, records = load_d1_values() 471 print(f" {len(d1_values)} models with valid D1 scores") 472 473 x = np.array(d1_values) 474 475 print("\n1. Running Hartigan's dip test...") 476 dip_result = run_dip_test(d1_values) 477 print(f" {dip_result['interpretation']}") 478 479 print("\n2. Fitting beta mixture models...") 480 print(" Fitting 1-component beta...") 481 mixture_1 = fit_beta_mixture(x, k=1) 482 print(f" 1-component: NLL={mixture_1['nll']:.2f}, BIC={mixture_1['bic']:.2f}") 483 484 print(" Fitting 2-component beta mixture...") 485 mixture_2 = fit_beta_mixture(x, k=2, n_restarts=20) 486 print(f" 2-component: NLL={mixture_2['nll']:.2f}, BIC={mixture_2['bic']:.2f}") 487 488 delta_bic = mixture_1["bic"] - mixture_2["bic"] 489 print(f" ΔBIC = {delta_bic:.2f} ({'favors 2-component' if delta_bic > 0 else 'favors 1-component'})") 490 491 print("\n3. Computing membership probabilities...") 492 membership = compute_membership_probabilities(x, mixture_2) 493 494 # Ensure output directory 495 OUTPUT_DIR.mkdir(parents=True, exist_ok=True) 496 497 print("\n4. Generating figure...") 498 generate_figure(d1_values, records, mixture_1, mixture_2, OUTPUT_FIG) 499 500 print("\n5. Writing report...") 501 report = generate_report(d1_values, records, dip_result, mixture_1, mixture_2, membership) 502 OUTPUT_MD.write_text(report) 503 print(f" Report: {OUTPUT_MD}") 504 505 # Write JSON results 506 results = { 507 "n_models": len(d1_values), 508 "d1_values": d1_values, 509 "dip_test": dip_result, 510 "mixture_1_component": {k: v for k, v in mixture_1.items() if k != "raw_params"}, 511 "mixture_2_component": {k: v for k, v in mixture_2.items() if k != "raw_params"}, 512 "delta_bic": delta_bic, 513 "delta_aic": mixture_1["aic"] - mixture_2["aic"], 514 "approximate_bayes_factor": float(np.exp(min(delta_bic / 2, 500))), 515 "membership_probabilities": [ 516 { 517 "model_key": records[i]["model_key"], 518 "model_display": records[i]["model_display"], 519 "d1": records[i]["d1"], 520 "p_components": membership[i], 521 } 522 for i in range(len(records)) 523 ], 524 } 525 OUTPUT_JSON.write_text(json.dumps(results, indent=2)) 526 print(f" JSON: {OUTPUT_JSON}") 527 528 print(f"\n{'='*60}") 529 print("BIMODALITY TESTING COMPLETE") 530 print(f"{'='*60}") 531 532 533 if __name__ == "__main__": 534 main()