pty_session.go
// Copyright 2025 Alibaba Group Holding Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build !windows
// +build !windows

package runtime

import (
	"errors"
	"fmt"
	"io"
	"os"
	"os/exec"
	"sync"
	"sync/atomic"
	"syscall"

	"github.com/alibaba/opensandbox/internal/safego"
	"github.com/creack/pty"

	"github.com/alibaba/opensandbox/execd/pkg/log"
	"github.com/alibaba/opensandbox/execd/pkg/util/pathutil"
)

// PTYSession is the public interface for an interactive PTY/pipe session.
// The concrete implementation (*ptySession) is unexported; callers outside
// this package must use this interface.
type PTYSession interface {
	// LockWS tries to claim the single WebSocket slot; it returns false when
	// another client already holds it.
	LockWS() bool
	// UnlockWS releases the WebSocket slot.
	UnlockWS()
	// IsRunning reports whether a bash process is currently tracked.
	IsRunning() bool
	// IsPTY reports whether the session was started in PTY mode.
	IsPTY() bool
	// ExitCode returns the exit code recorded for the last process, or -1
	// while no process has exited yet.
	ExitCode() int
	// Done returns a channel closed when the process exits; nil before start.
	Done() <-chan struct{}
	// StartPTY launches bash attached to a pseudo-terminal.
	StartPTY() error
	// StartPipe launches bash with separate stdin/stdout/stderr pipes.
	StartPipe() error
	// WriteStdin forwards p to the process's stdin.
	WriteStdin(p []byte) (int, error)
	// AttachOutput installs fresh live-output pipes and returns
	// (stdout, stderr [nil in PTY mode], detach).
	AttachOutput() (io.Reader, io.Reader, func())
	// AttachOutputWithSnapshot additionally returns the replay data from
	// offset since onward and the offset reported by the replay buffer.
	AttachOutputWithSnapshot(since int64) (io.Reader, io.Reader, func(), []byte, int64)
	// SendSignal delivers the named signal to the process group.
	SendSignal(name string)
	// ResizePTY changes the terminal window size; it does nothing when no PTY exists.
	ResizePTY(cols, rows uint16) error
}

// IsPTYSessionSupported reports whether PTY sessions are supported on this platform.
// This file is built only on non-Windows targets, where it always returns true.
func IsPTYSessionSupported() bool { return true }

// NewPTYSessionID returns a fresh session identifier obtained from uuidString,
// a helper defined elsewhere in this package (presumably a random UUID — confirm).
func NewPTYSessionID() string {
	return uuidString()
}

// ptySession manages a single interactive PTY or pipe-mode bash process.
//
// Lifecycle:
//  1. Create via newPTYSession.
//  2. Call StartPTY() or StartPipe() from the WS handler (after LockWS).
//  3. Zero or more clients call AttachOutput() to receive live output.
//  4. The bash process exits → Done() closes → exit frame sent.
//  5. Call close() to terminate the session early and release resources.
type ptySession struct {
	id  string
	cwd string // working directory for bash; empty leaves cmd.Dir unset

	mu      sync.Mutex
	closing bool // set once by close(); StartPTY/StartPipe refuse to run afterwards

	// Process tracking (guarded by mu)
	pid          int           // PID of the running bash process (0 = not running)
	lastExitCode int           // exit code; -1 until process exits
	doneCh       chan struct{} // closed when process exits (non-nil after Start*)

	// Stdin (PTY master in PTY mode; write end of os.Pipe in pipe mode)
	stdin io.WriteCloser

	// PTY-specific
	isPTY bool
	ptmx  *os.File // PTY master fd; nil in pipe mode

	// Replay: every output chunk is appended here by writeAndFanout.
	replay *replayBuffer

	// WS exclusive lock: only one WebSocket client at a time.
	wsConnected atomic.Bool

	// Output broadcast (guards stdoutW / stderrW).
	// writeAndFanout holds outMu while appending to the replay buffer and reading
	// the sink pointer; the write to the pipe itself happens outside the lock so
	// a slow client never holds outMu.
	outMu   sync.Mutex
	stdoutW *io.PipeWriter // current per-connection sink; nil when no client attached
	stderrW *io.PipeWriter // nil in PTY mode
}

// newPTYSession builds an unstarted session with an empty replay buffer and
// the exit code preset to -1 (no process has exited yet).
func newPTYSession(id, cwd string) *ptySession {
	return &ptySession{
		id:           id,
		cwd:          cwd,
		replay:       newReplayBuffer(),
		lastExitCode: -1,
	}
}

