session.gleam
//// Minimal typed session actor model for Phase 2.

import gleam/int
import gleam/list
import gleam/option.{type Option, None, Some}
import lightspeed/agent/typestate
import lightspeed/diff

/// Strategy applied when a session comes back after a disconnect or crash.
///
/// `Rehydrate` carries the current counter value over, while `Remount`
/// resets the counter to zero.
pub type ReconnectPolicy {
  Rehydrate
  Remount
}

/// Events that one live-session actor accepts through its inbox.
pub type InboxEvent {
  /// Go live on `route`. The `csrf_token` is carried but not read by the
  /// event handler in this module.
  Connect(route: String, csrf_token: String, now_ms: Int)
  /// Go live again on `route`, following the session's reconnect policy.
  Reconnect(route: String, now_ms: Int)
  /// Add one to the counter (only honoured while live).
  Increment
  /// Subtract one from the counter (only honoured while live).
  Decrement
  /// Acknowledge the pending patch whose reference equals `ref`.
  Ack(ref: String)
  /// Push the heartbeat deadline forward from `now_ms` (only while live).
  Heartbeat(now_ms: Int)
  /// Clock tick used to detect a missed heartbeat deadline.
  Tick(now_ms: Int)
  /// Mark the session as crashed.
  Crash(reason: String)
  /// Bring a crashed session back to the disconnected state.
  Restart(now_ms: Int)
  /// Terminate the session.
  Shutdown(reason: String)
}

/// Inbox envelope pairing an event with the process id of its sender.
pub type InboxMessage {
  InboxMessage(owner: String, event: InboxEvent)
}

/// A patch queued by the session actor, tagged with its reference.
pub type PatchEnvelope {
  PatchEnvelope(ref: String, patch: diff.Patch)
}

/// Telemetry records that one session actor can emit.
pub type TelemetryEvent {
  SessionMounted(session_id: String, route: String)
  SessionRehydrated(session_id: String, route: String)
  SessionRemounted(session_id: String, route: String)
  CounterUpdated(session_id: String, value: Int)
  PatchQueued(session_id: String, ref: String)
  PatchAcked(session_id: String, ref: String)
  HeartbeatReceived(session_id: String, deadline_ms: Int)
  HeartbeatTimedOut(session_id: String, now_ms: Int)
  SessionCrashed(session_id: String, reason: String)
  SessionRestarted(session_id: String)
  SessionShutdown(session_id: String, reason: String)
  OwnershipRejected(session_id: String, owner: String)
  EventIgnored(session_id: String, event: String)
}

/// Records placed on the actor's outbox: either a patch or a telemetry event.
pub type OutboxMessage {
  OutboxPatch(PatchEnvelope)
  OutboxTelemetry(TelemetryEvent)
}

/// Internal state of one session actor.
pub opaque type Session {
  Session(
    // Session identifier, copied into every telemetry record.
    id: String,
    // Process id allowed to drive this session; other senders are rejected.
    owner: String,
    lifecycle: typestate.Lifecycle,
    reconnect_policy: ReconnectPolicy,
    // `Some(route)` once the session has connected at least once.
    mounted_route: Option(String),
    counter: Int,
    // Number that becomes the reference of the next queued patch.
    next_patch_ref: Int,
    // The three `_rev` lists are stored newest-first and reversed on read.
    pending_rev: List(PatchEnvelope),
    outbox_rev: List(OutboxMessage),
    telemetry_rev: List(TelemetryEvent),
    // True once a patch carrying the static HTML was queued for the
    // current connection.
    counter_static_sent: Bool,
    // Added to a timestamp to obtain the next heartbeat deadline.
    timeout_ms: Int,
    heartbeat_deadline_ms: Int,
    crashed: Bool,
  )
}

/// Build the state of a brand-new session actor.
///
/// The session starts disconnected with a zero counter, empty queues, and a
/// first heartbeat deadline of `now_ms + timeout_ms`.
pub fn start(
  id: String,
  owner: String,
  reconnect_policy: ReconnectPolicy,
  now_ms: Int,
  timeout_ms: Int,
) -> Session {
  let first_deadline_ms = now_ms + timeout_ms

  Session(
    id: id,
    owner: owner,
    lifecycle: typestate.DisconnectedLabel,
    reconnect_policy: reconnect_policy,
    mounted_route: None,
    counter: 0,
    next_patch_ref: 1,
    pending_rev: [],
    outbox_rev: [],
    telemetry_rev: [],
    counter_static_sent: False,
    timeout_ms: timeout_ms,
    heartbeat_deadline_ms: first_deadline_ms,
    crashed: False,
  )
}

