memory.py
1 """ 2 Memory system for Kamaji - Store and recall information across sessions. 3 """ 4 5 import json 6 from pathlib import Path 7 from typing import List, Dict, Optional 8 from datetime import datetime 9 10 11 class Memory: 12 """Manages Kamaji's persistent memory storage.""" 13 14 def __init__(self, memory_file: Optional[Path] = None): 15 """ 16 Initialize memory system. 17 18 Args: 19 memory_file: Path to memory JSON file. If None, uses default location. 20 """ 21 if memory_file is None: 22 # Store in ~/.kamaji/memory.json 23 kamaji_dir = Path.home() / ".kamaji" 24 kamaji_dir.mkdir(exist_ok=True) 25 memory_file = kamaji_dir / "memory.json" 26 27 self.memory_file = memory_file 28 self.memories: List[Dict] = [] 29 self.load() 30 31 def load(self): 32 """Load memories from file.""" 33 if self.memory_file.exists(): 34 try: 35 with open(self.memory_file, 'r') as f: 36 data = json.load(f) 37 self.memories = data.get('memories', []) 38 except Exception as e: 39 print(f"Warning: Could not load memories: {e}") 40 self.memories = [] 41 else: 42 self.memories = [] 43 44 def save(self): 45 """Save memories to file.""" 46 try: 47 with open(self.memory_file, 'w') as f: 48 json.dump({ 49 'memories': self.memories, 50 'last_updated': datetime.now().isoformat() 51 }, f, indent=2) 52 except Exception as e: 53 print(f"Warning: Could not save memories: {e}") 54 55 def memorize(self, key: str, content: str, tags: Optional[List[str]] = None) -> Dict: 56 """ 57 Store a memory. 58 59 Args: 60 key: Short key/title for the memory 61 content: The content to remember 62 tags: Optional list of tags for categorization 63 64 Returns: 65 The created memory object 66 """ 67 # Check if key already exists and update it 68 existing = self.get_memory(key) 69 if existing: 70 existing['content'] = content 71 existing['tags'] = tags or [] 72 existing['updated_at'] = datetime.now().isoformat() 73 self.save() 74 return existing 75 76 # Create new memory 77 memory = { 78 'id': len(self.memories) + 1, 79 'key': key, 80 'content': content, 81 'tags': tags or [], 82 'created_at': datetime.now().isoformat(), 83 'updated_at': datetime.now().isoformat(), 84 'access_count': 0 85 } 86 self.memories.append(memory) 87 self.save() 88 return memory 89 90 def remember(self, query: str) -> List[Dict]: 91 """ 92 Recall memories matching the query. 93 94 Args: 95 query: Search query (matches key, content, or tags) 96 97 Returns: 98 List of matching memories, sorted by relevance 99 """ 100 query_lower = query.lower() 101 matches = [] 102 103 for memory in self.memories: 104 score = 0 105 106 # Exact key match (highest priority) 107 if memory['key'].lower() == query_lower: 108 score += 100 109 memory['access_count'] += 1 110 111 # Key contains query 112 elif query_lower in memory['key'].lower(): 113 score += 50 114 115 # Content contains query 116 elif query_lower in memory['content'].lower(): 117 score += 20 118 119 # Tag match 120 for tag in memory['tags']: 121 if query_lower in tag.lower(): 122 score += 30 123 break 124 125 if score > 0: 126 matches.append((score, memory)) 127 128 # Save updated access counts 129 if matches: 130 self.save() 131 132 # Sort by score (descending) and return memories 133 matches.sort(key=lambda x: x[0], reverse=True) 134 return [m[1] for m in matches] 135 136 def get_memory(self, key: str) -> Optional[Dict]: 137 """ 138 Get a specific memory by exact key match. 139 140 Args: 141 key: Memory key 142 143 Returns: 144 Memory object or None 145 """ 146 for memory in self.memories: 147 if memory['key'].lower() == key.lower(): 148 return memory 149 return None 150 151 def get_all_memories(self) -> List[Dict]: 152 """ 153 Get all memories. 154 155 Returns: 156 List of all memories, sorted by last updated 157 """ 158 sorted_memories = sorted( 159 self.memories, 160 key=lambda m: m['updated_at'], 161 reverse=True 162 ) 163 return sorted_memories 164 165 def delete_memory(self, key: str) -> bool: 166 """ 167 Delete a memory. 168 169 Args: 170 key: Memory key to delete 171 172 Returns: 173 True if deleted, False if not found 174 """ 175 for i, memory in enumerate(self.memories): 176 if memory['key'].lower() == key.lower(): 177 del self.memories[i] 178 self.save() 179 return True 180 return False 181 182 def clear_all(self): 183 """Clear all memories.""" 184 self.memories = [] 185 self.save() 186 187 def get_stats(self) -> Dict: 188 """ 189 Get memory statistics. 190 191 Returns: 192 Dictionary with stats 193 """ 194 if not self.memories: 195 return { 196 'total': 0, 197 'most_accessed': None, 198 'recently_added': None, 199 'total_tags': 0 200 } 201 202 # Most accessed 203 most_accessed = max(self.memories, key=lambda m: m['access_count']) 204 205 # Most recently added 206 recently_added = max(self.memories, key=lambda m: m['created_at']) 207 208 # Unique tags 209 all_tags = set() 210 for memory in self.memories: 211 all_tags.update(memory['tags']) 212 213 return { 214 'total': len(self.memories), 215 'most_accessed': most_accessed, 216 'recently_added': recently_added, 217 'total_tags': len(all_tags), 218 'all_tags': sorted(all_tags) 219 } 220 221 def format_memory_display(self) -> str: 222 """ 223 Format all memories for display. 224 225 Returns: 226 Formatted string 227 """ 228 if not self.memories: 229 return "š§ No memories stored yet.\n\nUse 'memorize <key> <content>' to store something!" 230 231 stats = self.get_stats() 232 233 lines = [] 234 lines.append("\nš§ Memory Bank") 235 lines.append("=" * 60) 236 lines.append(f"\nš Stats: {stats['total']} memories, {stats['total_tags']} tags") 237 238 if stats['all_tags']: 239 lines.append(f"š·ļø Tags: {', '.join(stats['all_tags'][:10])}") 240 if len(stats['all_tags']) > 10: 241 lines.append(f" ... and {len(stats['all_tags']) - 10} more") 242 243 lines.append("\nš Memories (most recent first):") 244 lines.append("-" * 60) 245 246 for memory in self.get_all_memories()[:20]: # Show last 20 247 # Format timestamp 248 updated = datetime.fromisoformat(memory['updated_at']) 249 time_str = updated.strftime('%Y-%m-%d %H:%M') 250 251 # Format tags 252 tags_str = f" [{', '.join(memory['tags'])}]" if memory['tags'] else "" 253 254 # Format content (truncate if long) 255 content = memory['content'] 256 if len(content) > 80: 257 content = content[:77] + "..." 258 259 lines.append(f"\nš§ {memory['key']}{tags_str}") 260 lines.append(f" {content}") 261 lines.append(f" š {time_str} | šļø Accessed {memory['access_count']} times") 262 263 if len(self.memories) > 20: 264 lines.append(f"\n... and {len(self.memories) - 20} more memories") 265 266 lines.append("\n" + "=" * 60) 267 lines.append("\nš” Commands:") 268 lines.append(" ⢠memorize <key> <content> - Store a memory") 269 lines.append(" ⢠remember <query> - Search memories") 270 lines.append(" ⢠memory - Show all memories (this view)") 271 272 return '\n'.join(lines) 273 274 275 def get_memory() -> Memory: 276 """ 277 Get the global memory instance. 278 279 Returns: 280 Memory instance 281 """ 282 if not hasattr(get_memory, '_instance'): 283 get_memory._instance = Memory() 284 return get_memory._instance