/ src / lightspeed / agent / session.gleam
session.gleam
  1  //// Minimal typed session actor model for Phase 2.
  2  
  3  import gleam/int
  4  import gleam/list
  5  import gleam/option.{type Option, None, Some}
  6  import lightspeed/agent/typestate
  7  import lightspeed/diff
  8  
/// Reconnect strategy after disconnect/crash.
///
/// The policy is consulted when a `Reconnect` or `Restart` event is applied.
pub type ReconnectPolicy {
  /// Keep the current counter value across a reconnect or restart.
  Rehydrate
  /// Reset the counter to 0 on a reconnect or restart.
  Remount
}
 14  
/// Typed inbox events handled by one live-session actor.
///
/// Events that carry `now_ms` supply the caller's clock; this module never
/// reads a clock itself.
pub type InboxEvent {
  /// Mount the session on `route`: the lifecycle becomes
  /// `typestate.LiveLabel`, the heartbeat deadline is reset and a counter
  /// patch is queued. The handler in this module does not inspect
  /// `csrf_token`.
  Connect(route: String, csrf_token: String, now_ms: Int)
  /// Go live again on `route`; the session's `ReconnectPolicy` decides
  /// whether the counter is kept or reset to 0.
  Reconnect(route: String, now_ms: Int)
  /// Add 1 to the counter while live; otherwise reported as ignored.
  Increment
  /// Subtract 1 from the counter while live; otherwise reported as ignored.
  Decrement
  /// Acknowledge the pending patch whose ref equals `ref`; an unknown ref is
  /// reported as ignored.
  Ack(ref: String)
  /// While live, move the heartbeat deadline to `now_ms` plus the session
  /// timeout; otherwise reported as ignored.
  Heartbeat(now_ms: Int)
  /// While live, switch to `typestate.DrainingLabel` if `now_ms` is past the
  /// heartbeat deadline; otherwise a no-op.
  Tick(now_ms: Int)
  /// Mark the session as crashed and set `typestate.TerminatedLabel`.
  Crash(reason: String)
  /// Only honoured for a crashed session: return to
  /// `typestate.DisconnectedLabel` with a fresh heartbeat deadline.
  Restart(now_ms: Int)
  /// Set `typestate.TerminatedLabel` and record the reason in telemetry.
  Shutdown(reason: String)
}
 28  
/// Typed inbox envelope that identifies the sender process id.
///
/// `handle` applies `event` only when `owner` equals the session's owner;
/// any other sender is recorded as `OwnershipRejected`.
pub type InboxMessage {
  InboxMessage(owner: String, event: InboxEvent)
}
 33  
/// Patch queued by the session actor.
///
/// `ref` is the decimal string of the session's patch counter at the time the
/// patch was queued; an `Ack` event names the same ref to clear the patch
/// from the pending list.
pub type PatchEnvelope {
  PatchEnvelope(ref: String, patch: diff.Patch)
}
 38  
/// Telemetry emitted by one session actor.
///
/// Every variant carries the id of the session that emitted it.
pub type TelemetryEvent {
  /// A `Connect` event was applied for `route`.
  SessionMounted(session_id: String, route: String)
  /// A `Reconnect` event was applied under the `Rehydrate` policy.
  SessionRehydrated(session_id: String, route: String)
  /// A `Reconnect` event was applied under the `Remount` policy.
  SessionRemounted(session_id: String, route: String)
  /// The counter changed; `value` is the new counter.
  CounterUpdated(session_id: String, value: Int)
  /// A patch with this ref was added to the pending list and the outbox.
  PatchQueued(session_id: String, ref: String)
  /// The pending patch with this ref was removed by an `Ack` event.
  PatchAcked(session_id: String, ref: String)
  /// A heartbeat was accepted; `deadline_ms` is the new deadline.
  HeartbeatReceived(session_id: String, deadline_ms: Int)
  /// A `Tick` at `now_ms` found the heartbeat deadline already passed.
  HeartbeatTimedOut(session_id: String, now_ms: Int)
  /// A `Crash` event was applied.
  SessionCrashed(session_id: String, reason: String)
  /// A crashed session was restarted.
  SessionRestarted(session_id: String)
  /// A `Shutdown` event was applied.
  SessionShutdown(session_id: String, reason: String)
  /// A message from `owner`, which is not the session's owner, was dropped.
  OwnershipRejected(session_id: String, owner: String)
  /// An event did not apply in the current state; `event` is a short name
  /// such as "heartbeat", "restart", "counter_update" or "ack".
  EventIgnored(session_id: String, event: String)
}
 55  