// LockWS attempts to acquire the exclusive WebSocket connection lock.
// Returns true on success, false if another client is already connected.
func (s *ptySession) LockWS() bool {
	return s.wsConnected.CompareAndSwap(false, true)
}

// UnlockWS releases the WebSocket connection lock.
// It stores false unconditionally, so it does not check who held the lock.
func (s *ptySession) UnlockWS() {
	s.wsConnected.Store(false)
}

// IsRunning returns true if the bash process is currently alive.
125 func (s *ptySession) IsRunning() bool { 126 s.mu.Lock() 127 defer s.mu.Unlock() 128 return s.pid != 0 129 } 130 131 // IsPTY returns true when the session was started in PTY mode. 132 func (s *ptySession) IsPTY() bool { 133 return s.isPTY 134 } 135 136 // ExitCode returns the exit code of the last process, or -1 if it has not exited yet. 137 func (s *ptySession) ExitCode() int { 138 s.mu.Lock() 139 defer s.mu.Unlock() 140 return s.lastExitCode 141 } 142 143 // Done returns a channel that is closed when the bash process exits. 144 // Returns nil if the process has not been started yet. 145 func (s *ptySession) Done() <-chan struct{} { 146 s.mu.Lock() 147 defer s.mu.Unlock() 148 return s.doneCh 149 } 150 151 // ReplayBuffer returns the session's replay buffer (thread-safe). 152 func (s *ptySession) ReplayBuffer() *replayBuffer { 153 return s.replay 154 } 155 156 // StartPTY launches bash via pty.StartWithSize. 157 // Must be called with the WS lock held. 158 func (s *ptySession) StartPTY() error { 159 s.mu.Lock() 160 defer s.mu.Unlock() 161 162 if s.pid != 0 { 163 return errors.New("pty session already started") 164 } 165 if s.closing { 166 return errors.New("pty session is closing") 167 } 168 169 cmd := exec.Command("bash", "--norc", "--noprofile") 170 cmd.Env = os.Environ() 171 if s.cwd != "" { 172 cmd.Dir = s.cwd 173 } 174 // Do NOT set Setpgid: pty.StartWithSize sets Setsid+Setctty internally. 175 // Combining Setsid+Setpgid causes EPERM (setpgid is illegal for a session leader). 
176 177 ptmx, err := pty.StartWithSize(cmd, &pty.Winsize{Cols: 80, Rows: 24}) 178 if err != nil { 179 return fmt.Errorf("pty.StartWithSize: %w", err) 180 } 181 182 s.ptmx = ptmx 183 s.isPTY = true 184 s.pid = cmd.Process.Pid 185 s.doneCh = make(chan struct{}) 186 s.stdin = ptmx // write to the PTY master to feed stdin 187 188 safego.Go(func() { s.broadcastPTY() }) 189 safego.Go(func() { s.waitAndExit(cmd, ptmx) }) 190 191 return nil 192 } 193 194 // StartPipe launches bash with plain stdin/stdout/stderr os.Pipes. 195 // Must be called with the WS lock held. 196 func (s *ptySession) StartPipe() error { 197 s.mu.Lock() 198 defer s.mu.Unlock() 199 200 if s.pid != 0 { 201 return errors.New("pty session already started") 202 } 203 if s.closing { 204 return errors.New("pty session is closing") 205 } 206 207 stdinR, stdinW, err := os.Pipe() 208 if err != nil { 209 return fmt.Errorf("stdin pipe: %w", err) 210 } 211 stdoutR, stdoutW, err := os.Pipe() 212 if err != nil { 213 _ = stdinR.Close() 214 _ = stdinW.Close() 215 return fmt.Errorf("stdout pipe: %w", err) 216 } 217 stderrR, stderrW, err := os.Pipe() 218 if err != nil { 219 _ = stdinR.Close() 220 _ = stdinW.Close() 221 _ = stdoutR.Close() 222 _ = stdoutW.Close() 223 return fmt.Errorf("stderr pipe: %w", err) 224 } 225 226 cmd := exec.Command("bash", "--norc", "--noprofile") 227 cmd.Env = os.Environ() 228 if s.cwd != "" { 229 cmd.Dir = s.cwd 230 } 231 cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} 232 cmd.Stdin = stdinR 233 cmd.Stdout = stdoutW 234 cmd.Stderr = stderrW 235 236 if err := cmd.Start(); err != nil { 237 _ = stdinR.Close() 238 _ = stdinW.Close() 239 _ = stdoutR.Close() 240 _ = stdoutW.Close() 241 _ = stderrR.Close() 242 _ = stderrW.Close() 243 return fmt.Errorf("cmd.Start: %w", err) 244 } 245 246 // Close the child-side ends in the parent — the child has its own copies. 
247 _ = stdinR.Close() 248 _ = stdoutW.Close() 249 _ = stderrW.Close() 250 251 s.isPTY = false 252 s.pid = cmd.Process.Pid 253 s.doneCh = make(chan struct{}) 254 s.stdin = stdinW 255 256 safego.Go(func() { s.broadcastPipe(stdoutR, true) }) 257 safego.Go(func() { s.broadcastPipe(stderrR, false) }) 258 safego.Go(func() { s.waitAndExitPipe(cmd, stdinW, stdoutR, stderrR) }) 259 260 return nil 261 } 262 263 // broadcastPTY reads from the PTY master and fans out to replay + active WS client. 264 func (s *ptySession) broadcastPTY() { 265 buf := make([]byte, 32*1024) 266 for { 267 n, err := s.ptmx.Read(buf) 268 if n > 0 { 269 s.writeAndFanout(buf[:n], true) 270 } 271 if err != nil { 272 // EIO or EOF when the child exits — normal termination 273 break 274 } 275 } 276 } 277 278 // broadcastPipe reads from a pipe (stdout or stderr) and fans out to replay + active WS client. 279 func (s *ptySession) broadcastPipe(r *os.File, isStdout bool) { 280 buf := make([]byte, 32*1024) 281 for { 282 n, err := r.Read(buf) 283 if n > 0 { 284 s.writeAndFanout(buf[:n], isStdout) 285 } 286 if err != nil { 287 break 288 } 289 } 290 _ = r.Close() 291 } 292 293 // writeAndFanout writes chunk to the replay buffer and delivers it to the 294 // active per-connection pipe, atomically under outMu. 295 // 296 // Holding outMu across both operations closes the window where bytes written 297 // to replay after ReadFrom but before AttachOutput would be silently dropped. 298 // Lock order is always outMu → replay.mu (both paths), so no deadlock is possible. 299 func (s *ptySession) writeAndFanout(chunk []byte, isStdout bool) { 300 s.outMu.Lock() 301 s.replay.write(chunk) // acquires replay.mu inside (outMu → replay.mu) 302 var w *io.PipeWriter 303 if isStdout { 304 w = s.stdoutW 305 } else { 306 w = s.stderrW 307 } 308 s.outMu.Unlock() 309 310 if w != nil { 311 if _, err := w.Write(chunk); err != nil { 312 // Pipe was closed (client detached) — ignore. 
313 log.Warning("pty fanout write: %v", err) 314 } 315 } 316 } 317 318 // waitAndExit waits for the PTY-mode process and updates session state on exit. 319 func (s *ptySession) waitAndExit(cmd *exec.Cmd, ptmx *os.File) { 320 _ = cmd.Wait() 321 322 // Close the PTY master to unblock the broadcast goroutine. 323 _ = ptmx.Close() 324 325 s.mu.Lock() 326 exitCode := 0 327 if cmd.ProcessState != nil { 328 exitCode = cmd.ProcessState.ExitCode() 329 } 330 s.lastExitCode = exitCode 331 s.pid = 0 332 doneCh := s.doneCh 333 s.mu.Unlock() 334 335 close(doneCh) 336 } 337 338 // waitAndExitPipe waits for the pipe-mode process and updates session state on exit. 339 func (s *ptySession) waitAndExitPipe(cmd *exec.Cmd, stdinW, stdoutR, stderrR *os.File) { 340 _ = cmd.Wait() 341 342 // Close stdin write-end so the child (if still running) sees EOF. 343 _ = stdinW.Close() 344 345 s.mu.Lock() 346 exitCode := 0 347 if cmd.ProcessState != nil { 348 exitCode = cmd.ProcessState.ExitCode() 349 } 350 s.lastExitCode = exitCode 351 s.pid = 0 352 doneCh := s.doneCh 353 s.mu.Unlock() 354 355 close(doneCh) 356 } 357 358 // WriteStdin writes p to bash stdin (PTY master or pipe write-end). 359 func (s *ptySession) WriteStdin(p []byte) (int, error) { 360 s.mu.Lock() 361 w := s.stdin 362 s.mu.Unlock() 363 if w == nil { 364 return 0, errors.New("session not started") 365 } 366 return w.Write(p) 367 } 368 369 // AttachOutput creates a fresh per-connection io.Pipe and swaps it into the 370 // broadcast fanout path. 371 // 372 // Ordering guarantee (no duplicates on reconnect): 373 // - Caller must snapshot the replay buffer BEFORE calling AttachOutput. 374 // - Bytes produced between the snapshot and AttachOutput are delivered via 375 // the live pipe only (not in the snapshot), so each byte arrives exactly once. 376 // 377 // Returns (stdout reader, stderr reader [nil in PTY mode], detach func). 
378 // Calling detach() closes the writers, sending EOF to the readers and 379 // unblocking all pump goroutines. 380 func (s *ptySession) AttachOutput() (io.Reader, io.Reader, func()) { 381 stdoutR, stdoutW := io.Pipe() 382 383 s.outMu.Lock() 384 s.stdoutW = stdoutW 385 s.outMu.Unlock() 386 387 if s.isPTY { 388 detach := func() { 389 s.outMu.Lock() 390 s.stdoutW = nil 391 s.outMu.Unlock() 392 _ = stdoutW.Close() 393 } 394 return stdoutR, nil, detach 395 } 396 397 // Pipe mode: also attach stderr. 398 stderrR, stderrW := io.Pipe() 399 400 s.outMu.Lock() 401 s.stderrW = stderrW 402 s.outMu.Unlock() 403 404 detach := func() { 405 s.outMu.Lock() 406 s.stdoutW = nil 407 s.stderrW = nil 408 s.outMu.Unlock() 409 _ = stdoutW.Close() 410 _ = stderrW.Close() 411 } 412 return stdoutR, stderrR, detach 413 } 414 415 // AttachOutputWithSnapshot atomically snapshots the replay buffer and attaches 416 // the per-connection output pipe, eliminating the output-loss window that exists 417 // when ReadFrom and AttachOutput are called separately. 418 // 419 // Must be used together with writeAndFanout (which holds outMu during both 420 // replay.write and the fanout pointer read). 421 // 422 // Lock order is always outMu → replay.mu (both paths), so no deadlock is possible. 423 // 424 // Returns (stdoutR, stderrR [nil in PTY mode], detach, snapshotBytes, snapshotOffset). 
425 func (s *ptySession) AttachOutputWithSnapshot(since int64) (io.Reader, io.Reader, func(), []byte, int64) { 426 stdoutR, stdoutW := io.Pipe() 427 var stderrR io.Reader 428 var stderrW *io.PipeWriter 429 if !s.isPTY { 430 stderrR, stderrW = io.Pipe() 431 } 432 433 s.outMu.Lock() 434 snapshotBytes, snapshotOffset := s.replay.ReadFrom(since) // acquires replay.mu inside 435 s.stdoutW = stdoutW 436 if stderrW != nil { 437 s.stderrW = stderrW 438 } 439 s.outMu.Unlock() 440 441 detach := func() { 442 s.outMu.Lock() 443 s.stdoutW = nil 444 if stderrW != nil { 445 s.stderrW = nil 446 } 447 s.outMu.Unlock() 448 _ = stdoutW.Close() 449 if stderrW != nil { 450 _ = stderrW.Close() 451 } 452 } 453 return stdoutR, stderrR, detach, snapshotBytes, snapshotOffset 454 } 455 456 // SendSignal sends the named signal to the process group. 457 // Recognised names: SIGINT, SIGTERM, SIGKILL, SIGQUIT, SIGHUP. 458 func (s *ptySession) SendSignal(name string) { 459 s.mu.Lock() 460 pid := s.pid 461 s.mu.Unlock() 462 if pid == 0 { 463 return 464 } 465 466 sig := parseSignalName(name) 467 if sig == 0 { 468 log.Warning("ptySession.SendSignal: unknown signal %q", name) 469 return 470 } 471 472 // In PTY mode (setsid), pgid == pid automatically. 473 // In pipe mode (Setpgid), pgid is also == pid. 474 // Either way, Kill(-pid, sig) sends to the process group. 475 if err := syscall.Kill(-pid, sig); err != nil { 476 log.Warning("ptySession.SendSignal kill(-%d, %v): %v", pid, sig, err) 477 } 478 } 479 480 func parseSignalName(name string) syscall.Signal { 481 switch name { 482 case "SIGINT": 483 return syscall.SIGINT 484 case "SIGTERM": 485 return syscall.SIGTERM 486 case "SIGKILL": 487 return syscall.SIGKILL 488 case "SIGQUIT": 489 return syscall.SIGQUIT 490 case "SIGHUP": 491 return syscall.SIGHUP 492 default: 493 return 0 494 } 495 } 496 497 // ResizePTY updates the terminal window size (PTY mode only; no-op in pipe mode). 
498 func (s *ptySession) ResizePTY(cols, rows uint16) error { 499 s.mu.Lock() 500 ptmx := s.ptmx 501 s.mu.Unlock() 502 if ptmx == nil { 503 return nil // pipe mode or not started 504 } 505 return pty.Setsize(ptmx, &pty.Winsize{Cols: cols, Rows: rows}) 506 } 507 508 // close terminates the session and releases all resources. 509 // Safe to call multiple times. 510 func (s *ptySession) close() { 511 s.mu.Lock() 512 if s.closing { 513 s.mu.Unlock() 514 return 515 } 516 s.closing = true 517 pid := s.pid 518 ptmx := s.ptmx 519 stdin := s.stdin 520 s.mu.Unlock() 521 522 if pid != 0 { 523 _ = syscall.Kill(-pid, syscall.SIGKILL) 524 } 525 if ptmx != nil { 526 _ = ptmx.Close() 527 } else if stdin != nil { 528 _ = stdin.Close() 529 } 530 531 // Detach any active WS output pipe so pump goroutines unblock. 532 s.outMu.Lock() 533 stdoutW := s.stdoutW 534 stderrW := s.stderrW 535 s.stdoutW = nil 536 s.stderrW = nil 537 s.outMu.Unlock() 538 if stdoutW != nil { 539 _ = stdoutW.Close() 540 } 541 if stderrW != nil { 542 _ = stderrW.Close() 543 } 544 } 545 546 // CreatePTYSession creates a new PTY session and stores it in the map. 547 func (c *Controller) CreatePTYSession(id, cwd string) (PTYSession, error) { 548 resolvedCwd, err := pathutil.ExpandPath(cwd) 549 if err != nil { 550 return nil, fmt.Errorf("error resolving PTY session work directory: %w", err) 551 } 552 if resolvedCwd != "" { 553 err := os.MkdirAll(resolvedCwd, os.ModePerm) 554 if err != nil { 555 return nil, fmt.Errorf("error creating PTY session work directory: %w", err) 556 } 557 } 558 s := newPTYSession(id, resolvedCwd) 559 c.ptySessionMap.Store(id, s) 560 log.Info("created pty session %s", id) 561 return s, nil 562 } 563 564 // getPTYSession looks up a PTY session by ID. Returns nil if not found. 565 // For internal use only; outside callers should use GetPTYSession. 
566 func (c *Controller) getPTYSession(id string) *ptySession { 567 if v, ok := c.ptySessionMap.Load(id); ok { 568 if s, ok := v.(*ptySession); ok { 569 return s 570 } 571 } 572 return nil 573 } 574 575 // GetPTYSession looks up a PTY session by ID. Returns nil if not found. 576 func (c *Controller) GetPTYSession(id string) PTYSession { 577 s := c.getPTYSession(id) 578 if s == nil { 579 return nil 580 } 581 return s 582 } 583 584 // DeletePTYSession terminates and removes a PTY session. 585 // Returns ErrContextNotFound if the session does not exist. 586 func (c *Controller) DeletePTYSession(id string) error { 587 s := c.getPTYSession(id) 588 if s == nil { 589 return ErrContextNotFound 590 } 591 s.close() 592 c.ptySessionMap.Delete(id) 593 log.Info("deleted pty session %s", id) 594 return nil 595 } 596 597 // GetPTYSessionStatus returns status information for a PTY session. 598 func (c *Controller) GetPTYSessionStatus(id string) (running bool, outputOffset int64, err error) { 599 s := c.getPTYSession(id) 600 if s == nil { 601 return false, 0, ErrContextNotFound 602 } 603 return s.IsRunning(), s.replay.Total(), nil 604 }