# research/scripts/bimodality_tests.py
  1  # EXTENDS: DES-c386de6567f0
  2  #!/usr/bin/env python3
  3  """
  4  Formal Bimodality Testing for D1 Distribution (RDM-S4)
  5  ======================================================
  6  
  7  Tests whether the D1 (state externalization) distribution across 44 models
  8  is formally bimodal rather than merely visually bimodal. Uses:
  9  1. Hartigan's dip test (frequentist)
 10  2. Beta mixture model comparison: 1-component vs 2-component (BIC + Bayes factor)
 11  3. Cluster membership probabilities per model
 12  4. Figure: D1 histogram with mixture components overlaid
 13  
 14  Usage:
 15    .venv/bin/python3 research/scripts/bimodality_tests.py
 16  """
 17  
 18  from __future__ import annotations
 19  
 20  import json
 21  import math
 22  import sys
 23  from pathlib import Path
 24  
 25  import diptest
 26  import numpy as np
 27  from scipy import stats
 28  from scipy.optimize import minimize
 29  from scipy.special import betaln, gammaln
 30  
 31  # ---- Paths ----------------------------------------------------------------
 32  
# Repo root is three directory levels above this script (research/scripts/).
REPO_ROOT = Path(__file__).resolve().parent.parent.parent
# Input: per-model summary produced by the N=30 analysis pipeline.
N30_SUMMARY = REPO_ROOT / "research" / "results" / "n30_analysis" / "n30_summary.json"
# Outputs: all bimodality artifacts are written under research/results/bimodality/.
OUTPUT_DIR = REPO_ROOT / "research" / "results" / "bimodality"
OUTPUT_JSON = OUTPUT_DIR / "bimodality_results.json"
OUTPUT_MD = OUTPUT_DIR / "bimodality_report.md"
OUTPUT_FIG = OUTPUT_DIR / "d1_bimodality_mixture.pdf"
 39  
 40  
 41  def load_d1_values() -> tuple[list[float], list[dict]]:
 42      """Load per-model D1 scores from N=30 analysis.
 43  
 44      Uses native_api D1 as primary, falls back to text_xml.
 45      Returns (d1_values, model_records).
 46      """
 47      data = json.loads(N30_SUMMARY.read_text())
 48      models = data["models"]
 49  
 50      d1_values = []
 51      records = []
 52      for m in models:
 53          # Use native_api D1 first, then text_xml
 54          d1 = m.get("d1_native_api")
 55          fmt_used = "native_api"
 56          n = m.get("n_native_api", 0)
 57          if d1 is None or n < 3:
 58              d1 = m.get("d1_text_xml")
 59              fmt_used = "text_xml"
 60              n = m.get("n_text_xml", 0)
 61          if d1 is None or n < 3:
 62              continue
 63  
 64          d1_values.append(d1)
 65          records.append({
 66              "model_key": m["model_key"],
 67              "model_display": m["model_display"],
 68              "lab": m["lab"],
 69              "d1": d1,
 70              "format_used": fmt_used,
 71              "n": n,
 72              "cluster": m["cluster"],
 73          })
 74  
 75      return d1_values, records
 76  
 77  
 78  # ---- Hartigan's Dip Test --------------------------------------------------
 79  
 80  def run_dip_test(values: list[float]) -> dict:
 81      """Run Hartigan's dip test for unimodality."""
 82      x = np.array(values)
 83      dip_stat, p_value = diptest.diptest(x)
 84      return {
 85          "dip_statistic": float(dip_stat),
 86          "p_value": float(p_value),
 87          "n": len(values),
 88          "reject_unimodality": p_value < 0.05,
 89          "interpretation": (
 90              f"Dip={dip_stat:.4f}, p={p_value:.4f}. "
 91              + ("Reject unimodality at alpha=0.05." if p_value < 0.05
 92                 else "Cannot reject unimodality at alpha=0.05.")
 93          ),
 94      }
 95  
 96  
 97  # ---- Beta Mixture Model --------------------------------------------------
 98  
 99  def beta_log_likelihood(params: np.ndarray, x: np.ndarray, k: int) -> float:
100      """Negative log-likelihood for k-component beta mixture.
101  
102      params layout for k components:
103        [weight_1, ..., weight_{k-1}, alpha_1, beta_1, ..., alpha_k, beta_k]
104      """
105      n = len(x)
106      # Clamp x to avoid log(0) at boundaries
107      x_safe = np.clip(x, 1e-6, 1 - 1e-6)
108  
109      # Extract weights
110      if k == 1:
111          weights = np.array([1.0])
112          ab_start = 0
113      else:
114          raw_weights = params[:k - 1]
115          # Softmax for weights
116          exp_w = np.exp(raw_weights - np.max(raw_weights))
117          last = 1.0
118          w_list = []
119          for rw in exp_w:
120              w_list.append(rw)
121          total = sum(w_list) + last
122          weights = np.array(w_list + [last]) / total
123          ab_start = k - 1
124  
125      # Extract alpha, beta for each component (use exp to keep positive)
126      total_ll = 0.0
127      for i in range(k):
128          a = np.exp(params[ab_start + 2 * i])
129          b = np.exp(params[ab_start + 2 * i + 1])
130          # Clamp to avoid numerical issues
131          a = np.clip(a, 0.01, 100)
132          b = np.clip(b, 0.01, 100)
133          log_pdf = stats.beta.logpdf(x_safe, a, b)
134          if i == 0:
135              ll_matrix = np.log(weights[i] + 1e-300) + log_pdf
136          else:
137              ll_matrix = np.logaddexp(ll_matrix, np.log(weights[i] + 1e-300) + log_pdf)
138  
139      return -np.sum(ll_matrix)
140  
141  
142  def fit_beta_mixture(x: np.ndarray, k: int, n_restarts: int = 10) -> dict:
143      """Fit k-component beta mixture model via MLE."""
144      best_nll = float("inf")
145      best_params = None
146      rng = np.random.RandomState(42)
147  
148      for _ in range(n_restarts):
149          if k == 1:
150              # Single beta: just alpha, beta
151              init = rng.randn(2) * 0.5
152          else:
153              # k-1 weight params + 2*k shape params
154              init = rng.randn(k - 1 + 2 * k) * 0.5
155  
156          try:
157              res = minimize(
158                  beta_log_likelihood, init, args=(x, k),
159                  method="Nelder-Mead",
160                  options={"maxiter": 5000, "xatol": 1e-8, "fatol": 1e-8},
161              )
162              if res.fun < best_nll:
163                  best_nll = res.fun
164                  best_params = res.x
165          except Exception:
166              continue
167  
168      if best_params is None:
169          return {"nll": float("inf"), "bic": float("inf"), "aic": float("inf")}
170  
171      n = len(x)
172      n_params = len(best_params)
173      bic = 2 * best_nll + n_params * np.log(n)
174      aic = 2 * best_nll + 2 * n_params
175  
176      # Extract readable parameters
177      if k == 1:
178          weights = [1.0]
179          alphas = [float(np.exp(np.clip(best_params[0], -10, 10)))]
180          betas = [float(np.exp(np.clip(best_params[1], -10, 10)))]
181      else:
182          raw_w = best_params[:k - 1]
183          exp_w = np.exp(raw_w - np.max(raw_w))
184          all_w = list(exp_w) + [1.0]
185          total = sum(all_w)
186          weights = [w / total for w in all_w]
187          alphas = []
188          betas_list = []
189          ab_start = k - 1
190          for i in range(k):
191              alphas.append(float(np.exp(np.clip(best_params[ab_start + 2 * i], -10, 10))))
192              betas_list.append(float(np.exp(np.clip(best_params[ab_start + 2 * i + 1], -10, 10))))
193          betas = betas_list
194  
195      return {
196          "nll": float(best_nll),
197          "bic": float(bic),
198          "aic": float(aic),
199          "n_params": n_params,
200          "weights": weights,
201          "alphas": alphas,
202          "betas": betas,
203          "k": k,
204      }
205  
206  
def compute_membership_probabilities(
    x: np.ndarray, mixture: dict
) -> list[list[float]]:
    """Posterior P(component j | x_i) for each point under a fitted mixture.

    Args:
        x: data points in (0, 1); clamped away from the boundaries.
        mixture: fit dict with keys k/weights/alphas/betas.

    Returns:
        An n-by-k list of lists of posterior probabilities (rows sum to 1).
    """
    x_safe = np.clip(x, 1e-6, 1 - 1e-6)

    # Weighted component densities, one column per component.
    k = mixture["k"]
    columns = [
        mixture["weights"][j]
        * stats.beta.pdf(x_safe, mixture["alphas"][j], mixture["betas"][j])
        for j in range(k)
    ]
    joint = np.column_stack(columns)

    # Normalize rows to posteriors; floor guards against all-zero densities.
    norm = np.maximum(joint.sum(axis=1, keepdims=True), 1e-300)
    return (joint / norm).tolist()
