/ apps / echo_shared / scripts / demo_parallel_memory_access.exs
demo_parallel_memory_access.exs
  1  #!/usr/bin/env elixir
  2  
  3  # Demo: Parallel Agent Memory Access
  4  # Demonstrates how multiple agents running in parallel query shared memory
  5  # Usage: mix run shared/scripts/demo_parallel_memory_access.exs
  6  
  7  alias EchoShared.Repo
  8  alias EchoShared.Schemas.Memory
  9  import Ecto.Query
 10  
 11  defmodule ParallelMemoryDemo do
 12    @moduledoc """
 13    Simulates multiple agents querying shared memory in parallel.
 14    Shows how ECHO's organizational memory is accessed concurrently.
 15    """
 16  
 17    def run do
 18      IO.puts("\n" <> IO.ANSI.blue() <> "╔═══════════════════════════════════════════════════════╗" <> IO.ANSI.reset())
 19      IO.puts(IO.ANSI.blue() <> "║  ECHO Parallel Agent Memory Access Demo              ║" <> IO.ANSI.reset())
 20      IO.puts(IO.ANSI.blue() <> "╚═══════════════════════════════════════════════════════╝" <> IO.ANSI.reset() <> "\n")
 21  
 22      # Simulate parallel queries from different agents
 23      agents = [
 24        {:ceo, "What are our strategic priorities?", ["strategy", "okr"]},
 25        {:cto, "What technology decisions have been made?", ["technology", "architecture"]},
 26        {:product_manager, "What features are prioritized?", ["product", "roadmap"]},
 27        {:senior_architect, "What are the architecture patterns?", ["architecture", "patterns"]},
 28        {:test_lead, "What's our testing approach?", ["testing", "quality"]},
 29        {:operations_head, "How do we monitor the system?", ["operations", "monitoring"]}
 30      ]
 31  
 32      IO.puts(IO.ANSI.cyan() <> "Scenario: 6 agents query shared memory in parallel" <> IO.ANSI.reset())
 33      IO.puts("─────────────────────────────────────────\n")
 34  
 35      # Spawn parallel tasks for each agent
 36      start_time = System.monotonic_time(:millisecond)
 37  
 38      tasks = Enum.map(agents, fn {role, question, tags} ->
 39        Task.async(fn ->
 40          agent_query_memory(role, question, tags)
 41        end)
 42      end)
 43  
 44      # Wait for all tasks to complete
 45      results = Task.await_many(tasks, 5000)
 46  
 47      end_time = System.monotonic_time(:millisecond)
 48      duration = end_time - start_time
 49  
 50      # Display results
 51      Enum.each(results, fn {role, question, memories, query_time} ->
 52        display_agent_result(role, question, memories, query_time)
 53      end)
 54  
 55      # Summary
 56      IO.puts("\n" <> IO.ANSI.blue() <> "═════════════════════════════════════════" <> IO.ANSI.reset())
 57      IO.puts(IO.ANSI.green() <> "✓ All agents completed queries in parallel" <> IO.ANSI.reset())
 58      IO.puts("Total time: #{duration}ms (parallel execution)")
 59  
 60      total_queries = length(results)
 61      avg_time = Enum.map(results, fn {_, _, _, t} -> t end) |> Enum.sum() |> div(total_queries)
 62      IO.puts("Average query time: #{avg_time}ms")
 63      IO.puts("Total memories accessed: #{Enum.map(results, fn {_, _, m, _} -> length(m) end) |> Enum.sum()}")
 64      IO.puts(IO.ANSI.blue() <> "═════════════════════════════════════════" <> IO.ANSI.reset() <> "\n")
 65  
 66      # Show concurrent access pattern
 67      demonstrate_concurrent_writes()
 68    end
 69  
 70    defp agent_query_memory(role, question, tags) do
 71      # Simulate agent querying memory by tags
 72      start_time = System.monotonic_time(:millisecond)
 73  
 74      memories = Repo.all(
 75        from m in Memory,
 76        where: fragment("? && ?", m.tags, ^tags),
 77        order_by: [desc: m.inserted_at]
 78      )
 79  
 80      end_time = System.monotonic_time(:millisecond)
 81      query_time = end_time - start_time
 82  
 83      # Simulate some processing time
 84      Process.sleep(:rand.uniform(50))
 85  
 86      {role, question, memories, query_time}
 87    end
 88  
 89    defp display_agent_result(role, question, memories, query_time) do
 90      IO.puts(IO.ANSI.green() <> "#{role |> to_string() |> String.upcase()}" <> IO.ANSI.reset())
 91      IO.puts("  Question: #{question}")
 92      IO.puts("  Found: #{length(memories)} relevant memories (#{query_time}ms)")
 93  
 94      Enum.take(memories, 2) |> Enum.each(fn memory ->
 95        IO.puts("    • #{memory.key}")
 96      end)
 97  
 98      IO.puts("")
 99    end
100  
101    defp demonstrate_concurrent_writes do
102      IO.puts(IO.ANSI.cyan() <> "\nDemonstrating concurrent memory creation..." <> IO.ANSI.reset())
103      IO.puts("─────────────────────────────────────────\n")
104  
105      # Simulate 3 agents creating memories concurrently
106      write_tasks = [
107        Task.async(fn ->
108          create_memory("ceo_daily_standup_#{DateTime.utc_now() |> DateTime.to_unix()}",
109                       "Daily standup notes: All teams on track, no blockers",
110                       ["standup", "daily", "status"],
111                       "ceo")
112        end),
113        Task.async(fn ->
114          create_memory("senior_dev_code_review_#{DateTime.utc_now() |> DateTime.to_unix()}",
115                       "Code review completed for PR #123: LGTM, approved for merge",
116                       ["code-review", "development"],
117                       "senior_developer")
118        end),
119        Task.async(fn ->
120          create_memory("product_mgr_customer_call_#{DateTime.utc_now() |> DateTime.to_unix()}",
121                       "Customer call notes: Requested export to CSV feature",
122                       ["customer-feedback", "feature-request"],
123                       "product_manager")
124        end)
125      ]
126  
127      write_results = Task.await_many(write_tasks, 5000)
128  
129      Enum.each(write_results, fn
130        {:ok, memory} ->
131          IO.puts(IO.ANSI.green() <> "✓ " <> IO.ANSI.reset() <> "Created: #{memory.key} (by #{memory.created_by_role})")
132        {:error, reason} ->
133          IO.puts(IO.ANSI.red() <> "✗ Failed: #{inspect(reason)}" <> IO.ANSI.reset())
134      end)
135  
136      IO.puts("\n" <> IO.ANSI.cyan() <> "Concurrent writes completed successfully!" <> IO.ANSI.reset())
137      IO.puts("PostgreSQL handles concurrent inserts with ACID guarantees.\n")
138    end
139  
140    defp create_memory(key, content, tags, role) do
141      # Simulate some processing before write
142      Process.sleep(:rand.uniform(30))
143  
144      %Memory{}
145      |> Memory.changeset(%{
146        key: key,
147        content: content,
148        tags: tags,
149        created_by_role: role,
150        metadata: %{created_via: "parallel_demo", timestamp: DateTime.utc_now() |> DateTime.to_iso8601()}
151      })
152      |> Repo.insert()
153    end
154  end
155  
156  ParallelMemoryDemo.run()