/ core / attention / graph_integration.py
graph_integration.py
  1  #!/usr/bin/env python3
  2  """
  3  Graph Integration Layer
  4  
  5  Connects the Sovereign_Estate graph (atoms, edges, aha moments) with the
  6  Sovereign_OS attention system and server.
  7  
  8  Data flows:
  9      stream_session.py → atoms.jsonl → GraphIntegration → Server endpoints
 10      historical_processor.py → edges.jsonl → GraphIntegration → Correlation
 11      edge_prediction.py → aha events → GraphIntegration → Alert system
 12  
 13  This module:
 14  1. Watches graph data files for changes
 15  2. Streams new atoms/edges to the attention system
 16  3. Exposes graph state via server endpoints
 17  4. Correlates graph events with other sources (Monologue, Comet, artifacts)
 18  """
 19  
 20  import json
 21  import os
 22  from dataclasses import dataclass, field
 23  from datetime import datetime, timedelta
 24  from pathlib import Path
 25  from typing import Dict, List, Optional, Set, Any, Callable, Generator
 26  from collections import defaultdict
 27  import threading
 28  import time
 29  
 30  from .signal_words import detect_signals, SignalDetection
 31  
 32  
 33  @dataclass
 34  class GraphAtom:
 35      """An atom from the semantic graph."""
 36      uuid: str
 37      content: str
 38      source_type: str  # 'user', 'assistant'
 39      session_id: str
 40      timestamp: datetime
 41      parent_uuid: Optional[str] = None
 42  
 43      # Tags
 44      visible_tags: List[str] = field(default_factory=list)
 45      meta_tags: Dict[str, Any] = field(default_factory=dict)
 46  
 47      # Edges
 48      edges: List[Dict[str, Any]] = field(default_factory=list)
 49  
 50      # Analysis
 51      signals: Optional[SignalDetection] = None
 52      weight: float = 1.0
 53  
 54      @property
 55      def altitude(self) -> str:
 56          """Get altitude from meta tags (philosophical/strategic/operational/general)."""
 57          return self.meta_tags.get('altitude', 'general')
 58  
 59      @property
 60      def resonance(self) -> float:
 61          """Get resonance score from meta tags."""
 62          return self.meta_tags.get('resonance') or 0.5
 63  
 64  
 65  @dataclass
 66  class GraphEdge:
 67      """An edge between atoms."""
 68      source: str
 69      target: str
 70      edge_type: str  # follows, relates_to, extends, contradicts, etc.
 71      strength: float
 72      formed_at: datetime
 73      formation_context: str = ""
 74  
 75  
 76  @dataclass
 77  class GraphState:
 78      """Current state of the semantic graph."""
 79      atom_count: int = 0
 80      edge_count: int = 0
 81      last_atom_time: Optional[datetime] = None
 82      last_edge_time: Optional[datetime] = None
 83  
 84      # Distributions
 85      atoms_by_session: Dict[str, int] = field(default_factory=dict)
 86      atoms_by_altitude: Dict[str, int] = field(default_factory=dict)
 87      edges_by_type: Dict[str, int] = field(default_factory=dict)
 88  
 89      # Recent high-value items
 90      high_resonance_atoms: List[str] = field(default_factory=list)
 91      gravity_wells: List[str] = field(default_factory=list)  # Topics with 10+ edges
 92  
 93  
 94  @dataclass
 95  class GraphEvent:
 96      """An event from the graph (new atom, edge, aha, etc.)."""
 97      event_type: str  # 'atom', 'edge', 'aha', 'gravity_well'
 98      timestamp: datetime
 99      data: Dict[str, Any]