/// Process one inbox message.
///
/// The event is applied only when the sender matches the session owner;
/// any other sender results in an `OwnershipRejected` telemetry record and
/// no other state change.
pub fn handle(session: Session, message: InboxMessage) -> Session {
  let InboxMessage(owner: sender, event: event) = message

  case sender == session.owner {
    False -> emit_telemetry(session, OwnershipRejected(session.id, sender))
    True -> apply_event(session, event)
  }
}

/// Identifier of the session.
pub fn id(session: Session) -> String {
  let Session(id: session_id, ..) = session
  session_id
}

/// Process id of the server process that owns the session.
pub fn owner(session: Session) -> String {
  let Session(owner: owner_pid, ..) = session
  owner_pid
}

/// Current lifecycle label of the session.
pub fn lifecycle(session: Session) -> typestate.Lifecycle {
  let Session(lifecycle: label, ..) = session
  label
}

/// Current value of the counter model held by this minimal actor.
pub fn counter(session: Session) -> Int {
  let Session(counter: value, ..) = session
  value
}

/// Current reconnect policy.
pub fn reconnect_policy(session: Session) -> ReconnectPolicy {
  let Session(reconnect_policy: policy, ..) = session
  policy
}

/// Patches still waiting for an ack, oldest first (the order they were
/// emitted in).
pub fn pending_patches(session: Session) -> List(PatchEnvelope) {
  session.pending_rev
  |> list.reverse
}

/// Every telemetry record emitted so far, oldest first.
pub fn telemetry(session: Session) -> List(TelemetryEvent) {
  session.telemetry_rev
  |> list.reverse
}

/// Timestamp, in milliseconds, of the current heartbeat deadline.
pub fn heartbeat_deadline_ms(session: Session) -> Int {
  let Session(heartbeat_deadline_ms: deadline_ms, ..) = session
  deadline_ms
}

/// Whether the session has crashed and is waiting for a restart.
pub fn crashed(session: Session) -> Bool {
  let Session(crashed: is_crashed, ..) = session
  is_crashed
}

/// Take everything out of the outbox.
///
/// Returns the session with an empty outbox together with the drained
/// messages in emit order.
pub fn flush_outbox(session: Session) -> #(Session, List(OutboxMessage)) {
  let drained = list.reverse(session.outbox_rev)
  let emptied = Session(..session, outbox_rev: [])
  #(emptied, drained)
}

/// Reference string carried by a patch envelope.
pub fn patch_ref(patch: PatchEnvelope) -> String {
  let PatchEnvelope(ref: ref, ..) = patch
  ref
}

/// Patch payload carried by a patch envelope.
pub fn patch(patch: PatchEnvelope) -> diff.Patch {
  let PatchEnvelope(patch: payload, ..) = patch
  payload
}

/// Stable telemetry label for assertions and logs.
pub fn telemetry_label(event: TelemetryEvent) -> String {
  case event {
    // Connection establishment.
    SessionMounted(_, _) -> "session_mounted"
    SessionRehydrated(_, _) -> "session_rehydrated"
    SessionRemounted(_, _) -> "session_remounted"
    // Counter and patch flow.
    CounterUpdated(_, _) -> "counter_updated"
    PatchQueued(_, _) -> "patch_queued"
    PatchAcked(_, _) -> "patch_acked"
    // Liveness.
    HeartbeatReceived(_, _) -> "heartbeat_received"
    HeartbeatTimedOut(_, _) -> "heartbeat_timed_out"
    // Termination and recovery.
    SessionCrashed(_, _) -> "session_crashed"
    SessionRestarted(_) -> "session_restarted"
    SessionShutdown(_, _) -> "session_shutdown"
    // Rejected or dropped input.
    OwnershipRejected(_, _) -> "ownership_rejected"
    EventIgnored(_, _) -> "event_ignored"
  }
}

