demo_parallel_memory_access.exs
#!/usr/bin/env elixir

# Demo: Parallel Agent Memory Access
# Demonstrates how multiple agents running in parallel query shared memory
# Usage: mix run shared/scripts/demo_parallel_memory_access.exs

alias EchoShared.Repo
alias EchoShared.Schemas.Memory
import Ecto.Query

defmodule ParallelMemoryDemo do
  @moduledoc """
  Simulates multiple agents querying shared memory in parallel.
  Shows how ECHO's organizational memory is accessed concurrently.

  The demo has two phases:

    1. One task per agent queries `Memory` rows whose `tags` overlap the
       agent's tag list, and the per-agent results are printed.
    2. Three tasks insert new `Memory` rows concurrently, and each insert
       outcome is printed.
  """

  # Upper bound, in milliseconds, on how long each batch of tasks may take.
  @task_timeout_ms 5_000

  @thin_rule "─────────────────────────────────────────\n"
  @thick_rule "═════════════════════════════════════════"

  # {role, question shown in the output, tags used for the overlap query}
  @agents [
    {:ceo, "What are our strategic priorities?", ["strategy", "okr"]},
    {:cto, "What technology decisions have been made?", ["technology", "architecture"]},
    {:product_manager, "What features are prioritized?", ["product", "roadmap"]},
    {:senior_architect, "What are the architecture patterns?", ["architecture", "patterns"]},
    {:test_lead, "What's our testing approach?", ["testing", "quality"]},
    {:operations_head, "How do we monitor the system?", ["operations", "monitoring"]}
  ]

  # {key prefix, content, tags, creating role} for the concurrent-write phase
  @writes [
    {"ceo_daily_standup", "Daily standup notes: All teams on track, no blockers",
     ["standup", "daily", "status"], "ceo"},
    {"senior_dev_code_review", "Code review completed for PR #123: LGTM, approved for merge",
     ["code-review", "development"], "senior_developer"},
    {"product_mgr_customer_call", "Customer call notes: Requested export to CSV feature",
     ["customer-feedback", "feature-request"], "product_manager"}
  ]

  @doc """
  Runs the whole demo: parallel reads, a timing summary, then concurrent writes.

  Exits if a batch of tasks does not finish within #{@task_timeout_ms}ms
  (the behaviour of `Task.await_many/2`). Returns `:ok`.
  """
  @spec run() :: :ok
  def run do
    IO.puts("\n" <> paint("╔═══════════════════════════════════════════════════════╗", IO.ANSI.blue()))
    IO.puts(paint("║ ECHO Parallel Agent Memory Access Demo ║", IO.ANSI.blue()))
    IO.puts(paint("╚═══════════════════════════════════════════════════════╝", IO.ANSI.blue()) <> "\n")

    IO.puts(paint("Scenario: #{length(@agents)} agents query shared memory in parallel", IO.ANSI.cyan()))
    IO.puts(@thin_rule)

    started_at = System.monotonic_time(:millisecond)

    # One task per agent so the queries run concurrently.
    results =
      @agents
      |> Enum.map(fn {role, question, tags} ->
        Task.async(fn -> agent_query_memory(role, question, tags) end)
      end)
      |> Task.await_many(@task_timeout_ms)

    duration = System.monotonic_time(:millisecond) - started_at

    Enum.each(results, fn {role, question, memories, query_time} ->
      display_agent_result(role, question, memories, query_time)
    end)

    print_summary(results, duration)

    demonstrate_concurrent_writes()
  end

  # Prints wall-clock duration, mean per-agent query time and total rows returned.
  defp print_summary(results, duration) do
    query_times = Enum.map(results, fn {_role, _question, _memories, time} -> time end)

    memory_count =
      results
      |> Enum.map(fn {_role, _question, memories, _time} -> length(memories) end)
      |> Enum.sum()

    IO.puts("\n" <> paint(@thick_rule, IO.ANSI.blue()))
    IO.puts(paint("✓ All agents completed queries in parallel", IO.ANSI.green()))
    IO.puts("Total time: #{duration}ms (parallel execution)")
    IO.puts("Average query time: #{average_ms(query_times)}ms")
    IO.puts("Total memories accessed: #{memory_count}")
    IO.puts(paint(@thick_rule, IO.ANSI.blue()) <> "\n")
  end

  # Integer mean of a list of millisecond timings; 0 for an empty list so an
  # empty agent list cannot trigger a division by zero.
  defp average_ms([]), do: 0
  defp average_ms(times), do: div(Enum.sum(times), length(times))

  # Fetches the memories whose tags overlap `tags` (array `&&` operator),
  # newest first, and returns `{role, question, memories, query_time_ms}`.
  # Only the database call is timed; the random sleep afterwards is not.
  defp agent_query_memory(role, question, tags) do
    started_at = System.monotonic_time(:millisecond)

    memories =
      Repo.all(
        from m in Memory,
          where: fragment("? && ?", m.tags, ^tags),
          order_by: [desc: m.inserted_at]
      )

    query_time = System.monotonic_time(:millisecond) - started_at

    # Simulate some processing time
    Process.sleep(:rand.uniform(50))

    {role, question, memories, query_time}
  end

  # Prints one agent's question, hit count, timing and the keys of its first
  # two memories.
  defp display_agent_result(role, question, memories, query_time) do
    IO.puts(paint(role |> to_string() |> String.upcase(), IO.ANSI.green()))
    IO.puts(" Question: #{question}")
    IO.puts(" Found: #{length(memories)} relevant memories (#{query_time}ms)")

    memories
    |> Enum.take(2)
    |> Enum.each(fn memory -> IO.puts(" • #{memory.key}") end)

    IO.puts("")
  end

  # Inserts the `@writes` memories from concurrent tasks and reports each
  # outcome. The success banner is printed only when every insert returned
  # `{:ok, _}`; otherwise a failure count is shown instead.
  defp demonstrate_concurrent_writes do
    IO.puts(paint("\nDemonstrating concurrent memory creation...", IO.ANSI.cyan()))
    IO.puts(@thin_rule)

    write_results =
      @writes
      |> Enum.map(fn {prefix, content, tags, role} ->
        # The key is built inside the task, so its timestamp is taken when the
        # task actually runs.
        Task.async(fn -> create_memory(timestamped_key(prefix), content, tags, role) end)
      end)
      |> Task.await_many(@task_timeout_ms)

    Enum.each(write_results, fn
      {:ok, memory} ->
        IO.puts(paint("✓ ", IO.ANSI.green()) <> "Created: #{memory.key} (by #{memory.created_by_role})")

      {:error, reason} ->
        IO.puts(paint("✗ Failed: #{inspect(reason)}", IO.ANSI.red()))
    end)

    case Enum.count(write_results, &match?({:error, _}, &1)) do
      0 ->
        IO.puts("\n" <> paint("Concurrent writes completed successfully!", IO.ANSI.cyan()))
        IO.puts("PostgreSQL handles concurrent inserts with ACID guarantees.\n")

      failed ->
        IO.puts("\n" <> paint("#{failed} of #{length(write_results)} concurrent writes failed.", IO.ANSI.red()) <> "\n")
    end
  end

  # Appends the current Unix time in seconds to `prefix`.
  defp timestamped_key(prefix) do
    "#{prefix}_#{DateTime.utc_now() |> DateTime.to_unix()}"
  end

  # Builds a `Memory` changeset and inserts it, returning the result of
  # `Repo.insert/1` as-is.
  defp create_memory(key, content, tags, role) do
    # Simulate some processing before write
    Process.sleep(:rand.uniform(30))

    %Memory{}
    |> Memory.changeset(%{
      key: key,
      content: content,
      tags: tags,
      created_by_role: role,
      metadata: %{
        created_via: "parallel_demo",
        timestamp: DateTime.utc_now() |> DateTime.to_iso8601()
      }
    })
    |> Repo.insert()
  end

  # Wraps `text` in the given ANSI colour sequence followed by a reset.
  defp paint(text, color), do: color <> text <> IO.ANSI.reset()
end

ParallelMemoryDemo.run()