# clickhouse_persistence.py
"""
ClickHouse — Persistence Example

Demonstrates:
- Connecting to ClickHouse
- Creating tables for agent state
- Writing and reading agent metadata
- JSON state storage and retrieval
- Session resume after simulated restart

Requirements:
    pip install clickhouse-connect
    Docker: ClickHouse on localhost:8123 (user=clickhouse, password=clickhouse)

Run:
    python clickhouse_persistence.py
"""

import json
import uuid
from contextlib import closing

import clickhouse_connect

CH_HOST = "localhost"
CH_PORT = 8123
CH_USER = "clickhouse"
CH_PASSWORD = "clickhouse"

# Every run works on its own pair of randomly suffixed tables.
TABLE_SESSIONS = f"praison_sessions_{uuid.uuid4().hex[:8]}"
TABLE_STATE = f"praison_state_{uuid.uuid4().hex[:8]}"

# Session whose JSON state is stored, reloaded and updated by the demo.
DEMO_SESSION_ID = "session_ch_001"

SESSION_COLUMNS = [
    "session_id",
    "agent_id",
    "agent_version",
    "total_input_tokens",
    "total_output_tokens",
    "model",
]

# Rows follow the order of SESSION_COLUMNS.
SESSIONS = [
    ["session_ch_001", "agent_demo_001", 3, 500, 200, "gpt-4o"],
    ["session_ch_002", "agent_demo_002", 1, 1000, 400, "gpt-4o-mini"],
    ["session_ch_003", "agent_demo_001", 3, 750, 300, "gpt-4o"],
]


def _connect():
    """Open a new ClickHouse client from the module-level connection settings."""
    return clickhouse_connect.get_client(
        host=CH_HOST, port=CH_PORT, username=CH_USER, password=CH_PASSWORD,
    )


def _check(condition, message):
    """Raise ``AssertionError(message)`` unless *condition* is truthy.

    Used in place of ``assert`` so the verification still happens when the
    script runs under ``python -O``, which strips assert statements.
    """
    if not condition:
        raise AssertionError(message)


def _save_state(client, session_id, state):
    """Insert *state* (a JSON-serialisable dict) as a new row for *session_id*."""
    client.insert(
        TABLE_STATE,
        [[session_id, json.dumps(state)]],
        column_names=["session_id", "state_json"],
    )


def _load_state(client, session_id):
    """Return the stored state dict for *session_id*, or ``None`` if absent.

    The state table is a ReplacingMergeTree, which only collapses rows that
    share a sorting key during background merges; FINAL makes the read itself
    return the deduplicated row instead of a possibly superseded one.
    """
    rows = client.query(
        f"SELECT state_json FROM {TABLE_STATE} FINAL "
        "WHERE session_id = %(session_id)s",
        parameters={"session_id": session_id},
    ).result_rows
    if not rows:
        return None
    return json.loads(rows[0][0])


def _create_and_insert_sessions(client):
    """Phase 1: create the sessions table and insert the demo rows."""
    print("=== Phase 1: Create Tables & Insert Sessions ===")
    client.command(f"""
        CREATE TABLE IF NOT EXISTS {TABLE_SESSIONS} (
            session_id String,
            agent_id String,
            agent_version UInt32,
            total_input_tokens UInt64,
            total_output_tokens UInt64,
            model String,
            created_at DateTime DEFAULT now()
        ) ENGINE = MergeTree()
        ORDER BY session_id
    """)

    client.insert(TABLE_SESSIONS, SESSIONS, column_names=SESSION_COLUMNS)

    for session_id, agent_id, _version, tokens_in, _tokens_out, _model in SESSIONS:
        print(f" Inserted: {session_id} (agent={agent_id}, tokens_in={tokens_in})")

    print()