100  
101  
class GraphIntegration:
    """
    Integrates the Sovereign_Estate semantic graph with Sovereign_OS.

    Reads the daily JSONL files produced upstream:

        <graph_dir>/atoms/YYYY-MM-DD-atoms.jsonl
        <graph_dir>/edges/YYYY-MM-DD-edges.jsonl

    and provides:
    - Incremental loading: each file is read from the byte offset already
      consumed, so repeated loads never duplicate edges or re-emit events.
    - A background watcher (start_watching / stop_watching) that polls
      today's files for appended records.
    - A query interface for graph state (get_state, get_atom, search_atoms, ...).
    - Event subscription for 'atom', 'edge' and 'gravity_well' events
      (subscribe to 'all' to receive every event).

    The watcher thread mutates the in-memory graph while other threads may be
    querying it, so every access goes through ``self._lock``. The loaders emit
    subscriber events only after releasing the lock.
    """

    # Default paths (from Sovereign_Estate)
    DEFAULT_GRAPH_DIR = Path.home() / 'repos/Sovereign_Estate/data/graph'
    DEFAULT_ATOMS_DIR = DEFAULT_GRAPH_DIR / 'atoms'
    DEFAULT_EDGES_DIR = DEFAULT_GRAPH_DIR / 'edges'

    # An atom touched by at least this many edges (as source or target)
    # counts as a "gravity well".
    GRAVITY_WELL_THRESHOLD = 10

    # How many atom UUIDs GraphState.high_resonance_atoms keeps.
    HIGH_RESONANCE_LIMIT = 20

    def __init__(
        self,
        graph_dir: Optional[str] = None,
        watch_interval: float = 5.0
    ):
        """
        Args:
            graph_dir: Root directory containing ``atoms/`` and ``edges/``;
                defaults to DEFAULT_GRAPH_DIR.
            watch_interval: Seconds between polls of the background watcher.
        """
        self.graph_dir = Path(graph_dir) if graph_dir else self.DEFAULT_GRAPH_DIR
        self.atoms_dir = self.graph_dir / 'atoms'
        self.edges_dir = self.graph_dir / 'edges'
        self.watch_interval = watch_interval

        # In-memory graph (guarded by self._lock)
        self._atoms: Dict[str, GraphAtom] = {}
        self._edges: List[GraphEdge] = []
        self._state = GraphState()

        # Byte offset up to which each file (keyed by str(path)) has been consumed
        self._atom_file_positions: Dict[str, int] = {}
        self._edge_file_positions: Dict[str, int] = {}

        # Event subscribers, keyed by event type ('all' receives everything)
        self._subscribers: Dict[str, List[Callable]] = defaultdict(list)

        # Re-entrant so locked methods may safely call one another.
        self._lock = threading.RLock()

        # Threading
        self._watching = False
        self._watch_thread: Optional[threading.Thread] = None
        self._stop_event = threading.Event()

    def subscribe(self, event_type: str, callback: Callable[[GraphEvent], None]):
        """Register *callback* for events of *event_type*.

        This class emits 'atom', 'edge' and 'gravity_well' events; subscribing
        to 'all' receives every event. Callbacks run on the thread performing
        the load (the watcher thread while watching).
        """
        self._subscribers[event_type].append(callback)

    def _emit(self, event_type: str, data: Dict[str, Any]):
        """Deliver an event to the *event_type* subscribers, then to 'all' subscribers.

        A failing callback is logged and does not prevent delivery to the rest.
        """
        event = GraphEvent(
            event_type=event_type,
            timestamp=datetime.now(),
            data=data
        )
        # Copy the lists so a callback that subscribes mid-delivery is harmless.
        callbacks = (
            list(self._subscribers.get(event_type, ()))
            + list(self._subscribers.get('all', ()))
        )
        for callback in callbacks:
            try:
                callback(event)
            except Exception as e:
                print(f"[GraphIntegration] Subscriber error: {e}")

    def load_today(self) -> GraphState:
        """Load (or continue loading) today's graph data and return the updated state."""
        self._load_day(datetime.now().strftime('%Y-%m-%d'))
        self._update_state()
        return self._state

    def load_range(self, days_back: int = 7) -> GraphState:
        """Load graph data for today and the previous ``days_back - 1`` days."""
        # Evaluate "now" once so a midnight rollover cannot skip or repeat a day.
        now = datetime.now()
        for offset in range(days_back):
            self._load_day((now - timedelta(days=offset)).strftime('%Y-%m-%d'))

        self._update_state()
        return self._state

    def _load_day(self, date: str) -> None:
        """Load the atom and edge files for *date* (YYYY-MM-DD) if they exist.

        Each file resumes from the offset already consumed, so calling the
        loaders repeatedly (e.g. load_range after load_today) does not
        duplicate edges or re-emit events.
        """
        atoms_file = self.atoms_dir / f'{date}-atoms.jsonl'
        if atoms_file.exists():
            self._load_atoms_file(
                atoms_file, self._atom_file_positions.get(str(atoms_file), 0)
            )

        edges_file = self.edges_dir / f'{date}-edges.jsonl'
        if edges_file.exists():
            self._load_edges_file(
                edges_file, self._edge_file_positions.get(str(edges_file), 0)
            )

    def _read_jsonl(
        self,
        filepath: Path,
        from_position: int,
        positions: Dict[str, int],
        handle: Callable[[Any], None],
        label: str
    ) -> None:
        """Feed every JSON record of *filepath* after byte *from_position* to *handle*.

        The file is read in binary so offsets are exact byte counts, comparable
        with ``stat().st_size``; ``json.loads`` decodes the bytes itself.
        Blank lines and malformed newline-terminated lines are skipped. A final
        line without a newline that fails to parse is assumed to be mid-write:
        reading stops before it so the next call retries it instead of losing
        the record. The offset just past the last consumed line is stored in
        ``positions[str(filepath)]`` even if reading fails part-way.
        """
        position = from_position
        try:
            with open(filepath, 'rb') as f:
                f.seek(from_position)
                for raw in f:
                    record = raw.strip()
                    if record:
                        try:
                            data = json.loads(record)
                        except ValueError:  # JSONDecodeError / UnicodeDecodeError
                            if not raw.endswith(b'\n'):
                                break  # partial trailing record: retry next time
                            data = None  # malformed complete line: skip it
                        if data is not None:
                            try:
                                handle(data)
                            except Exception as e:
                                # One bad record must not stall the rest of the file.
                                print(f"[GraphIntegration] Skipping bad {label} record: {e}")
                    position += len(raw)
        except Exception as e:
            print(f"[GraphIntegration] Error loading {label}: {e}")
        finally:
            positions[str(filepath)] = position

    def _load_atoms_file(self, filepath: Path, from_position: int = 0) -> int:
        """Load atoms from a JSONL file, starting at byte offset *from_position*.

        Atoms whose UUID is already loaded are ignored. One 'atom' event is
        emitted per newly added atom, after the lock is released.

        Returns:
            Number of new atoms added.
        """
        events: List[Dict[str, Any]] = []

        def add(data: Any) -> None:
            atom = self._parse_atom(data)
            if atom is None or atom.uuid in self._atoms:
                return
            self._atoms[atom.uuid] = atom
            events.append({
                'uuid': atom.uuid,
                'content': atom.content[:200],
                'source_type': atom.source_type,
                'altitude': atom.altitude,
                'resonance': atom.resonance,
                'tags': atom.visible_tags
            })

        with self._lock:
            self._read_jsonl(filepath, from_position, self._atom_file_positions, add, 'atoms')

        for payload in events:
            self._emit('atom', payload)
        return len(events)

    def _load_edges_file(self, filepath: Path, from_position: int = 0) -> int:
        """Load edges from a JSONL file, starting at byte offset *from_position*.

        Edges are not deduplicated, which is why callers must resume from the
        tracked offset rather than re-reading a file. One 'edge' event is
        emitted per edge, after the lock is released.

        Returns:
            Number of edges added.
        """
        events: List[Dict[str, Any]] = []

        def add(data: Any) -> None:
            edge = self._parse_edge(data)
            if edge is None:
                return
            self._edges.append(edge)
            events.append({
                'source': edge.source,
                'target': edge.target,
                'type': edge.edge_type,
                'strength': edge.strength
            })

        with self._lock:
            self._read_jsonl(filepath, from_position, self._edge_file_positions, add, 'edges')

        for payload in events:
            self._emit('edge', payload)
        return len(events)

    @staticmethod
    def _parse_timestamp(value: Any) -> datetime:
        """Parse an ISO-8601 timestamp into a naive datetime, or return now() on failure.

        A trailing 'Z' is accepted. Any UTC offset is dropped (the wall-clock
        value is kept, not converted), so atom and edge times are all naive
        and can be compared without TypeError.
        """
        try:
            parsed = datetime.fromisoformat(value.replace('Z', '+00:00'))
        except (AttributeError, TypeError, ValueError):
            return datetime.now()
        return parsed.replace(tzinfo=None)

    def _parse_atom(self, data: Dict[str, Any]) -> Optional[GraphAtom]:
        """Build a GraphAtom (with signal analysis) from one decoded JSON record.

        Returns None if *data* is not an object or the atom cannot be built or
        analysed.
        """
        if not isinstance(data, dict):
            return None
        try:
            atom = GraphAtom(
                uuid=data.get('uuid', ''),
                content=data.get('content') or '',
                source_type=data.get('source_type', 'unknown'),
                session_id=data.get('session_id', ''),
                timestamp=self._parse_timestamp(data.get('timestamp', '')),
                parent_uuid=data.get('parent_uuid'),
                # `or` also covers explicit nulls in the record.
                visible_tags=data.get('visible_tags') or [],
                meta_tags=data.get('meta_tags') or {},
                edges=data.get('edges') or []
            )

            # Analyze signals
            atom.signals = detect_signals(atom.content)
            atom.weight = atom.signals.weight_modifier * (1 + atom.resonance)
            return atom
        except Exception:
            # Best-effort ingestion: a record that cannot be analysed is skipped.
            return None

    def _parse_edge(self, data: Dict[str, Any]) -> Optional[GraphEdge]:
        """Build a GraphEdge from one decoded JSON record; None if it is not an object."""
        if not isinstance(data, dict):
            return None
        return GraphEdge(
            source=data.get('source', ''),
            target=data.get('target', ''),
            edge_type=data.get('edge_type', 'relates_to'),
            strength=data.get('strength', 0.5),
            formed_at=self._parse_timestamp(data.get('formed_at', '')),
            formation_context=data.get('formation_context', '')
        )

    def _edge_degrees(self) -> Dict[str, int]:
        """Count the edges touching each atom UUID (as source or target). Caller holds the lock."""
        degrees: Dict[str, int] = defaultdict(int)
        for edge in self._edges:
            degrees[edge.source] += 1
            degrees[edge.target] += 1
        return degrees

    def _update_state(self):
        """Recompute all GraphState statistics from the loaded atoms and edges."""
        with self._lock:
            state = self._state
            state.atom_count = len(self._atoms)
            state.edge_count = len(self._edges)

            # Reset distributions
            state.atoms_by_session = defaultdict(int)
            state.atoms_by_altitude = defaultdict(int)
            state.edges_by_type = defaultdict(int)

            for atom in self._atoms.values():
                state.atoms_by_session[atom.session_id[:8]] += 1
                state.atoms_by_altitude[atom.altitude] += 1
                if atom.timestamp and (not state.last_atom_time or
                                       atom.timestamp > state.last_atom_time):
                    state.last_atom_time = atom.timestamp

            for edge in self._edges:
                state.edges_by_type[edge.edge_type] += 1
                if edge.formed_at and (not state.last_edge_time or
                                       edge.formed_at > state.last_edge_time):
                    state.last_edge_time = edge.formed_at

            by_resonance = sorted(
                self._atoms.values(),
                key=lambda a: a.resonance,
                reverse=True
            )
            state.high_resonance_atoms = [
                a.uuid for a in by_resonance[:self.HIGH_RESONANCE_LIMIT]
            ]

            state.gravity_wells = [
                uuid for uuid, count in self._edge_degrees().items()
                if count >= self.GRAVITY_WELL_THRESHOLD
            ]

    # ==================== Query Interface ====================

    def get_state(self) -> Dict[str, Any]:
        """Return a JSON-friendly summary of the current graph state."""
        with self._lock:
            state = self._state
            return {
                'atom_count': state.atom_count,
                'edge_count': state.edge_count,
                'last_atom_time': state.last_atom_time.isoformat() if state.last_atom_time else None,
                'last_edge_time': state.last_edge_time.isoformat() if state.last_edge_time else None,
                'atoms_by_altitude': dict(state.atoms_by_altitude),
                'edges_by_type': dict(state.edges_by_type),
                'gravity_well_count': len(state.gravity_wells),
                'high_resonance_count': len(state.high_resonance_atoms)
            }

    def get_atom(self, uuid: str) -> Optional[Dict[str, Any]]:
        """Return the full record for atom *uuid*, or None if it is not loaded."""
        with self._lock:
            atom = self._atoms.get(uuid)
        if atom is None:
            return None

        return {
            'uuid': atom.uuid,
            'content': atom.content,
            'source_type': atom.source_type,
            'session_id': atom.session_id,
            'timestamp': atom.timestamp.isoformat(),
            'altitude': atom.altitude,
            'resonance': atom.resonance,
            'weight': atom.weight,
            'visible_tags': atom.visible_tags,
            'edge_count': len(atom.edges)
        }

    def get_recent_atoms(
        self,
        limit: int = 50,
        source_type: Optional[str] = None,
        min_weight: float = 0.0
    ) -> List[Dict[str, Any]]:
        """Return up to *limit* atoms, newest first.

        Args:
            limit: Maximum number of atoms returned.
            source_type: If given, keep only atoms with this source type.
            min_weight: If positive, keep only atoms with at least this weight.
        """
        with self._lock:
            atoms = sorted(
                self._atoms.values(),
                key=lambda a: a.timestamp,
                reverse=True
            )

        if source_type:
            atoms = [a for a in atoms if a.source_type == source_type]

        if min_weight > 0:
            atoms = [a for a in atoms if a.weight >= min_weight]

        return [
            {
                'uuid': a.uuid,
                'content': a.content[:300],
                'source_type': a.source_type,
                'timestamp': a.timestamp.isoformat(),
                'altitude': a.altitude,
                'resonance': a.resonance,
                'weight': a.weight,
                'tags': a.visible_tags
            }
            for a in atoms[:limit]
        ]

    def get_high_resonance_atoms(self, limit: int = 20) -> List[Dict[str, Any]]:
        """Return up to *limit* atoms with the highest resonance scores."""
        with self._lock:
            atoms = sorted(
                self._atoms.values(),
                key=lambda a: a.resonance,
                reverse=True
            )

        return [
            {
                'uuid': a.uuid,
                'content': a.content[:300],
                'resonance': a.resonance,
                'altitude': a.altitude,
                'tags': a.visible_tags
            }
            for a in atoms[:limit]
        ]

    def get_gravity_wells(self) -> List[Dict[str, Any]]:
        """Return loaded gravity-well atoms with edge counts, most connected first."""
        wells = []
        with self._lock:
            # One pass over the edges instead of two scans per well.
            incoming: Dict[str, int] = defaultdict(int)
            outgoing: Dict[str, int] = defaultdict(int)
            for edge in self._edges:
                outgoing[edge.source] += 1
                incoming[edge.target] += 1

            for uuid in self._state.gravity_wells:
                atom = self._atoms.get(uuid)
                if atom is None:
                    continue
                wells.append({
                    'uuid': uuid,
                    'content': atom.content[:200],
                    'incoming_edges': incoming[uuid],
                    'outgoing_edges': outgoing[uuid],
                    'total_edges': incoming[uuid] + outgoing[uuid],
                    'altitude': atom.altitude
                })

        return sorted(wells, key=lambda w: -w['total_edges'])

    def search_atoms(self, query: str, limit: int = 20) -> List[Dict[str, Any]]:
        """Case-insensitive substring search over atom content.

        Results are ordered by descending weight, then by shorter content.
        """
        query_lower = query.lower()
        with self._lock:
            matches = [
                atom for atom in self._atoms.values()
                if query_lower in atom.content.lower()
            ]

        matches.sort(key=lambda a: (-a.weight, len(a.content)))

        return [
            {
                'uuid': a.uuid,
                'content': a.content[:300],
                'weight': a.weight,
                'timestamp': a.timestamp.isoformat()
            }
            for a in matches[:limit]
        ]

    # ==================== Streaming ====================

    def start_watching(self):
        """Start polling today's files on a daemon thread (no-op if already watching)."""
        if self._watching:
            return

        self._watching = True
        # A fresh Event per thread: a previous watcher that has not exited yet
        # keeps its own (already set) event and still stops.
        self._stop_event = threading.Event()
        self._watch_thread = threading.Thread(
            target=self._watch_loop, args=(self._stop_event,), daemon=True
        )
        self._watch_thread.start()
        print("[GraphIntegration] Started watching graph files")

    def stop_watching(self):
        """Stop the watcher thread, waiting up to 2 seconds for it to exit."""
        self._watching = False
        self._stop_event.set()  # wakes the watcher out of its interval wait
        thread = self._watch_thread
        # Joining the current thread would raise (e.g. stop called from a callback).
        if thread and thread is not threading.current_thread():
            thread.join(timeout=2)
        print("[GraphIntegration] Stopped watching")

    def _watch_loop(self, stop_event: Optional[threading.Event] = None):
        """Poll for file changes every ``watch_interval`` seconds until stopped."""
        if stop_event is None:
            stop_event = self._stop_event
        while self._watching and not stop_event.is_set():
            try:
                self._poll_once()
            except Exception as e:
                print(f"[GraphIntegration] Watch error: {e}")
            # Interruptible sleep: stop_watching() returns promptly.
            stop_event.wait(self.watch_interval)

    @staticmethod
    def _unread_offset(filepath: Path, positions: Dict[str, int]) -> Optional[int]:
        """Return the consumed offset of *filepath* if the file has grown past it, else None."""
        try:
            size = filepath.stat().st_size
        except FileNotFoundError:
            return None
        pos = positions.get(str(filepath), 0)
        return pos if size > pos else None

    def _poll_once(self):
        """Ingest anything appended to today's atom and edge files since the last read."""
        today = datetime.now().strftime('%Y-%m-%d')

        atoms_file = self.atoms_dir / f'{today}-atoms.jsonl'
        pos = self._unread_offset(atoms_file, self._atom_file_positions)
        if pos is not None and self._load_atoms_file(atoms_file, pos) > 0:
            self._update_state()

        edges_file = self.edges_dir / f'{today}-edges.jsonl'
        pos = self._unread_offset(edges_file, self._edge_file_positions)
        if pos is not None:
            with self._lock:
                wells_before = set(self._state.gravity_wells)
            if self._load_edges_file(edges_file, pos) > 0:
                self._update_state()
                # Compare against the pre-load snapshot: _update_state has
                # already recorded the new wells, so checking against the
                # current state would never find (or announce) any.
                self._check_gravity_wells(wells_before)

    def _check_gravity_wells(self, known_wells: Optional[Set[str]] = None):
        """Emit a 'gravity_well' event for each atom that newly reached the threshold.

        Args:
            known_wells: Wells already known before the latest load. Defaults
                to the wells currently recorded in the state.
        """
        events: List[Dict[str, Any]] = []
        with self._lock:
            known = set(self._state.gravity_wells if known_wells is None else known_wells)
            for uuid, count in self._edge_degrees().items():
                if count < self.GRAVITY_WELL_THRESHOLD or uuid in known:
                    continue
                if uuid not in self._state.gravity_wells:
                    self._state.gravity_wells.append(uuid)
                atom = self._atoms.get(uuid)
                if atom is not None:
                    events.append({
                        'uuid': uuid,
                        'content': atom.content[:200],
                        'edge_count': count
                    })

        for payload in events:
            self._emit('gravity_well', payload)

    def stream_atoms(
        self,
        since: Optional[datetime] = None
    ) -> Generator[Dict[str, Any], None, None]:
        """Yield atoms in chronological order, optionally only those after *since*.

        Works on a snapshot taken when iteration starts; the lock is not held
        between yields.
        """
        with self._lock:
            atoms = sorted(self._atoms.values(), key=lambda a: a.timestamp)

        for atom in atoms:
            if since and atom.timestamp <= since:
                continue

            yield {
                'uuid': atom.uuid,
                'content': atom.content,
                'source_type': atom.source_type,
                'timestamp': atom.timestamp.isoformat(),
                'altitude': atom.altitude,
                'resonance': atom.resonance,
                'tags': atom.visible_tags
            }