229  
230  
231  # ---- Figure generation ----------------------------------------------------
232  
def generate_figure(
    d1_values: list[float],
    records: list[dict],
    mixture_1: dict,
    mixture_2: dict,
    output_path: Path,
) -> None:
    """Render and save the D1 histogram with beta-mixture overlays.

    Left panel: observed histogram plus the 2-component mixture and its
    individual weighted components. Right panel: 1- vs 2-component fits
    labeled with their BICs. Skips quietly if matplotlib is unavailable.
    """
    try:
        import matplotlib
        matplotlib.use("Agg")
        import matplotlib.pyplot as plt
    except ImportError:
        print("  matplotlib not available, skipping figure")
        return

    fig, (left, right) = plt.subplots(1, 2, figsize=(12, 5))
    sample = np.array(d1_values)
    grid = np.linspace(0.001, 0.999, 500)

    # --- Left panel: histogram + fitted 2-component mixture ---
    left.hist(sample, bins=20, density=True, alpha=0.5, color="#4C72B0",
              edgecolor="white", label="Observed D1")

    palette = ["#DD8452", "#55A868"]
    combined = np.zeros_like(grid)
    components = zip(mixture_2["weights"], mixture_2["alphas"], mixture_2["betas"])
    for idx, (w, a, b) in enumerate(components):
        part = w * stats.beta.pdf(grid, a, b)
        combined += part
        left.plot(grid, part, "--", color=palette[idx], linewidth=1.5,
                  label=f"Component {idx+1} (w={w:.2f}, mean={a / (a + b):.2f})")

    left.plot(grid, combined, "-", color="black", linewidth=2,
              label="2-component mixture")

    left.set_xlabel("D1 Score (State Externalization)", fontsize=11)
    left.set_ylabel("Density", fontsize=11)
    left.set_title("D1 Distribution with Beta Mixture Model", fontsize=12)
    left.legend(fontsize=8, loc="upper center")
    left.set_xlim(-0.05, 1.05)

    # --- Right panel: 1-component vs 2-component comparison ---
    right.hist(sample, bins=20, density=True, alpha=0.3, color="#4C72B0",
               edgecolor="white")

    single_pdf = stats.beta.pdf(grid, mixture_1["alphas"][0], mixture_1["betas"][0])
    right.plot(grid, single_pdf, "-", color="#C44E52", linewidth=2,
               label=f"1-component (BIC={mixture_1['bic']:.1f})")
    right.plot(grid, combined, "-", color="#4C72B0", linewidth=2,
               label=f"2-component (BIC={mixture_2['bic']:.1f})")

    delta_bic = mixture_1["bic"] - mixture_2["bic"]
    right.set_title(f"Model Comparison (ΔBIC={delta_bic:.1f})", fontsize=12)
    right.set_xlabel("D1 Score", fontsize=11)
    right.set_ylabel("Density", fontsize=11)
    right.legend(fontsize=9)
    right.set_xlim(-0.05, 1.05)

    plt.tight_layout()
    plt.savefig(output_path, dpi=150, bbox_inches="tight")
    plt.close()
    print(f"  Figure saved: {output_path}")