/// Dispatch one inbox event to the transition that handles it.
fn apply_event(session: Session, event: InboxEvent) -> Session {
  case event {
    // The CSRF token carried by `Connect` is not inspected here.
    Connect(route, _, now_ms) -> {
      let mounted = SessionMounted(session.id, route)
      connect_live(session, route, now_ms, mounted, keep_counter: True)
    }

    Reconnect(route, now_ms) -> {
      // The policy decides both the telemetry record and whether the
      // counter survives the reconnect.
      let #(announcement, keep) = case session.reconnect_policy {
        Rehydrate -> #(SessionRehydrated(session.id, route), True)
        Remount -> #(SessionRemounted(session.id, route), False)
      }
      connect_live(session, route, now_ms, announcement, keep_counter: keep)
    }

    Increment -> apply_counter_delta(session, 1)

    Decrement -> apply_counter_delta(session, -1)

    Ack(ref) -> apply_ack(session, ref)

    Heartbeat(now_ms) -> apply_heartbeat(session, now_ms)

    Tick(now_ms) -> apply_tick(session, now_ms)

    Crash(reason) -> {
      let terminated =
        Session(..session, crashed: True, lifecycle: typestate.TerminatedLabel)
      emit_telemetry(terminated, SessionCrashed(session.id, reason))
    }

    Restart(now_ms) ->
      case session.crashed {
        False -> ignore_event(session, "restart")
        True ->
          emit_telemetry(
            restart_session(session, now_ms),
            SessionRestarted(session.id),
          )
      }

    Shutdown(reason) -> {
      let terminated = Session(..session, lifecycle: typestate.TerminatedLabel)
      emit_telemetry(terminated, SessionShutdown(session.id, reason))
    }
  }
}

/// Handle a heartbeat: while live, move the deadline to
/// `now_ms + timeout_ms` and record it; in any other lifecycle the event is
/// reported as ignored.
fn apply_heartbeat(session: Session, now_ms: Int) -> Session {
  case session.lifecycle {
    typestate.LiveLabel -> {
      let deadline = now_ms + session.timeout_ms
      let refreshed = Session(..session, heartbeat_deadline_ms: deadline)
      emit_telemetry(refreshed, HeartbeatReceived(session.id, deadline))
    }
    _ -> ignore_event(session, "heartbeat")
  }
}

/// Handle a clock tick: a live session whose deadline is strictly before
/// `now_ms` moves to draining and records the timeout. Every other case
/// returns the session unchanged, without telemetry.
fn apply_tick(session: Session, now_ms: Int) -> Session {
  case session.lifecycle, now_ms > session.heartbeat_deadline_ms {
    typestate.LiveLabel, True ->
      emit_telemetry(
        Session(..session, lifecycle: typestate.DrainingLabel),
        HeartbeatTimedOut(session.id, now_ms),
      )
    _, _ -> session
  }
}

/// Put the session into the live state on `route`.
///
/// Clears the crash flag, the pending patches and the outbox, resets the
/// heartbeat deadline, and forgets that static HTML was sent so the next
/// patch is a full replace. The counter is kept or zeroed according to
/// `keep_counter`. Afterwards `connect_event` is emitted and a counter patch
/// is queued.
///
/// NOTE(review): clearing `outbox_rev` also discards telemetry outbox
/// messages that were never flushed; confirm this is intended.
fn connect_live(
  session: Session,
  route: String,
  now_ms: Int,
  connect_event: TelemetryEvent,
  keep_counter keep_counter: Bool,
) -> Session {
  let counter = case keep_counter {
    False -> 0
    True -> session.counter
  }
  let deadline = now_ms + session.timeout_ms

  let live =
    Session(
      ..session,
      lifecycle: typestate.LiveLabel,
      crashed: False,
      mounted_route: Some(route),
      counter: counter,
      heartbeat_deadline_ms: deadline,
      pending_rev: [],
      outbox_rev: [],
      counter_static_sent: False,
    )

  queue_counter_patch(emit_telemetry(live, connect_event))
}

/// Return a crashed session to the disconnected state.
///
/// The crash flag, pending patches, outbox and static-sent marker are
/// reset and the heartbeat deadline is recomputed from `now_ms`. Under the
/// `Remount` policy the counter is also reset to zero; under `Rehydrate` it
/// is kept.
fn restart_session(session: Session, now_ms: Int) -> Session {
  let counter = case session.reconnect_policy {
    Rehydrate -> session.counter
    Remount -> 0
  }

  Session(
    ..session,
    crashed: False,
    lifecycle: typestate.DisconnectedLabel,
    counter: counter,
    heartbeat_deadline_ms: now_ms + session.timeout_ms,
    pending_rev: [],
    outbox_rev: [],
    counter_static_sent: False,
  )
}