def _query_and_verify_sessions(client):
    """Phase 2: read the sessions back, verify them and print an aggregate."""
    print("=== Phase 2: Query & Verify ===")
    # Name the columns explicitly so the unpacking below does not depend on
    # the table's physical column order.
    rows = client.query(
        "SELECT session_id, agent_id, agent_version, total_input_tokens, model "
        f"FROM {TABLE_SESSIONS} ORDER BY session_id"
    ).result_rows
    print(f" Total sessions: {len(rows)}")
    for session_id, agent_id, version, tokens_in, model in rows:
        print(f" {session_id}: agent={agent_id}, v{version}, tokens_in={tokens_in}, model={model}")

    _check(
        len(rows) == len(SESSIONS),
        f"Expected {len(SESSIONS)} sessions, found {len(rows)}",
    )
    _check(
        rows[0][0] == "session_ch_001",
        f"Unexpected first session: {rows[0][0]!r}",
    )

    # Aggregate query
    totals = client.query(
        "SELECT agent_id, SUM(total_input_tokens) as total_in "
        f"FROM {TABLE_SESSIONS} GROUP BY agent_id ORDER BY agent_id"
    ).result_rows
    print("\n Tokens by agent:")
    for agent_id, total_in in totals:
        print(f" {agent_id}: {total_in} input tokens")

    print()


def _store_and_load_json_state(client):
    """Phase 3: create the state table, store a JSON document and read it back."""
    print("=== Phase 3: JSON State Storage ===")
    client.command(f"""
        CREATE TABLE IF NOT EXISTS {TABLE_STATE} (
            session_id String,
            state_json String,
            updated_at DateTime DEFAULT now()
        ) ENGINE = ReplacingMergeTree(updated_at)
        ORDER BY session_id
    """)

    state = {
        "agent_id": "agent_demo_001",
        "agent_version": 3,
        "environment_id": "env_ch_001",
        "total_input_tokens": 2000,
        "total_output_tokens": 800,
        "compute_instance_id": "flyio_ch_001",
        "session_history": [
            {"id": "session_ch_001", "status": "completed"},
            {"id": "session_ch_003", "status": "idle"},
        ],
    }

    _save_state(client, DEMO_SESSION_ID, state)
    print(f" Stored JSON state for {DEMO_SESSION_ID}")

    # Retrieve
    recovered = _load_state(client, DEMO_SESSION_ID)
    _check(recovered is not None, "State not found after insert!")
    print(f" Recovered agent_id: {recovered['agent_id']}")
    print(f" Recovered tokens: in={recovered['total_input_tokens']}, out={recovered['total_output_tokens']}")
    print(f" Recovered compute: {recovered['compute_instance_id']}")
    _check(
        recovered["total_input_tokens"] == 2000,
        f"Unexpected total_input_tokens: {recovered['total_input_tokens']!r}",
    )

    print()


def _resume_session(client):
    """Phase 4: reload the state over a fresh connection and write an update."""
    print("=== Phase 4: Session Resume (Simulating Restart) ===")
    recovered = _load_state(client, DEMO_SESSION_ID)
    _check(recovered is not None, "State not found after restart!")
    print(f" Recovered after restart: agent={recovered['agent_id']}, tokens_in={recovered['total_input_tokens']}")

    # Update state: a second row with the same session_id is inserted; the
    # ReplacingMergeTree engine is what later collapses it with the first.
    recovered["total_input_tokens"] += 500
    _save_state(client, DEMO_SESSION_ID, recovered)
    print(f" Updated tokens_in: {recovered['total_input_tokens']}")


def _drop_tables():
    """Drop both demo tables using a dedicated short-lived connection."""
    print("\n=== Cleanup ===")
    with closing(_connect()) as client:
        client.command(f"DROP TABLE IF EXISTS {TABLE_SESSIONS}")
        client.command(f"DROP TABLE IF EXISTS {TABLE_STATE}")
    print(" Tables dropped.")


def main():
    """Run all demo phases, always dropping the demo tables afterwards.

    Raises:
        AssertionError: if any verification step fails.
    """
    print(f"ClickHouse: {CH_HOST}:{CH_PORT}\n")

    # Connect before entering the try block: if the server is unreachable,
    # no table has been created yet and there is nothing to clean up.
    client = _connect()
    try:
        with closing(client):
            _create_and_insert_sessions(client)
            _query_and_verify_sessions(client)
            _store_and_load_json_state(client)

        # The first client is closed at this point; a brand-new one stands in
        # for a restarted process.
        with closing(_connect()) as resumed_client:
            _resume_session(resumed_client)
    finally:
        # Runs on success and on failure so a failed check does not leave the
        # uniquely named tables behind on the server.
        _drop_tables()

    print("\n✅ ClickHouse Persistence — All tests passed!")


if __name__ == "__main__":
    main()