307  
308  
309  # ---- Report generation ---------------------------------------------------
310  
def generate_report(
    d1_values: list[float],
    records: list[dict],
    dip_result: dict,
    mixture_1: dict,
    mixture_2: dict,
    membership: list[list[float]],
) -> str:
    """Generate markdown report.

    Sections: dip test, 1- vs 2-component mixture comparison, per-model
    membership table, boundary (uncertain-membership) models, cluster
    summary statistics, and a paper-ready summary paragraph.

    Args:
        d1_values: per-model D1 scores (parallel to ``records``).
        records: per-model metadata dicts from ``load_d1_values()``.
        dip_result: output of ``run_dip_test()``.
        mixture_1: 1-component fit from ``fit_beta_mixture()``.
        mixture_2: 2-component fit from ``fit_beta_mixture()``.
        membership: posterior component probabilities, parallel to
            ``records``.

    Returns:
        The complete report as one markdown string.
    """
    # Positive deltas favor the 2-component (bimodal) model.
    delta_bic = mixture_1["bic"] - mixture_2["bic"]
    delta_aic = mixture_1["aic"] - mixture_2["aic"]

    # Approximate Bayes factor from BIC difference
    # (Schwarz/BIC approximation: BF ≈ exp(ΔBIC / 2)).
    log_bf = delta_bic / 2
    bf = np.exp(min(log_bf, 500))  # clamp to avoid overflow

    lines = [
        "# Formal Bimodality Testing for D1 Distribution (RDM-S4)",
        "",
        f"*N={len(d1_values)} models, generated from N=30 campaign data*",
        "",
        "## 1. Hartigan's Dip Test",
        "",
        f"- **Dip statistic:** {dip_result['dip_statistic']:.4f}",
        f"- **p-value:** {dip_result['p_value']:.4f}",
        f"- **Conclusion:** {'Reject unimodality' if dip_result['reject_unimodality'] else 'Cannot reject unimodality'} at alpha=0.05",
        "",
        "The dip test measures the maximum difference between the empirical CDF",
        "and the closest unimodal CDF. Small p-values indicate multimodality.",
        "",
        "## 2. Beta Mixture Model Comparison",
        "",
        "### 1-Component Beta",
        "",
        f"- alpha={mixture_1['alphas'][0]:.3f}, beta={mixture_1['betas'][0]:.3f}",
        # Beta distribution mean = alpha / (alpha + beta).
        f"- Mean={mixture_1['alphas'][0]/(mixture_1['alphas'][0]+mixture_1['betas'][0]):.3f}",
        f"- NLL={mixture_1['nll']:.2f}, BIC={mixture_1['bic']:.2f}, AIC={mixture_1['aic']:.2f}",
        "",
        "### 2-Component Beta Mixture",
        "",
    ]

    # One bullet per fitted component (weight, shapes, implied mean).
    for j in range(mixture_2["k"]):
        w = mixture_2["weights"][j]
        a = mixture_2["alphas"][j]
        b = mixture_2["betas"][j]
        mu = a / (a + b)
        lines.append(f"- **Component {j+1}:** weight={w:.3f}, alpha={a:.3f}, beta={b:.3f}, mean={mu:.3f}")

    lines.extend([
        f"- NLL={mixture_2['nll']:.2f}, BIC={mixture_2['bic']:.2f}, AIC={mixture_2['aic']:.2f}",
        "",
        "### Model Selection",
        "",
        f"- **ΔBIC** (1-component minus 2-component): {delta_bic:.2f}",
        f"- **ΔAIC**: {delta_aic:.2f}",
        f"- **Approximate Bayes Factor** (2-component over 1-component): {bf:.1f}",
        "",
    ])

    # Conventional ΔBIC evidence thresholds (Kass & Raftery-style bands).
    if delta_bic > 10:
        bic_interp = "Very strong evidence for bimodality (ΔBIC > 10)"
    elif delta_bic > 6:
        bic_interp = "Strong evidence for bimodality (ΔBIC > 6)"
    elif delta_bic > 2:
        bic_interp = "Positive evidence for bimodality (ΔBIC > 2)"
    elif delta_bic > 0:
        bic_interp = "Weak evidence for bimodality (ΔBIC > 0)"
    else:
        bic_interp = "Evidence favors unimodality (ΔBIC < 0)"

    lines.append(f"**Interpretation:** {bic_interp}")

    # Conventional Bayes factor evidence bands.
    if bf > 100:
        bf_interp = "Decisive evidence for bimodality (BF > 100)"
    elif bf > 30:
        bf_interp = "Very strong evidence (BF > 30)"
    elif bf > 10:
        bf_interp = "Strong evidence (BF > 10)"
    elif bf > 3:
        bf_interp = "Moderate evidence (BF > 3)"
    else:
        bf_interp = f"Weak evidence (BF = {bf:.1f})"

    lines.append(f"**Bayes Factor interpretation:** {bf_interp}")

    # Membership probabilities table
    # NOTE(review): the column labels assume component 0 is the low-mean
    # ("Low-D1") component and component 1 the high-mean one. fit_beta_mixture
    # must order components by mean for these labels to be correct — confirm
    # that ordering upstream.
    lines.extend([
        "",
        "## 3. Cluster Membership Probabilities",
        "",
        "Per-model posterior probability of belonging to each component.",
        "",
        "| Model | D1 | Cluster | P(Low-D1) | P(High-D1) | Assignment |",
        "|-------|-----|---------|-----------|------------|------------|",
    ])

    for i, rec in enumerate(records):
        p_low = membership[i][0]
        # Fallback for a degenerate 1-column membership row: treat the
        # remainder as the high component's probability.
        p_high = membership[i][1] if len(membership[i]) > 1 else 1 - p_low
        assignment = "Low-D1" if p_low > p_high else "High-D1"
        confidence = max(p_low, p_high)
        conf_str = "certain" if confidence > 0.95 else "strong" if confidence > 0.8 else "weak"
        lines.append(
            f"| {rec['model_display']} | {rec['d1']:.3f} | {rec['cluster']} "
            f"| {p_low:.3f} | {p_high:.3f} | {assignment} ({conf_str}) |"
        )

    # Boundary models (uncertain membership)
    # Models whose best posterior is below 0.8 sit near the mode boundary.
    boundary = [(i, records[i], membership[i]) for i in range(len(records))
                if len(membership[i]) > 1 and max(membership[i]) < 0.8]
    if boundary:
        lines.extend([
            "",
            "## 4. Boundary Models (membership < 0.8)",
            "",
            "These models have uncertain cluster membership, sitting near the",
            "bimodal boundary. They are the most informative for understanding",
            "the externalization transition.",
            "",
        ])
        # Listed in ascending D1 order (sort key is the record's d1).
        for i, rec, mem in sorted(boundary, key=lambda x: x[1]["d1"]):
            lines.append(f"- **{rec['model_display']}** (D1={rec['d1']:.3f}): "
                         f"P(Low)={mem[0]:.3f}, P(High)={mem[1]:.3f}")

    # Summary statistics: hard-assign each model to the component with
    # posterior > 0.5 before computing per-cluster means/SDs.
    low_d1 = [rec["d1"] for i, rec in enumerate(records)
              if len(membership[i]) > 1 and membership[i][0] > 0.5]
    high_d1 = [rec["d1"] for i, rec in enumerate(records)
               if len(membership[i]) > 1 and membership[i][1] > 0.5]

    lines.extend([
        "",
        "## 5. Summary Statistics by Cluster",
        "",
        # NOTE: implicit string concatenation binds tighter than the
        # conditional expression, so each entry below parses as
        # (<two-part f-string>) if <cluster> else "<empty message>".
        # The ternary's lazy evaluation also keeps np.mean/np.std off
        # empty lists.
        f"- **Low-D1 cluster:** {len(low_d1)} models, mean D1={np.mean(low_d1):.3f} "
        f"(SD={np.std(low_d1):.3f})" if low_d1 else "- **Low-D1 cluster:** empty",
        f"- **High-D1 cluster:** {len(high_d1)} models, mean D1={np.mean(high_d1):.3f} "
        f"(SD={np.std(high_d1):.3f})" if high_d1 else "- **High-D1 cluster:** empty",
        "",
        "## 6. Paper-Ready Summary",
        "",
        f"Hartigan's dip test ({'rejects' if dip_result['reject_unimodality'] else 'does not reject'} "
        f"unimodality, D={dip_result['dip_statistic']:.4f}, p={dip_result['p_value']:.4f}, "
        f"N={len(d1_values)}). "
        f"Beta mixture model comparison {'strongly favors' if delta_bic > 6 else 'favors' if delta_bic > 2 else 'weakly favors' if delta_bic > 0 else 'does not favor'} "
        f"a two-component model (ΔBIC={delta_bic:.1f}, approximate BF={bf:.1f}).",
        "",
        "---",
        "*Script: research/scripts/bimodality_tests.py*",
    ])

    return "\n".join(lines)
