# postgres_conversation_store.py
"""
PostgreSQL ConversationStore — Full Persistence Example

Demonstrates:
- Creating a PostgreSQL ConversationStore
- Creating sessions, adding messages, storing metadata
- Direct SQL verification that data is in PostgreSQL
- Session resume after simulated restart

Requirements:
    pip install praisonai psycopg2-binary
    Docker: PostgreSQL on localhost:5432 (user=postgres, password=postgres)

Run:
    python postgres_conversation_store.py
"""

import contextlib
import uuid

import psycopg2

from praisonai.persistence.conversation.postgres import PostgresConversationStore
from praisonai.persistence.conversation.base import ConversationSession, ConversationMessage

PG_URL = "postgresql://postgres:postgres@localhost:5432/postgres"
# Random per-run prefix (6 hex chars) keeps this run's tables apart from
# tables left by other runs of the example.
TABLE_PREFIX = f"example_{uuid.uuid4().hex[:6]}_"
AGENT_ID = "pg_demo_agent"

# (role, content) pairs written to the session during Phase 1.
DEMO_MESSAGES = (
    ("user", "What are the benefits of PostgreSQL?"),
    ("assistant", "PostgreSQL offers ACID compliance, extensibility, JSON support, and excellent performance."),
    ("user", "How does it compare to MySQL?"),
    ("assistant", "PostgreSQL has better standards compliance and advanced features, while MySQL is simpler for basic use."),
)

# (role, content) pairs appended after the simulated restart in Phase 4.
FOLLOW_UP_MESSAGES = (
    ("user", "Thanks!"),
    ("assistant", "You're welcome!"),
)


def _require(condition: bool, message: str) -> None:
    """Raise ``AssertionError(message)`` unless *condition* is true.

    Used instead of a bare ``assert`` so the verification steps still run
    when the interpreter is started with ``-O`` (which strips ``assert``).
    """
    if not condition:
        raise AssertionError(message)


def _create_session_with_messages(store: PostgresConversationStore, session_id: str) -> None:
    """Phase 1: create the demo session and add every DEMO_MESSAGES entry to it."""
    print("=== Phase 1: Create Session & Add Messages ===")
    session = ConversationSession(
        session_id=session_id,
        agent_id=AGENT_ID,
        name="PostgreSQL Demo",
        metadata={"agent_version": 5, "model": "gpt-4o"},
    )
    store.create_session(session)

    for role, content in DEMO_MESSAGES:
        msg = ConversationMessage(session_id=session_id, role=role, content=content)
        store.add_message(session_id, msg)
        print(f" Added [{role}]: {content[:60]}")

    print()


def _verify_through_store(store: PostgresConversationStore, session_id: str) -> None:
    """Phase 2: read the session and its messages back through the store API.

    Raises:
        AssertionError: if the store returns no session for *session_id*.
    """
    print("=== Phase 2: Verify Data in PostgreSQL ===")
    retrieved = store.get_session(session_id)
    # Fail with a clear message instead of an AttributeError on None.
    _require(retrieved is not None, "Session not found right after creation!")
    print(f" Session: {retrieved.session_id}")
    print(f" Agent ID: {retrieved.agent_id}")
    print(f" Metadata: {retrieved.metadata}")

    stored = store.get_messages(session_id)
    print(f" Messages: {len(stored)}")
    for msg in stored:
        print(f" [{msg.role}] {msg.content[:70]}")

    print()


