memory.py
  1  """
  2  Memory system for Kamaji - Store and recall information across sessions.
  3  """
  4  
  5  import json
  6  from pathlib import Path
  7  from typing import List, Dict, Optional
  8  from datetime import datetime
  9  
 10  
 11  class Memory:
 12      """Manages Kamaji's persistent memory storage."""
 13  
 14      def __init__(self, memory_file: Optional[Path] = None):
 15          """
 16          Initialize memory system.
 17  
 18          Args:
 19              memory_file: Path to memory JSON file. If None, uses default location.
 20          """
 21          if memory_file is None:
 22              # Store in ~/.kamaji/memory.json
 23              kamaji_dir = Path.home() / ".kamaji"
 24              kamaji_dir.mkdir(exist_ok=True)
 25              memory_file = kamaji_dir / "memory.json"
 26  
 27          self.memory_file = memory_file
 28          self.memories: List[Dict] = []
 29          self.load()
 30  
 31      def load(self):
 32          """Load memories from file."""
 33          if self.memory_file.exists():
 34              try:
 35                  with open(self.memory_file, 'r') as f:
 36                      data = json.load(f)
 37                      self.memories = data.get('memories', [])
 38              except Exception as e:
 39                  print(f"Warning: Could not load memories: {e}")
 40                  self.memories = []
 41          else:
 42              self.memories = []
 43  
 44      def save(self):
 45          """Save memories to file."""
 46          try:
 47              with open(self.memory_file, 'w') as f:
 48                  json.dump({
 49                      'memories': self.memories,
 50                      'last_updated': datetime.now().isoformat()
 51                  }, f, indent=2)
 52          except Exception as e:
 53              print(f"Warning: Could not save memories: {e}")
 54  
 55      def memorize(self, key: str, content: str, tags: Optional[List[str]] = None) -> Dict:
 56          """
 57          Store a memory.
 58  
 59          Args:
 60              key: Short key/title for the memory
 61              content: The content to remember
 62              tags: Optional list of tags for categorization
 63  
 64          Returns:
 65              The created memory object
 66          """
 67          # Check if key already exists and update it
 68          existing = self.get_memory(key)
 69          if existing:
 70              existing['content'] = content
 71              existing['tags'] = tags or []
 72              existing['updated_at'] = datetime.now().isoformat()
 73              self.save()
 74              return existing
 75  
 76          # Create new memory
 77          memory = {
 78              'id': len(self.memories) + 1,
 79              'key': key,
 80              'content': content,
 81              'tags': tags or [],
 82              'created_at': datetime.now().isoformat(),
 83              'updated_at': datetime.now().isoformat(),
 84              'access_count': 0
 85          }
 86          self.memories.append(memory)
 87          self.save()
 88          return memory
 89  
 90      def remember(self, query: str) -> List[Dict]:
 91          """
 92          Recall memories matching the query.
 93  
 94          Args:
 95              query: Search query (matches key, content, or tags)
 96  
 97          Returns:
 98              List of matching memories, sorted by relevance
 99          """