/// Typed outbox records emitted from the actor.
///
/// Records accumulate in the session until `flush_outbox` drains them.
pub type OutboxMessage {
  /// A patch that was queued by the session.
  OutboxPatch(PatchEnvelope)
  /// A telemetry event; the same event is also kept in the telemetry log.
  OutboxTelemetry(TelemetryEvent)
}
 61  
/// Internal state of one session actor.
///
/// The type is opaque: values are created with `start`, advanced with
/// `handle` and inspected through the accessor functions in this module.
pub opaque type Session {
  Session(
    // Session identifier; attached to every telemetry event.
    id: String,
    // Only inbox messages whose owner equals this value are applied.
    owner: String,
    // Current lifecycle label; a new session starts as DisconnectedLabel.
    lifecycle: typestate.Lifecycle,
    // Decides whether the counter survives a reconnect or restart.
    reconnect_policy: ReconnectPolicy,
    // Route of the most recent connect/reconnect; None until the first one.
    mounted_route: Option(String),
    // The counter model rendered into patches.
    counter: Int,
    // Number used as the ref of the next queued patch; starts at 1 and grows
    // by 1 per patch.
    next_patch_ref: Int,
    // Patches that have not been acked yet, newest first.
    pending_rev: List(PatchEnvelope),
    // Outbox records that have not been flushed yet, newest first.
    outbox_rev: List(OutboxMessage),
    // Telemetry events emitted so far, newest first.
    telemetry_rev: List(TelemetryEvent),
    // False until a patch has been queued since the last connect/restart;
    // selects ReplaceSegments (False) or UpdateSegments (True) for the next
    // counter patch.
    counter_static_sent: Bool,
    // Added to a timestamp to compute the next heartbeat deadline.
    timeout_ms: Int,
    // A Tick with a later timestamp moves a live session to DrainingLabel.
    heartbeat_deadline_ms: Int,
    // Set by Crash; cleared by Restart or by a connect/reconnect.
    crashed: Bool,
  )
}
 81  
 82  /// Start a new disconnected session actor state.
 83  pub fn start(
 84    id: String,
 85    owner: String,
 86    reconnect_policy: ReconnectPolicy,
 87    now_ms: Int,
 88    timeout_ms: Int,
 89  ) -> Session {
 90    Session(
 91      id: id,
 92      owner: owner,
 93      lifecycle: typestate.DisconnectedLabel,
 94      reconnect_policy: reconnect_policy,
 95      mounted_route: None,
 96      counter: 0,
 97      next_patch_ref: 1,
 98      pending_rev: [],
 99      outbox_rev: [],
100      telemetry_rev: [],
101      counter_static_sent: False,
102      timeout_ms: timeout_ms,
103      heartbeat_deadline_ms: now_ms + timeout_ms,
104      crashed: False,
105    )
106  }
107  
/// Apply one inbox message to the session and return the new state.
///
/// The event is applied only when the message owner equals the session's
/// owner. Any other sender leaves the state untouched apart from an
/// `OwnershipRejected` telemetry record.
pub fn handle(session: Session, message: InboxMessage) -> Session {
  let InboxMessage(owner: sender, event: event) = message
  let sent_by_owner = sender == session.owner

  case sent_by_owner {
    False -> emit_telemetry(session, OwnershipRejected(session.id, sender))
    True -> apply_event(session, event)
  }
}
118  
/// Identifier this session was started with.
pub fn id(session: Session) -> String {
  let Session(id: session_id, ..) = session
  session_id
}
123  
/// Owner id this session was started with; `handle` only applies messages
/// sent by this owner.
pub fn owner(session: Session) -> String {
  let Session(owner: owner_id, ..) = session
  owner_id
}
128  
/// Lifecycle label the session is currently in.
pub fn lifecycle(session: Session) -> typestate.Lifecycle {
  let Session(lifecycle: label, ..) = session
  label
}
133  
/// Current value of the counter model held by this minimal actor.
pub fn counter(session: Session) -> Int {
  let Session(counter: value, ..) = session
  value
}
138  
/// Reconnect policy the session was started with.
pub fn reconnect_policy(session: Session) -> ReconnectPolicy {
  let Session(reconnect_policy: policy, ..) = session
  policy
}
143  
/// Patches still waiting for an ack, oldest first.
///
/// The session stores them newest first, so the list is reversed here.
pub fn pending_patches(session: Session) -> List(PatchEnvelope) {
  session.pending_rev
  |> list.reverse
}
148  
/// Telemetry records emitted so far, oldest first.
///
/// The session stores them newest first, so the list is reversed here.
pub fn telemetry(session: Session) -> List(TelemetryEvent) {
  session.telemetry_rev
  |> list.reverse
}
153  
/// Timestamp, in milliseconds, after which a `Tick` treats the heartbeat as
/// timed out.
pub fn heartbeat_deadline_ms(session: Session) -> Int {
  let Session(heartbeat_deadline_ms: deadline_ms, ..) = session
  deadline_ms
}
158  
/// True after a `Crash` event until the session is restarted or connects
/// again.
pub fn crashed(session: Session) -> Bool {
  let Session(crashed: is_crashed, ..) = session
  is_crashed
}
163  
/// Take every queued outbox record out of the session.
///
/// Returns the session with an empty outbox together with the drained
/// records, oldest first.
pub fn flush_outbox(session: Session) -> #(Session, List(OutboxMessage)) {
  let drained = session.outbox_rev |> list.reverse
  let emptied = Session(..session, outbox_rev: [])

  #(emptied, drained)
}
169  
/// Reference string carried by a patch envelope.
pub fn patch_ref(patch: PatchEnvelope) -> String {
  let PatchEnvelope(ref: reference, ..) = patch
  reference
}
174  
/// Patch payload carried by a patch envelope.
pub fn patch(patch: PatchEnvelope) -> diff.Patch {
  let PatchEnvelope(patch: payload, ..) = patch
  payload
}
179  
/// Snake-case name of a telemetry event, independent of its payload.
///
/// Intended as a stable label for assertions and logs.
pub fn telemetry_label(event: TelemetryEvent) -> String {
  case event {
    // Connection lifecycle.
    SessionMounted(..) -> "session_mounted"
    SessionRehydrated(..) -> "session_rehydrated"
    SessionRemounted(..) -> "session_remounted"
    // Counter and patch flow.
    CounterUpdated(..) -> "counter_updated"
    PatchQueued(..) -> "patch_queued"
    PatchAcked(..) -> "patch_acked"
    // Heartbeats.
    HeartbeatReceived(..) -> "heartbeat_received"
    HeartbeatTimedOut(..) -> "heartbeat_timed_out"
    // Termination and recovery.
    SessionCrashed(..) -> "session_crashed"
    SessionRestarted(..) -> "session_restarted"
    SessionShutdown(..) -> "session_shutdown"
    // Rejected or inapplicable input.
    OwnershipRejected(..) -> "ownership_rejected"
    EventIgnored(..) -> "event_ignored"
  }
}
198  
/// Dispatch one inbox event (already verified to come from the owner) to the
/// transition that handles it.
fn apply_event(session: Session, event: InboxEvent) -> Session {
  case event {
    // The csrf token is not inspected here.
    Connect(route: route, now_ms: now_ms, ..) -> {
      let mounted = SessionMounted(session.id, route)
      connect_live(session, route, now_ms, mounted, keep_counter: True)
    }

    Reconnect(route, now_ms) -> apply_reconnect(session, route, now_ms)

    Increment -> apply_counter_delta(session, 1)

    Decrement -> apply_counter_delta(session, -1)

    Ack(ref) -> apply_ack(session, ref)

    Heartbeat(now_ms) -> apply_heartbeat(session, now_ms)

    Tick(now_ms) -> apply_tick(session, now_ms)

    Crash(reason) -> {
      let crashed_session =
        Session(..session, crashed: True, lifecycle: typestate.TerminatedLabel)
      emit_telemetry(crashed_session, SessionCrashed(session.id, reason))
    }

    Restart(now_ms) -> apply_restart(session, now_ms)

    Shutdown(reason) -> {
      let terminated = Session(..session, lifecycle: typestate.TerminatedLabel)
      emit_telemetry(terminated, SessionShutdown(session.id, reason))
    }
  }
}