601  
602  
# Lazily-created shared instance (see get_graph_integration)
_graph_integration: Optional[GraphIntegration] = None


def get_graph_integration() -> GraphIntegration:
    """Return the process-wide GraphIntegration, creating it on first call.

    The first call also loads today's graph files and starts the background
    watcher; later calls return the same object untouched.
    """
    global _graph_integration
    if _graph_integration is not None:
        return _graph_integration

    instance = _graph_integration = GraphIntegration()
    instance.load_today()
    instance.start_watching()
    return instance
615  
616  
if __name__ == '__main__':
    # Quick manual check: load today's graph files and print a summary.
    print("=== Graph Integration ===\n")

    integration = GraphIntegration()
    snapshot = integration.load_range(days_back=1)

    totals = (
        ("Atoms", snapshot.atom_count),
        ("Edges", snapshot.edge_count),
        ("Gravity wells", len(snapshot.gravity_wells)),
    )
    for label, value in totals:
        print(f"{label}: {value}")

    distributions = (
        ("\nAtoms by altitude:", snapshot.atoms_by_altitude),
        ("\nEdges by type:", snapshot.edges_by_type),
    )
    for heading, histogram in distributions:
        print(heading)
        for key, count in histogram.items():
            print(f"  {key}: {count}")

    print("\nHigh resonance atoms:")
    for entry in integration.get_high_resonance_atoms(5):
        print(f"  [{entry['resonance']:.2f}] {entry['content'][:60]}...")