100          query_lower = query.lower()
101          matches = []
102  
103          for memory in self.memories:
104              score = 0
105  
106              # Exact key match (highest priority)
107              if memory['key'].lower() == query_lower:
108                  score += 100
109                  memory['access_count'] += 1
110  
111              # Key contains query
112              elif query_lower in memory['key'].lower():
113                  score += 50
114  
115              # Content contains query
116              elif query_lower in memory['content'].lower():
117                  score += 20
118  
119              # Tag match
120              for tag in memory['tags']:
121                  if query_lower in tag.lower():
122                      score += 30
123                      break
124  
125              if score > 0:
126                  matches.append((score, memory))
127  
128          # Save updated access counts
129          if matches:
130              self.save()
131  
132          # Sort by score (descending) and return memories
133          matches.sort(key=lambda x: x[0], reverse=True)
134          return [m[1] for m in matches]
135  
136      def get_memory(self, key: str) -> Optional[Dict]:
137          """
138          Get a specific memory by exact key match.
139  
140          Args:
141              key: Memory key
142  
143          Returns:
144              Memory object or None
145          """
146          for memory in self.memories:
147              if memory['key'].lower() == key.lower():
148                  return memory
149          return None
150  
151      def get_all_memories(self) -> List[Dict]:
152          """
153          Get all memories.
154  
155          Returns:
156              List of all memories, sorted by last updated
157          """
158          sorted_memories = sorted(
159              self.memories,
160              key=lambda m: m['updated_at'],
161              reverse=True
162          )
163          return sorted_memories
164  
165      def delete_memory(self, key: str) -> bool:
166          """
167          Delete a memory.
168  
169          Args:
170              key: Memory key to delete
171  
172          Returns:
173              True if deleted, False if not found
174          """
175          for i, memory in enumerate(self.memories):
176              if memory['key'].lower() == key.lower():
177                  del self.memories[i]
178                  self.save()
179                  return True
180          return False
181  
182      def clear_all(self):
183          """Clear all memories."""
184          self.memories = []
185          self.save()
186  
187      def get_stats(self) -> Dict:
188          """
189          Get memory statistics.
190  
191          Returns:
192              Dictionary with stats
193          """
194          if not self.memories:
195              return {
196                  'total': 0,
197                  'most_accessed': None,
198                  'recently_added': None,
199                  'total_tags': 0
200              }
201  
202          # Most accessed
203          most_accessed = max(self.memories, key=lambda m: m['access_count'])
204  
205          # Most recently added
206          recently_added = max(self.memories, key=lambda m: m['created_at'])
207  
208          # Unique tags
209          all_tags = set()
210          for memory in self.memories:
211              all_tags.update(memory['tags'])
212  
213          return {
214              'total': len(self.memories),
215              'most_accessed': most_accessed,
216              'recently_added': recently_added,
217              'total_tags': len(all_tags),
218              'all_tags': sorted(all_tags)
219          }
220  
221      def format_memory_display(self) -> str:
222          """
223          Format all memories for display.
224  
225          Returns:
226              Formatted string
227          """
228          if not self.memories:
229              return "🧠 No memories stored yet.\n\nUse 'memorize <key> <content>' to store something!"
230  
231          stats = self.get_stats()
232  
233          lines = []
234          lines.append("\n🧠 Memory Bank")
235          lines.append("=" * 60)
236          lines.append(f"\nšŸ“Š Stats: {stats['total']} memories, {stats['total_tags']} tags")
237  
238          if stats['all_tags']:
239              lines.append(f"šŸ·ļø  Tags: {', '.join(stats['all_tags'][:10])}")
240              if len(stats['all_tags']) > 10:
241                  lines.append(f"       ... and {len(stats['all_tags']) - 10} more")
242  
243          lines.append("\nšŸ“š Memories (most recent first):")
244          lines.append("-" * 60)
245  
246          for memory in self.get_all_memories()[:20]:  # Show last 20
247              # Format timestamp
248              updated = datetime.fromisoformat(memory['updated_at'])
249              time_str = updated.strftime('%Y-%m-%d %H:%M')
250  
251              # Format tags
252              tags_str = f" [{', '.join(memory['tags'])}]" if memory['tags'] else ""
253  
254              # Format content (truncate if long)
255              content = memory['content']
256              if len(content) > 80:
257                  content = content[:77] + "..."
258  
259              lines.append(f"\n🧠 {memory['key']}{tags_str}")
260              lines.append(f"   {content}")
261              lines.append(f"   šŸ“… {time_str} | šŸ‘ļø  Accessed {memory['access_count']} times")
262  
263          if len(self.memories) > 20:
264              lines.append(f"\n... and {len(self.memories) - 20} more memories")
265  
266          lines.append("\n" + "=" * 60)
267          lines.append("\nšŸ’” Commands:")
268          lines.append("   • memorize <key> <content> - Store a memory")
269          lines.append("   • remember <query> - Search memories")
270          lines.append("   • memory - Show all memories (this view)")
271  
272          return '\n'.join(lines)
273  
274  
275  def get_memory() -> Memory:
276      """
277      Get the global memory instance.
278  
279      Returns:
280          Memory instance
281      """
282      if not hasattr(get_memory, '_instance'):
283          get_memory._instance = Memory()
284      return get_memory._instance