/// Go live again on `route`; the reconnect policy picks the telemetry event
/// and whether the counter is kept.
fn apply_reconnect(session: Session, route: String, now_ms: Int) -> Session {
  let #(connect_event, keep) = case session.reconnect_policy {
    Rehydrate -> #(SessionRehydrated(session.id, route), True)
    Remount -> #(SessionRemounted(session.id, route), False)
  }

  connect_live(session, route, now_ms, connect_event, keep_counter: keep)
}

/// Push the heartbeat deadline forward for a live session; any other
/// lifecycle reports the event as ignored.
fn apply_heartbeat(session: Session, now_ms: Int) -> Session {
  case session.lifecycle {
    typestate.LiveLabel -> {
      let deadline_ms = now_ms + session.timeout_ms
      let refreshed = Session(..session, heartbeat_deadline_ms: deadline_ms)
      emit_telemetry(refreshed, HeartbeatReceived(session.id, deadline_ms))
    }
    _ -> ignore_event(session, "heartbeat")
  }
}

/// Move a live session whose heartbeat deadline has passed to the draining
/// label; every other case returns the session unchanged.
fn apply_tick(session: Session, now_ms: Int) -> Session {
  let overdue = now_ms > session.heartbeat_deadline_ms

  case session.lifecycle, overdue {
    typestate.LiveLabel, True -> {
      let draining = Session(..session, lifecycle: typestate.DrainingLabel)
      emit_telemetry(draining, HeartbeatTimedOut(session.id, now_ms))
    }
    _, _ -> session
  }
}

