/ examples / persistence / clickhouse_persistence.py
clickhouse_persistence.py
  1  """
  2  ClickHouse — Persistence Example
  3  
  4  Demonstrates:
  5    - Connecting to ClickHouse
  6    - Creating tables for agent state
  7    - Writing and reading agent metadata
  8    - JSON state storage and retrieval
  9    - Session resume after simulated restart
 10  
 11  Requirements:
 12      pip install clickhouse-connect
 13      Docker: ClickHouse on localhost:8123 (user=clickhouse, password=clickhouse)
 14  
 15  Run:
 16      python clickhouse_persistence.py
 17  """
 18  
# Stdlib for JSON (de)serialization and unique suffixes, plus the
# ClickHouse HTTP client (pip install clickhouse-connect).
import json
import uuid
import clickhouse_connect

# Connection settings for the local Docker ClickHouse instance
# (8123 is ClickHouse's default HTTP interface port).
CH_HOST = "localhost"
CH_PORT = 8123
CH_USER = "clickhouse"
CH_PASSWORD = "clickhouse"

print(f"ClickHouse: {CH_HOST}:{CH_PORT}\n")

# Single shared client for phases 1-3; phase 4 deliberately closes it and
# reconnects to simulate a process restart.
client = clickhouse_connect.get_client(
    host=CH_HOST, port=CH_PORT, username=CH_USER, password=CH_PASSWORD,
)

# Random 8-hex-char suffixes keep concurrent runs of this example from
# colliding on table names; tables are dropped in the cleanup phase.
TABLE_SESSIONS = f"praison_sessions_{uuid.uuid4().hex[:8]}"
TABLE_STATE = f"praison_state_{uuid.uuid4().hex[:8]}"
 36  
 37  # --- Phase 1: Create tables and insert session data ---
 38  print("=== Phase 1: Create Tables & Insert Sessions ===")
 39  client.command(f"""
 40      CREATE TABLE IF NOT EXISTS {TABLE_SESSIONS} (
 41          session_id String,
 42          agent_id String,
 43          agent_version UInt32,
 44          total_input_tokens UInt64,
 45          total_output_tokens UInt64,
 46          model String,
 47          created_at DateTime DEFAULT now()
 48      ) ENGINE = MergeTree()
 49      ORDER BY session_id
 50  """)
 51  
 52  sessions = [
 53      ["session_ch_001", "agent_demo_001", 3, 500, 200, "gpt-4o"],
 54      ["session_ch_002", "agent_demo_002", 1, 1000, 400, "gpt-4o-mini"],
 55      ["session_ch_003", "agent_demo_001", 3, 750, 300, "gpt-4o"],
 56  ]
 57  
 58  client.insert(
 59      TABLE_SESSIONS, sessions,
 60      column_names=["session_id", "agent_id", "agent_version", "total_input_tokens", "total_output_tokens", "model"],
 61  )
 62  
 63  for s in sessions:
 64      print(f"  Inserted: {s[0]} (agent={s[1]}, tokens_in={s[3]})")
 65  
 66  print()
 67  
 68  # --- Phase 2: Query and verify ---
 69  print("=== Phase 2: Query & Verify ===")
 70  result = client.query(f"SELECT * FROM {TABLE_SESSIONS} ORDER BY session_id")
 71  print(f"  Total sessions: {len(result.result_rows)}")
 72  for row in result.result_rows:
 73      print(f"    {row[0]}: agent={row[1]}, v{row[2]}, tokens_in={row[3]}, model={row[5]}")
 74  
 75  assert len(result.result_rows) == 3
 76  assert result.result_rows[0][0] == "session_ch_001"
 77  
 78  # Aggregate query
 79  agg = client.query(f"SELECT agent_id, SUM(total_input_tokens) as total_in FROM {TABLE_SESSIONS} GROUP BY agent_id ORDER BY agent_id")
 80  print("\n  Tokens by agent:")
 81  for row in agg.result_rows:
 82      print(f"    {row[0]}: {row[1]} input tokens")
 83  
 84  print()
 85  
 86  # --- Phase 3: JSON state storage ---
 87  print("=== Phase 3: JSON State Storage ===")
 88  client.command(f"""
 89      CREATE TABLE IF NOT EXISTS {TABLE_STATE} (
 90          session_id String,
 91          state_json String,
 92          updated_at DateTime DEFAULT now()
 93      ) ENGINE = ReplacingMergeTree(updated_at)
 94      ORDER BY session_id
 95  """)
 96  
 97  state = {
 98      "agent_id": "agent_demo_001",
 99      "agent_version": 3,
100      "environment_id": "env_ch_001",
101      "total_input_tokens": 2000,
102      "total_output_tokens": 800,
103      "compute_instance_id": "flyio_ch_001",
104      "session_history": [
105          {"id": "session_ch_001", "status": "completed"},
106          {"id": "session_ch_003", "status": "idle"},
107      ],
108  }
109  
110  client.insert(
111      TABLE_STATE,
112      [["session_ch_001", json.dumps(state)]],
113      column_names=["session_id", "state_json"],
114  )
115  print("  Stored JSON state for session_ch_001")
116  
117  # Retrieve
118  result = client.query(f"SELECT state_json FROM {TABLE_STATE} WHERE session_id = 'session_ch_001'")
119  recovered = json.loads(result.result_rows[0][0])
120  print(f"  Recovered agent_id: {recovered['agent_id']}")
121  print(f"  Recovered tokens: in={recovered['total_input_tokens']}, out={recovered['total_output_tokens']}")
122  print(f"  Recovered compute: {recovered['compute_instance_id']}")
123  assert recovered["total_input_tokens"] == 2000
124  
125  print()
126  
127  # --- Phase 4: Session Resume ---
128  print("=== Phase 4: Session Resume (Simulating Restart) ===")
129  client.close()
130  
131  client2 = clickhouse_connect.get_client(
132      host=CH_HOST, port=CH_PORT, username=CH_USER, password=CH_PASSWORD,
133  )
134  
135  result2 = client2.query(f"SELECT state_json FROM {TABLE_STATE} WHERE session_id = 'session_ch_001'")
136  assert len(result2.result_rows) > 0, "State not found after restart!"
137  recovered2 = json.loads(result2.result_rows[0][0])
138  print(f"  Recovered after restart: agent={recovered2['agent_id']}, tokens_in={recovered2['total_input_tokens']}")
139  
140  # Update state
141  recovered2["total_input_tokens"] += 500
142  client2.insert(
143      TABLE_STATE,
144      [["session_ch_001", json.dumps(recovered2)]],
145      column_names=["session_id", "state_json"],
146  )
147  print(f"  Updated tokens_in: {recovered2['total_input_tokens']}")  # noqa: F541
148  
149  # --- Cleanup ---
150  print("\n=== Cleanup ===")
151  client2.command(f"DROP TABLE IF EXISTS {TABLE_SESSIONS}")
152  client2.command(f"DROP TABLE IF EXISTS {TABLE_STATE}")
153  client2.close()
154  print("  Tables dropped.")
155  
156  print("\nāœ… ClickHouse Persistence — All tests passed!")