464  
465  
466  # ---- Main -----------------------------------------------------------------
467  
def main():
    """Run the full bimodality analysis and write figure, report, and JSON."""
    print("Loading D1 values from N=30 analysis...")
    scores, model_records = load_d1_values()
    print(f"  {len(scores)} models with valid D1 scores")

    sample = np.array(scores)

    print("\n1. Running Hartigan's dip test...")
    dip = run_dip_test(scores)
    print(f"  {dip['interpretation']}")

    print("\n2. Fitting beta mixture models...")
    print("  Fitting 1-component beta...")
    fit1 = fit_beta_mixture(sample, k=1)
    print(f"  1-component: NLL={fit1['nll']:.2f}, BIC={fit1['bic']:.2f}")

    print("  Fitting 2-component beta mixture...")
    fit2 = fit_beta_mixture(sample, k=2, n_restarts=20)
    print(f"  2-component: NLL={fit2['nll']:.2f}, BIC={fit2['bic']:.2f}")

    delta_bic = fit1["bic"] - fit2["bic"]
    verdict = "favors 2-component" if delta_bic > 0 else "favors 1-component"
    print(f"  ΔBIC = {delta_bic:.2f} ({verdict})")

    print("\n3. Computing membership probabilities...")
    membership = compute_membership_probabilities(sample, fit2)

    # All outputs live under OUTPUT_DIR; create it if missing.
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

    print("\n4. Generating figure...")
    generate_figure(scores, model_records, fit1, fit2, OUTPUT_FIG)

    print("\n5. Writing report...")
    report = generate_report(scores, model_records, dip, fit1, fit2, membership)
    OUTPUT_MD.write_text(report)
    print(f"  Report: {OUTPUT_MD}")

    # Machine-readable results mirror the report's key numbers.
    results = {
        "n_models": len(scores),
        "d1_values": scores,
        "dip_test": dip,
        "mixture_1_component": {key: val for key, val in fit1.items() if key != "raw_params"},
        "mixture_2_component": {key: val for key, val in fit2.items() if key != "raw_params"},
        "delta_bic": delta_bic,
        "delta_aic": fit1["aic"] - fit2["aic"],
        "approximate_bayes_factor": float(np.exp(min(delta_bic / 2, 500))),
        "membership_probabilities": [
            {
                "model_key": rec["model_key"],
                "model_display": rec["model_display"],
                "d1": rec["d1"],
                "p_components": probs,
            }
            for rec, probs in zip(model_records, membership)
        ],
    }
    OUTPUT_JSON.write_text(json.dumps(results, indent=2))
    print(f"  JSON: {OUTPUT_JSON}")

    print(f"\n{'='*60}")
    print("BIMODALITY TESTING COMPLETE")
    print(f"{'='*60}")
531  
532  
# Script entry point: run the full bimodality analysis pipeline.
if __name__ == "__main__":
    main()