/// Restart a crashed session; a session that has not crashed reports the
/// event as ignored.
fn apply_restart(session: Session, now_ms: Int) -> Session {
  case session.crashed {
    False -> ignore_event(session, "restart")
    True -> {
      let restarted = restart_session(session, now_ms)
      emit_telemetry(restarted, SessionRestarted(session.id))
    }
  }
}
275  
/// Put the session into the live lifecycle on `route`.
///
/// The crashed flag, the pending patches and the un-flushed outbox are
/// cleared, and the heartbeat deadline is reset from `now_ms`. The counter is
/// kept when `keep_counter` is True and reset to 0 otherwise. After that,
/// `connect_event` is emitted and a counter patch is queued; that patch
/// carries the static markup because `counter_static_sent` was reset.
fn connect_live(
  session: Session,
  route: String,
  now_ms: Int,
  connect_event: TelemetryEvent,
  keep_counter keep_counter: Bool,
) -> Session {
  let mounted_counter = case keep_counter {
    False -> 0
    True -> session.counter
  }
  let deadline_ms = now_ms + session.timeout_ms

  let live =
    Session(
      ..session,
      lifecycle: typestate.LiveLabel,
      crashed: False,
      mounted_route: Some(route),
      counter: mounted_counter,
      heartbeat_deadline_ms: deadline_ms,
      pending_rev: [],
      outbox_rev: [],
      counter_static_sent: False,
    )

  queue_counter_patch(emit_telemetry(live, connect_event))
}
302  
/// Return a crashed session to the disconnected lifecycle.
///
/// The crashed flag, the pending patches and the un-flushed outbox are
/// cleared, and the heartbeat deadline is reset from `now_ms`. Under
/// `Rehydrate` the counter is kept; under `Remount` it is reset to 0.
fn restart_session(session: Session, now_ms: Int) -> Session {
  let restored_counter = case session.reconnect_policy {
    Rehydrate -> session.counter
    Remount -> 0
  }
  let deadline_ms = now_ms + session.timeout_ms

  Session(
    ..session,
    crashed: False,
    lifecycle: typestate.DisconnectedLabel,
    counter: restored_counter,
    heartbeat_deadline_ms: deadline_ms,
    pending_rev: [],
    outbox_rev: [],
    counter_static_sent: False,
  )
}
328  
/// Add `delta` to the counter of a live session, emit `CounterUpdated` and
/// queue a counter patch. Any other lifecycle reports the event as ignored.
fn apply_counter_delta(session: Session, delta: Int) -> Session {
  case session.lifecycle {
    typestate.LiveLabel -> {
      let updated = session.counter + delta
      let bumped = Session(..session, counter: updated)
      let announced =
        emit_telemetry(bumped, CounterUpdated(session.id, updated))
      queue_counter_patch(announced)
    }
    _ -> ignore_event(session, "counter_update")
  }
}
340  
/// Drop the pending patch whose ref equals `ref` and emit `PatchAcked`.
/// When no pending patch has that ref the event is reported as ignored.
fn apply_ack(session: Session, ref: String) -> Session {
  case remove_patch_ref(session.pending_rev, ref) {
    #(remaining, True) -> {
      let acked = Session(..session, pending_rev: remaining)
      emit_telemetry(acked, PatchAcked(session.id, ref))
    }
    #(_, False) -> ignore_event(session, "ack")
  }
}
351  
/// Queue a patch that renders the current counter into "#app".
///
/// The first patch after `counter_static_sent` was reset is a
/// `diff.ReplaceSegments` carrying the static markup; later patches are
/// `diff.UpdateSegments` with only the counter slot. The patch is added to
/// both the pending list and the outbox, the patch ref counter advances by
/// one, and `PatchQueued` is emitted.
fn queue_counter_patch(session: Session) -> Session {
  let ref = session.next_patch_ref |> int.to_string
  let slots = [diff.slot("counter", int.to_string(session.counter))]

  let payload = case session.counter_static_sent {
    True ->
      diff.UpdateSegments(
        target: "#app",
        fingerprint: counter_fingerprint(),
        dynamic_slots: slots,
      )
    False ->
      diff.ReplaceSegments(
        target: "#app",
        fingerprint: counter_fingerprint(),
        static_html: counter_static_html(),
        dynamic_slots: slots,
      )
  }
  let envelope = PatchEnvelope(ref: ref, patch: payload)

  let queued =
    Session(
      ..session,
      next_patch_ref: session.next_patch_ref + 1,
      counter_static_sent: True,
      pending_rev: [envelope, ..session.pending_rev],
      outbox_rev: [OutboxPatch(envelope), ..session.outbox_rev],
    )

  emit_telemetry(queued, PatchQueued(session.id, ref))
}
384  
/// Fingerprint string attached to every counter patch built by
/// `queue_counter_patch`.
fn counter_fingerprint() -> String {
  "counter-view/v1"
}
388  
/// Static markup of the counter view: a button wrapping the span that is
/// marked as the "counter" slot.
fn counter_static_html() -> String {
  "<button data-ls-key=\"counter-button\"><span data-ls-slot=\"counter\"></span></button>"
}
394  
/// Record `event` in the telemetry log and also queue it on the outbox.
/// Both lists are kept newest first.
fn emit_telemetry(session: Session, event: TelemetryEvent) -> Session {
  let telemetry_rev = [event, ..session.telemetry_rev]
  let outbox_rev = [OutboxTelemetry(event), ..session.outbox_rev]

  Session(..session, telemetry_rev: telemetry_rev, outbox_rev: outbox_rev)
}
402  
/// Leave the session state as it is apart from an `EventIgnored` telemetry
/// record carrying the short event name `name`.
fn ignore_event(session: Session, name: String) -> Session {
  EventIgnored(session.id, name)
  |> emit_telemetry(session, _)
}
406  
/// Remove the first patch whose ref equals `ref`, keeping the order of the
/// remaining patches.
///
/// Returns the resulting list and a flag that is True when a patch was
/// removed. When nothing matches, the list comes back with the same elements
/// and the flag is False.
fn remove_patch_ref(
  patches: List(PatchEnvelope),
  ref: String,
) -> #(List(PatchEnvelope), Bool) {
  case patches {
    [] -> #([], False)
    // First match: drop it and stop searching.
    [PatchEnvelope(ref: found, ..), ..tail] if found == ref -> #(tail, True)
    [head, ..tail] -> {
      let #(kept, removed) = remove_patch_ref(tail, ref)
      #([head, ..kept], removed)
    }
  }
}