/ core / flight / protocol.py
protocol.py
  1  """
  2  Flight Protocol
  3  
  4  The cognitive flight cycle for knowledge work:
  5  
  6      FLY HIGH → RETAIN → LAND → BIRTH → SAFETY → repeat
  7  
  8  Each phase has different characteristics:
  9  - Energy level expectations
 10  - Appropriate cognitive activities
 11  - System behavior adaptations
 12  """
 13  
 14  from dataclasses import dataclass, field
 15  from datetime import datetime, time, timedelta
 16  from enum import Enum
 17  from typing import Optional, List, Dict, Any, Callable
 18  
 19  
 20  class FlightPhase(Enum):
 21      """
 22      The five phases of the flight protocol.
 23      """
 24      FLY_HIGH = "fly_high"      # Generate, explore, diverge
 25      RETAIN = "retain"          # Mark what resonates, predict edges
 26      LAND = "land"              # Ground in specifics, commit
 27      BIRTH = "birth"            # Compaction, synthesis, new nodes
 28      SAFETY = "safety"          # Backup, validation, prepare
 29  
 30      @property
 31      def description(self) -> str:
 32          descriptions = {
 33              FlightPhase.FLY_HIGH: "Generate, explore, diverge - high energy creative phase",
 34              FlightPhase.RETAIN: "Mark what resonates, predict edges - selection phase",
 35              FlightPhase.LAND: "Ground in specifics, commit to bullets - commitment phase",
 36              FlightPhase.BIRTH: "Compaction, synthesis - new nodes emerge",
 37              FlightPhase.SAFETY: "Backup, validation, prepare for next cycle",
 38          }
 39          return descriptions.get(self, "")
 40  
 41  
 42  @dataclass
 43  class PhaseCharacteristics:
 44      """
 45      Characteristics and expectations for a flight phase.
 46      """
 47      phase: FlightPhase
 48      typical_duration_hours: float
 49      optimal_time_of_day: Optional[time]
 50      energy_level: str               # 'high', 'medium', 'low'
 51      cognitive_mode: str             # 'divergent', 'convergent', 'analytical', 'restful'
 52      system_behavior: str            # How system should adapt
 53      allowed_transitions: List[FlightPhase]
 54  
 55  
 56  # Default phase characteristics (customizable per operator)
 57  DEFAULT_PHASE_CHARACTERISTICS = {
 58      FlightPhase.FLY_HIGH: PhaseCharacteristics(
 59          phase=FlightPhase.FLY_HIGH,
 60          typical_duration_hours=3.0,
 61          optimal_time_of_day=time(9, 0),  # Morning
 62          energy_level='high',
 63          cognitive_mode='divergent',
 64          system_behavior='Surface diverse connections, enable exploration, minimize constraints',
 65          allowed_transitions=[FlightPhase.RETAIN, FlightPhase.LAND]
 66      ),
 67      FlightPhase.RETAIN: PhaseCharacteristics(
 68          phase=FlightPhase.RETAIN,
 69          typical_duration_hours=2.0,
 70          optimal_time_of_day=time(12, 0),  # Midday
 71          energy_level='medium',
 72          cognitive_mode='convergent',
 73          system_behavior='Highlight high-resonance items, suggest connections, enable marking',
 74          allowed_transitions=[FlightPhase.FLY_HIGH, FlightPhase.LAND]
 75      ),
 76      FlightPhase.LAND: PhaseCharacteristics(
 77          phase=FlightPhase.LAND,
 78          typical_duration_hours=2.0,
 79          optimal_time_of_day=time(15, 0),  # Afternoon
 80          energy_level='medium',
 81          cognitive_mode='analytical',
 82          system_behavior='Focus on specific items, enable commitment, surface dependencies',
 83          allowed_transitions=[FlightPhase.RETAIN, FlightPhase.BIRTH, FlightPhase.SAFETY]
 84      ),
 85      FlightPhase.BIRTH: PhaseCharacteristics(
 86          phase=FlightPhase.BIRTH,
 87          typical_duration_hours=1.0,
 88          optimal_time_of_day=time(22, 0),  # Evening
 89          energy_level='low',
 90          cognitive_mode='restful',
 91          system_behavior='Run compaction, synthesize, create new nodes automatically',
 92          allowed_transitions=[FlightPhase.SAFETY]
 93      ),
 94      FlightPhase.SAFETY: PhaseCharacteristics(
 95          phase=FlightPhase.SAFETY,
 96          typical_duration_hours=1.0,
 97          optimal_time_of_day=time(23, 0),  # Night
 98          energy_level='low',
 99          cognitive_mode='restful',
100          system_behavior='Backup, validate integrity, prepare daily note for tomorrow',
101          allowed_transitions=[FlightPhase.FLY_HIGH]
102      ),
103  }
104  
105  
106  @dataclass
107  class PhaseTransition:
108      """
109      Records a transition between flight phases.
110      """
111      from_phase: FlightPhase
112      to_phase: FlightPhase
113      timestamp: datetime
114      trigger: str                    # 'time', 'manual', 'energy', 'completion'
115      context: Optional[Dict[str, Any]] = None
116  
117  
118  @dataclass
119  class FlightState:
120      """
121      Current state in the flight protocol.
122      """
123      operator_id: str
124      current_phase: FlightPhase
125      phase_started_at: datetime
126      transition_history: List[PhaseTransition] = field(default_factory=list)
127      phase_metrics: Dict[FlightPhase, Dict[str, Any]] = field(default_factory=dict)
128  
129      def time_in_phase(self) -> timedelta:
130          """How long in current phase."""
131          return datetime.now() - self.phase_started_at
132  
133      def should_transition(self, characteristics: Optional[PhaseCharacteristics] = None) -> bool:
134          """Check if it's time to transition based on duration."""
135          chars = characteristics or DEFAULT_PHASE_CHARACTERISTICS.get(self.current_phase)
136          if not chars:
137              return False
138  
139          max_duration = timedelta(hours=chars.typical_duration_hours * 1.5)
140          return self.time_in_phase() > max_duration
141  
142  
143  class FlightProtocol:
144      """
145      Manages the flight protocol for an operator.
146  
147      Tracks phase transitions, suggests optimal timing,
148      and adapts system behavior to current phase.
149      """
150  
151      def __init__(
152          self,
153          operator_id: str,
154          phase_characteristics: Optional[Dict[FlightPhase, PhaseCharacteristics]] = None,
155          on_phase_change: Optional[Callable[[PhaseTransition], None]] = None
156      ):
157          """
158          Initialize flight protocol.
159  
160          Args:
161              operator_id: Identifier for the operator
162              phase_characteristics: Optional custom characteristics
163              on_phase_change: Optional callback for phase transitions
164          """
165          self.operator_id = operator_id
166          self.characteristics = phase_characteristics or DEFAULT_PHASE_CHARACTERISTICS.copy()
167          self.on_phase_change = on_phase_change
168  
169          # Initialize state
170          self.state = FlightState(
171              operator_id=operator_id,
172              current_phase=self._infer_initial_phase(),
173              phase_started_at=datetime.now()
174          )
175  
176      def _infer_initial_phase(self) -> FlightPhase:
177          """Infer initial phase based on current time."""
178          now = datetime.now().time()
179  
180          # Find phase whose optimal time is closest
181          best_phase = FlightPhase.FLY_HIGH
182          min_diff = timedelta(hours=24)
183  
184          for phase, chars in self.characteristics.items():
185              if chars.optimal_time_of_day:
186                  # Calculate time difference
187                  optimal_dt = datetime.combine(datetime.today(), chars.optimal_time_of_day)
188                  now_dt = datetime.combine(datetime.today(), now)
189                  diff = abs(now_dt - optimal_dt)
190  
191                  if diff < min_diff:
192                      min_diff = diff
193                      best_phase = phase
194  
195          return best_phase
196  
197      def transition_to(
198          self,
199          new_phase: FlightPhase,
200          trigger: str = 'manual',
201          context: Optional[Dict[str, Any]] = None
202      ) -> bool:
203          """
204          Transition to a new phase.
205  
206          Args:
207              new_phase: Target phase
208              trigger: What triggered the transition
209              context: Optional context data
210  
211          Returns:
212              True if transition succeeded
213          """
214          # Check if transition is allowed
215          current_chars = self.characteristics.get(self.state.current_phase)
216          if current_chars and new_phase not in current_chars.allowed_transitions:
217              return False
218  
219          # Record transition
220          transition = PhaseTransition(
221              from_phase=self.state.current_phase,
222              to_phase=new_phase,
223              timestamp=datetime.now(),
224              trigger=trigger,
225              context=context
226          )
227          self.state.transition_history.append(transition)
228  
229          # Record metrics for current phase
230          if self.state.current_phase not in self.state.phase_metrics:
231              self.state.phase_metrics[self.state.current_phase] = {}
232  
233          self.state.phase_metrics[self.state.current_phase]['last_duration'] = \
234              self.state.time_in_phase()
235  
236          # Update state
237          self.state.current_phase = new_phase
238          self.state.phase_started_at = datetime.now()
239  
240          # Call callback if registered
241          if self.on_phase_change:
242              self.on_phase_change(transition)
243  
244          return True
245  
246      def suggest_next_phase(self) -> Optional[FlightPhase]:
247          """
248          Suggest the next phase based on time and state.
249  
250          Returns:
251              Suggested next phase, or None if current is fine
252          """
253          current_chars = self.characteristics.get(self.state.current_phase)
254          if not current_chars:
255              return None
256  
257          # Check if we've been in phase too long
258          if self.state.should_transition(current_chars):
259              # Suggest first allowed transition
260              if current_chars.allowed_transitions:
261                  return current_chars.allowed_transitions[0]
262  
263          # Check if another phase's optimal time is now
264          now = datetime.now().time()
265          for phase in current_chars.allowed_transitions:
266              phase_chars = self.characteristics.get(phase)
267              if phase_chars and phase_chars.optimal_time_of_day:
268                  optimal = phase_chars.optimal_time_of_day
269                  # If within 30 mins of optimal time
270                  optimal_dt = datetime.combine(datetime.today(), optimal)
271                  now_dt = datetime.combine(datetime.today(), now)
272                  if abs(optimal_dt - now_dt) < timedelta(minutes=30):
273                      return phase
274  
275          return None
276  
277      def get_current_behavior_guidance(self) -> Dict[str, Any]:
278          """
279          Get guidance for system behavior in current phase.
280  
281          Returns:
282              Dict with behavior guidance
283          """
284          chars = self.characteristics.get(self.state.current_phase)
285          if not chars:
286              return {}
287  
288          return {
289              'phase': self.state.current_phase.value,
290              'phase_description': self.state.current_phase.description,
291              'cognitive_mode': chars.cognitive_mode,
292              'energy_level': chars.energy_level,
293              'system_behavior': chars.system_behavior,
294              'time_in_phase_minutes': int(self.state.time_in_phase().total_seconds() / 60),
295              'typical_duration_hours': chars.typical_duration_hours,
296              'allowed_transitions': [t.value for t in chars.allowed_transitions],
297          }
298  
299      def get_phase_summary(self) -> Dict[str, Any]:
300          """Get summary of all phases and transitions."""
301          return {
302              'operator_id': self.operator_id,
303              'current_phase': self.state.current_phase.value,
304              'phase_started_at': self.state.phase_started_at.isoformat(),
305              'time_in_phase_minutes': int(self.state.time_in_phase().total_seconds() / 60),
306              'total_transitions': len(self.state.transition_history),
307              'phase_metrics': {
308                  p.value: m for p, m in self.state.phase_metrics.items()
309              },
310              'recent_transitions': [
311                  {
312                      'from': t.from_phase.value,
313                      'to': t.to_phase.value,
314                      'trigger': t.trigger,
315                      'timestamp': t.timestamp.isoformat()
316                  }
317                  for t in self.state.transition_history[-5:]
318              ]
319          }
320  
321  
322  # Quick test
323  if __name__ == "__main__":
324      print("=== Flight Protocol Test ===\n")
325  
326      # Track transitions
327      transitions = []
328      def on_change(t: PhaseTransition):
329          transitions.append(t)
330          print(f"  Transition: {t.from_phase.value} → {t.to_phase.value} (trigger: {t.trigger})")
331  
332      # Create protocol
333      protocol = FlightProtocol("rick", on_phase_change=on_change)
334  
335      print(f"Initial phase: {protocol.state.current_phase.value}")
336      print(f"Inferred from time of day\n")
337  
338      # Get behavior guidance
339      guidance = protocol.get_current_behavior_guidance()
340      print("Current behavior guidance:")
341      for key, value in guidance.items():
342          print(f"  {key}: {value}")
343      print()
344  
345      # Manual transitions through cycle
346      print("Manual transition through cycle:")
347  
348      # Start from FLY_HIGH for demo
349      protocol.state.current_phase = FlightPhase.FLY_HIGH
350      protocol.state.phase_started_at = datetime.now()
351  
352      for next_phase in [FlightPhase.RETAIN, FlightPhase.LAND, FlightPhase.BIRTH, FlightPhase.SAFETY, FlightPhase.FLY_HIGH]:
353          result = protocol.transition_to(next_phase, trigger='test')
354          if not result:
355              print(f"  Failed to transition to {next_phase.value}")
356  
357      print()
358  
359      # Summary
360      summary = protocol.get_phase_summary()
361      print("Protocol summary:")
362      print(f"  Current phase: {summary['current_phase']}")
363      print(f"  Total transitions: {summary['total_transitions']}")
364  
365      print("\n=== Test Complete ===")