/ examples / persistence / postgres_conversation_store.py
postgres_conversation_store.py
  1  """
  2  PostgreSQL ConversationStore — Full Persistence Example
  3  
  4  Demonstrates:
  5    - Creating a PostgreSQL ConversationStore
  6    - Creating sessions, adding messages, storing metadata
  7    - Direct SQL verification that data is in PostgreSQL
  8    - Session resume after simulated restart
  9  
 10  Requirements:
 11      pip install praisonai psycopg2-binary
 12      Docker: PostgreSQL on localhost:5432 (user=postgres, password=postgres)
 13  
 14  Run:
 15      python postgres_conversation_store.py
 16  """
 17  
 18  import uuid
 19  import psycopg2
 20  from praisonai.persistence.conversation.postgres import PostgresConversationStore
 21  from praisonai.persistence.conversation.base import ConversationSession, ConversationMessage
 22  
 23  PG_URL = "postgresql://postgres:postgres@localhost:5432/postgres"
 24  TABLE_PREFIX = f"example_{uuid.uuid4().hex[:6]}_"
 25  
 26  print(f"PostgreSQL URL: {PG_URL}")
 27  print(f"Table prefix: {TABLE_PREFIX}\n")
 28  
 29  store = PostgresConversationStore(url=PG_URL, table_prefix=TABLE_PREFIX)
 30  SESSION_ID = f"pg-demo-{uuid.uuid4().hex[:8]}"
 31  
 32  # --- Phase 1: Create session and add messages ---
 33  print("=== Phase 1: Create Session & Add Messages ===")
 34  session = ConversationSession(
 35      session_id=SESSION_ID,
 36      agent_id="pg_demo_agent",
 37      name="PostgreSQL Demo",
 38      metadata={"agent_version": 5, "model": "gpt-4o"},
 39  )
 40  store.create_session(session)
 41  
 42  messages = [
 43      ("user", "What are the benefits of PostgreSQL?"),
 44      ("assistant", "PostgreSQL offers ACID compliance, extensibility, JSON support, and excellent performance."),
 45      ("user", "How does it compare to MySQL?"),
 46      ("assistant", "PostgreSQL has better standards compliance and advanced features, while MySQL is simpler for basic use."),
 47  ]
 48  
 49  for role, content in messages:
 50      msg = ConversationMessage(session_id=SESSION_ID, role=role, content=content)
 51      store.add_message(SESSION_ID, msg)
 52      print(f"  Added [{role}]: {content[:60]}")
 53  
 54  print()
 55  
 56  # --- Phase 2: Verify data in PostgreSQL ---
 57  print("=== Phase 2: Verify Data in PostgreSQL ===")
 58  retrieved = store.get_session(SESSION_ID)
 59  print(f"  Session: {retrieved.session_id}")
 60  print(f"  Agent ID: {retrieved.agent_id}")
 61  print(f"  Metadata: {retrieved.metadata}")
 62  
 63  stored = store.get_messages(SESSION_ID)
 64  print(f"  Messages: {len(stored)}")
 65  for msg in stored:
 66      print(f"    [{msg.role}] {msg.content[:70]}")
 67  
 68  print()
 69  
 70  # --- Phase 3: Direct SQL Verification ---
 71  print("=== Phase 3: Direct SQL Verification ===")
 72  conn = psycopg2.connect(PG_URL)
 73  cur = conn.cursor()
 74  cur.execute(f"SELECT COUNT(*) FROM public.{TABLE_PREFIX}messages WHERE session_id = %s", (SESSION_ID,))
 75  count = cur.fetchone()[0]
 76  print(f"  Raw SQL message count: {count}")
 77  assert count == 4, f"Expected 4, got {count}"
 78  
 79  cur.execute(f"SELECT agent_id FROM public.{TABLE_PREFIX}sessions WHERE session_id = %s", (SESSION_ID,))
 80  agent_id = cur.fetchone()[0]
 81  print(f"  Raw SQL agent_id: {agent_id}")
 82  assert agent_id == "pg_demo_agent"
 83  conn.close()
 84  
 85  print()
 86  
 87  # --- Phase 4: Session Resume ---
 88  print("=== Phase 4: Session Resume (Simulating Restart) ===")
 89  store.close()
 90  
 91  store2 = PostgresConversationStore(url=PG_URL, table_prefix=TABLE_PREFIX)
 92  resumed = store2.get_session(SESSION_ID)
 93  assert resumed is not None, "Session not found after restart!"
 94  print(f"  Resumed session: {resumed.session_id}")
 95  print(f"  Metadata preserved: {resumed.metadata}")
 96  
 97  msgs = store2.get_messages(SESSION_ID)
 98  print(f"  Messages recovered: {len(msgs)}")
 99  assert len(msgs) == 4
100  
101  # Continue
102  store2.add_message(SESSION_ID, ConversationMessage(session_id=SESSION_ID, role="user", content="Thanks!"))
103  store2.add_message(SESSION_ID, ConversationMessage(session_id=SESSION_ID, role="assistant", content="You're welcome!"))
104  final = store2.get_messages(SESSION_ID)
105  print(f"  Messages after resume: {len(final)}")
106  assert len(final) == 6
107  
108  store2.close()
109  
110  # --- Cleanup ---
111  print("\n=== Cleanup ===")
112  conn = psycopg2.connect(PG_URL)
113  conn.autocommit = True
114  cur = conn.cursor()
115  cur.execute(f"DROP TABLE IF EXISTS public.{TABLE_PREFIX}messages CASCADE")
116  cur.execute(f"DROP TABLE IF EXISTS public.{TABLE_PREFIX}sessions CASCADE")
117  conn.close()
118  print("  Tables dropped.")
119  
120  print("\nāœ… PostgreSQL ConversationStore — All tests passed!")