def _verify_with_raw_sql(session_id: str) -> None:
    """Phase 3: query the prefixed tables directly with psycopg2.

    Raises:
        AssertionError: if the message count or the stored agent_id differs
            from what Phase 1 wrote, or the session row is missing.
    """
    print("=== Phase 3: Direct SQL Verification ===")
    expected = len(DEMO_MESSAGES)
    # psycopg2's own ``with conn:`` only ends the transaction; closing() is
    # what releases the connection, including when a check below fails.
    # Table names cannot be bound as query parameters; TABLE_PREFIX is built
    # above from hex digits only, so interpolating it here is safe.
    with contextlib.closing(psycopg2.connect(PG_URL)) as conn, conn.cursor() as cur:
        cur.execute(
            f"SELECT COUNT(*) FROM public.{TABLE_PREFIX}messages WHERE session_id = %s",
            (session_id,),
        )
        count = cur.fetchone()[0]
        print(f" Raw SQL message count: {count}")
        _require(count == expected, f"Expected {expected}, got {count}")

        cur.execute(
            f"SELECT agent_id FROM public.{TABLE_PREFIX}sessions WHERE session_id = %s",
            (session_id,),
        )
        row = cur.fetchone()
        _require(row is not None, "Session row not found via raw SQL")
        agent_id = row[0]
        print(f" Raw SQL agent_id: {agent_id}")
        _require(agent_id == AGENT_ID, f"Expected agent_id {AGENT_ID!r}, got {agent_id!r}")

    print()


def _resume_and_continue(store: PostgresConversationStore, session_id: str) -> None:
    """Phase 4: load the session through a fresh store and append more messages.

    Raises:
        AssertionError: if the session is missing or a message count is wrong.
    """
    resumed = store.get_session(session_id)
    _require(resumed is not None, "Session not found after restart!")
    print(f" Resumed session: {resumed.session_id}")
    print(f" Metadata preserved: {resumed.metadata}")

    expected = len(DEMO_MESSAGES)
    recovered = store.get_messages(session_id)
    print(f" Messages recovered: {len(recovered)}")
    _require(len(recovered) == expected, f"Expected {expected}, got {len(recovered)}")

    # Continue the conversation on the resumed session.
    for role, content in FOLLOW_UP_MESSAGES:
        store.add_message(
            session_id,
            ConversationMessage(session_id=session_id, role=role, content=content),
        )
    expected += len(FOLLOW_UP_MESSAGES)
    final = store.get_messages(session_id)
    print(f" Messages after resume: {len(final)}")
    _require(len(final) == expected, f"Expected {expected}, got {len(final)}")


def _drop_example_tables() -> None:
    """Drop this run's prefixed ``messages`` and ``sessions`` tables if present."""
    print("\n=== Cleanup ===")
    with contextlib.closing(psycopg2.connect(PG_URL)) as conn:
        # DDL outside an explicit transaction, so no commit is needed.
        conn.autocommit = True
        with conn.cursor() as cur:
            cur.execute(f"DROP TABLE IF EXISTS public.{TABLE_PREFIX}messages CASCADE")
            cur.execute(f"DROP TABLE IF EXISTS public.{TABLE_PREFIX}sessions CASCADE")
    print(" Tables dropped.")


def main() -> None:
    """Run all four demo phases, then drop the example tables.

    Stores are closed and the tables are dropped even when a phase fails,
    so a failed run does not leave connections or tables behind.

    Raises:
        AssertionError: if any verification step fails.
    """
    print(f"PostgreSQL URL: {PG_URL}")
    print(f"Table prefix: {TABLE_PREFIX}\n")

    session_id = f"pg-demo-{uuid.uuid4().hex[:8]}"

    try:
        store = PostgresConversationStore(url=PG_URL, table_prefix=TABLE_PREFIX)
        try:
            _create_session_with_messages(store, session_id)
            _verify_through_store(store, session_id)
            _verify_with_raw_sql(session_id)
        finally:
            store.close()

        # A second store instance against the same URL and prefix stands in
        # for a process restart.
        print("=== Phase 4: Session Resume (Simulating Restart) ===")
        resumed_store = PostgresConversationStore(url=PG_URL, table_prefix=TABLE_PREFIX)
        try:
            _resume_and_continue(resumed_store, session_id)
        finally:
            resumed_store.close()
    finally:
        _drop_example_tables()

    # NOTE(review): the leading glyph and the dash were mis-encoded in the
    # source; restored here as a check mark and an em dash — confirm.
    print("\n✅ PostgreSQL ConversationStore — All tests passed!")


if __name__ == "__main__":
    main()