/// Add `delta` to the counter of a live session, record the new value and
/// queue a patch for it. Outside the live state the event is reported as
/// ignored.
fn apply_counter_delta(session: Session, delta: Int) -> Session {
  case session.lifecycle {
    typestate.LiveLabel -> {
      let updated_value = session.counter + delta
      let updated = Session(..session, counter: updated_value)
      updated
      |> emit_telemetry(CounterUpdated(session.id, updated_value))
      |> queue_counter_patch
    }
    _ -> ignore_event(session, "counter_update")
  }
}

/// Acknowledge the pending patch with reference `ref`.
///
/// A matching patch is removed from the pending list and `PatchAcked` is
/// emitted; an unknown reference is reported as an ignored event.
fn apply_ack(session: Session, ref: String) -> Session {
  case remove_patch_ref(session.pending_rev, ref) {
    #(remaining_rev, True) ->
      emit_telemetry(
        Session(..session, pending_rev: remaining_rev),
        PatchAcked(session.id, ref),
      )
    #(_, False) -> ignore_event(session, "ack")
  }
}

/// Queue a patch carrying the current counter value.
///
/// The first patch after a connect replaces the segments, static HTML
/// included; later patches only update the dynamic slot. The patch is added
/// to both the pending list and the outbox, the reference counter advances,
/// and `PatchQueued` is emitted.
fn queue_counter_patch(session: Session) -> Session {
  let ref = int.to_string(session.next_patch_ref)
  let slots = [diff.slot("counter", int.to_string(session.counter))]

  let payload = case session.counter_static_sent {
    True ->
      diff.UpdateSegments(
        target: "#app",
        fingerprint: counter_fingerprint(),
        dynamic_slots: slots,
      )
    False ->
      diff.ReplaceSegments(
        target: "#app",
        fingerprint: counter_fingerprint(),
        static_html: counter_static_html(),
        dynamic_slots: slots,
      )
  }
  let envelope = PatchEnvelope(ref: ref, patch: payload)

  let queued =
    Session(
      ..session,
      next_patch_ref: session.next_patch_ref + 1,
      counter_static_sent: True,
      pending_rev: [envelope, ..session.pending_rev],
      outbox_rev: [OutboxPatch(envelope), ..session.outbox_rev],
    )

  emit_telemetry(queued, PatchQueued(session.id, ref))
}

/// Fingerprint string attached to every counter patch.
fn counter_fingerprint() -> String {
  "counter-view/v1"
}

/// Static HTML of the counter view: a button wrapping the counter slot.
fn counter_static_html() -> String {
  let button_open = "<button data-ls-key=\"counter-button\">"
  let slot_span = "<span data-ls-slot=\"counter\"></span>"
  button_open <> slot_span <> "</button>"
}

/// Record `event` in the telemetry history and also place it on the outbox.
fn emit_telemetry(session: Session, event: TelemetryEvent) -> Session {
  let telemetry_rev = [event, ..session.telemetry_rev]
  let outbox_rev = [OutboxTelemetry(event), ..session.outbox_rev]
  Session(..session, telemetry_rev: telemetry_rev, outbox_rev: outbox_rev)
}

/// Emit an `EventIgnored` record naming the event that was not applied.
fn ignore_event(session: Session, name: String) -> Session {
  let ignored = EventIgnored(session.id, name)
  emit_telemetry(session, ignored)
}

/// Remove the first patch in `patches` whose reference equals `ref`.
///
/// Returns the list without that patch (order otherwise preserved) and
/// `True`, or the unchanged contents and `False` when nothing matched.
fn remove_patch_ref(
  patches: List(PatchEnvelope),
  ref: String,
) -> #(List(PatchEnvelope), Bool) {
  remove_patch_ref_loop(patches, ref, [])
}

/// Worker for `remove_patch_ref`; `skipped_rev` holds the entries already
/// passed over, newest first.
fn remove_patch_ref_loop(
  remaining: List(PatchEnvelope),
  ref: String,
  skipped_rev: List(PatchEnvelope),
) -> #(List(PatchEnvelope), Bool) {
  case remaining {
    [] -> #(list.reverse(skipped_rev), False)
    [entry, ..rest] ->
      case entry.ref == ref {
        True -> #(list.append(list.reverse(skipped_rev), rest), True)
        False -> remove_patch_ref_loop(rest, ref, [entry, ..skipped_rev])
      }
  }
}