/ components / execd / pkg / runtime / pty_session.go
pty_session.go
  1  // Copyright 2025 Alibaba Group Holding Ltd.
  2  //
  3  // Licensed under the Apache License, Version 2.0 (the "License");
  4  // you may not use this file except in compliance with the License.
  5  // You may obtain a copy of the License at
  6  //
  7  //     http://www.apache.org/licenses/LICENSE-2.0
  8  //
  9  // Unless required by applicable law or agreed to in writing, software
 10  // distributed under the License is distributed on an "AS IS" BASIS,
 11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 12  // See the License for the specific language governing permissions and
 13  // limitations under the License.
 14  
 15  //go:build !windows
 16  // +build !windows
 17  
 18  package runtime
 19  
 20  import (
 21  	"errors"
 22  	"fmt"
 23  	"io"
 24  	"os"
 25  	"os/exec"
 26  	"sync"
 27  	"sync/atomic"
 28  	"syscall"
 29  
 30  	"github.com/alibaba/opensandbox/internal/safego"
 31  	"github.com/creack/pty"
 32  
 33  	"github.com/alibaba/opensandbox/execd/pkg/log"
 34  	"github.com/alibaba/opensandbox/execd/pkg/util/pathutil"
 35  )
 36  
 37  // PTYSession is the public interface for an interactive PTY/pipe session.
 38  // The concrete implementation (*ptySession) is unexported; callers outside
 39  // this package must use this interface.
 40  type PTYSession interface {
 41  	LockWS() bool
 42  	UnlockWS()
 43  	IsRunning() bool
 44  	IsPTY() bool
 45  	ExitCode() int
 46  	Done() <-chan struct{}
 47  	StartPTY() error
 48  	StartPipe() error
 49  	WriteStdin(p []byte) (int, error)
 50  	AttachOutput() (io.Reader, io.Reader, func())
 51  	AttachOutputWithSnapshot(since int64) (io.Reader, io.Reader, func(), []byte, int64)
 52  	SendSignal(name string)
 53  	ResizePTY(cols, rows uint16) error
 54  }
 55  
 56  // IsPTYSessionSupported reports whether PTY sessions are supported on this platform.
 57  func IsPTYSessionSupported() bool { return true }
 58  
 59  func NewPTYSessionID() string {
 60  	return uuidString()
 61  }
 62  
 63  // ptySession manages a single interactive PTY or pipe-mode bash process.
 64  //
 65  // Lifecycle:
 66  //  1. Create via newPTYSession.
 67  //  2. Call StartPTY() or StartPipe() from the WS handler (after LockWS).
 68  //  3. Zero or more clients call AttachOutput() to receive live output.
 69  //  4. The bash process exits → Done() closes → exit frame sent.
 70  //  5. Call close() to terminate an early session and release resources.
 71  type ptySession struct {
 72  	id  string
 73  	cwd string
 74  
 75  	mu      sync.Mutex
 76  	closing bool
 77  
 78  	// Process tracking (guarded by mu)
 79  	pid          int           // PID of the running bash process (0 = not running)
 80  	lastExitCode int           // exit code; -1 until process exits
 81  	doneCh       chan struct{} // closed when process exits (non-nil after Start*)
 82  
 83  	// Stdin (PTY master in PTY mode; write end of os.Pipe in pipe mode)
 84  	stdin io.WriteCloser
 85  
 86  	// PTY-specific
 87  	isPTY bool
 88  	ptmx  *os.File // PTY master fd; nil in pipe mode
 89  
 90  	// Replay
 91  	replay *replayBuffer
 92  
 93  	// WS exclusive lock: only one WebSocket client at a time.
 94  	wsConnected atomic.Bool
 95  
 96  	// Output broadcast (guards stdoutW / stderrW).
 97  	// The broadcast goroutine holds outMu only while reading the pointer; writes
 98  	// to the pipe happen outside the lock to avoid blocking broadcast on slow clients.
 99  	outMu   sync.Mutex
100  	stdoutW *io.PipeWriter // current per-connection sink; nil when no client attached
101  	stderrW *io.PipeWriter // nil in PTY mode
102  }
103  
104  func newPTYSession(id, cwd string) *ptySession {
105  	return &ptySession{
106  		id:           id,
107  		cwd:          cwd,
108  		replay:       newReplayBuffer(),
109  		lastExitCode: -1,
110  	}
111  }
112  
113  // LockWS attempts to acquire the exclusive WebSocket connection lock.
114  // Returns true on success, false if another client is already connected.
115  func (s *ptySession) LockWS() bool {
116  	return s.wsConnected.CompareAndSwap(false, true)
117  }
118  
119  // UnlockWS releases the WebSocket connection lock.
120  func (s *ptySession) UnlockWS() {
121  	s.wsConnected.Store(false)
122  }
123  
124  // IsRunning returns true if the bash process is currently alive.
125  func (s *ptySession) IsRunning() bool {
126  	s.mu.Lock()
127  	defer s.mu.Unlock()
128  	return s.pid != 0
129  }
130  
131  // IsPTY returns true when the session was started in PTY mode.
132  func (s *ptySession) IsPTY() bool {
133  	return s.isPTY
134  }
135  
136  // ExitCode returns the exit code of the last process, or -1 if it has not exited yet.
137  func (s *ptySession) ExitCode() int {
138  	s.mu.Lock()
139  	defer s.mu.Unlock()
140  	return s.lastExitCode
141  }
142  
143  // Done returns a channel that is closed when the bash process exits.
144  // Returns nil if the process has not been started yet.
145  func (s *ptySession) Done() <-chan struct{} {
146  	s.mu.Lock()
147  	defer s.mu.Unlock()
148  	return s.doneCh
149  }
150  
151  // ReplayBuffer returns the session's replay buffer (thread-safe).
152  func (s *ptySession) ReplayBuffer() *replayBuffer {
153  	return s.replay
154  }
155  
156  // StartPTY launches bash via pty.StartWithSize.
157  // Must be called with the WS lock held.
158  func (s *ptySession) StartPTY() error {
159  	s.mu.Lock()
160  	defer s.mu.Unlock()
161  
162  	if s.pid != 0 {
163  		return errors.New("pty session already started")
164  	}
165  	if s.closing {
166  		return errors.New("pty session is closing")
167  	}
168  
169  	cmd := exec.Command("bash", "--norc", "--noprofile")
170  	cmd.Env = os.Environ()
171  	if s.cwd != "" {
172  		cmd.Dir = s.cwd
173  	}
174  	// Do NOT set Setpgid: pty.StartWithSize sets Setsid+Setctty internally.
175  	// Combining Setsid+Setpgid causes EPERM (setpgid is illegal for a session leader).
176  
177  	ptmx, err := pty.StartWithSize(cmd, &pty.Winsize{Cols: 80, Rows: 24})
178  	if err != nil {
179  		return fmt.Errorf("pty.StartWithSize: %w", err)
180  	}
181  
182  	s.ptmx = ptmx
183  	s.isPTY = true
184  	s.pid = cmd.Process.Pid
185  	s.doneCh = make(chan struct{})
186  	s.stdin = ptmx // write to the PTY master to feed stdin
187  
188  	safego.Go(func() { s.broadcastPTY() })
189  	safego.Go(func() { s.waitAndExit(cmd, ptmx) })
190  
191  	return nil
192  }
193  
194  // StartPipe launches bash with plain stdin/stdout/stderr os.Pipes.
195  // Must be called with the WS lock held.
196  func (s *ptySession) StartPipe() error {
197  	s.mu.Lock()
198  	defer s.mu.Unlock()
199  
200  	if s.pid != 0 {
201  		return errors.New("pty session already started")
202  	}
203  	if s.closing {
204  		return errors.New("pty session is closing")
205  	}
206  
207  	stdinR, stdinW, err := os.Pipe()
208  	if err != nil {
209  		return fmt.Errorf("stdin pipe: %w", err)
210  	}
211  	stdoutR, stdoutW, err := os.Pipe()
212  	if err != nil {
213  		_ = stdinR.Close()
214  		_ = stdinW.Close()
215  		return fmt.Errorf("stdout pipe: %w", err)
216  	}
217  	stderrR, stderrW, err := os.Pipe()
218  	if err != nil {
219  		_ = stdinR.Close()
220  		_ = stdinW.Close()
221  		_ = stdoutR.Close()
222  		_ = stdoutW.Close()
223  		return fmt.Errorf("stderr pipe: %w", err)
224  	}
225  
226  	cmd := exec.Command("bash", "--norc", "--noprofile")
227  	cmd.Env = os.Environ()
228  	if s.cwd != "" {
229  		cmd.Dir = s.cwd
230  	}
231  	cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
232  	cmd.Stdin = stdinR
233  	cmd.Stdout = stdoutW
234  	cmd.Stderr = stderrW
235  
236  	if err := cmd.Start(); err != nil {
237  		_ = stdinR.Close()
238  		_ = stdinW.Close()
239  		_ = stdoutR.Close()
240  		_ = stdoutW.Close()
241  		_ = stderrR.Close()
242  		_ = stderrW.Close()
243  		return fmt.Errorf("cmd.Start: %w", err)
244  	}
245  
246  	// Close the child-side ends in the parent — the child has its own copies.
247  	_ = stdinR.Close()
248  	_ = stdoutW.Close()
249  	_ = stderrW.Close()
250  
251  	s.isPTY = false
252  	s.pid = cmd.Process.Pid
253  	s.doneCh = make(chan struct{})
254  	s.stdin = stdinW
255  
256  	safego.Go(func() { s.broadcastPipe(stdoutR, true) })
257  	safego.Go(func() { s.broadcastPipe(stderrR, false) })
258  	safego.Go(func() { s.waitAndExitPipe(cmd, stdinW, stdoutR, stderrR) })
259  
260  	return nil
261  }
262  
263  // broadcastPTY reads from the PTY master and fans out to replay + active WS client.
264  func (s *ptySession) broadcastPTY() {
265  	buf := make([]byte, 32*1024)
266  	for {
267  		n, err := s.ptmx.Read(buf)
268  		if n > 0 {
269  			s.writeAndFanout(buf[:n], true)
270  		}
271  		if err != nil {
272  			// EIO or EOF when the child exits — normal termination
273  			break
274  		}
275  	}
276  }
277  
278  // broadcastPipe reads from a pipe (stdout or stderr) and fans out to replay + active WS client.
279  func (s *ptySession) broadcastPipe(r *os.File, isStdout bool) {
280  	buf := make([]byte, 32*1024)
281  	for {
282  		n, err := r.Read(buf)
283  		if n > 0 {
284  			s.writeAndFanout(buf[:n], isStdout)
285  		}
286  		if err != nil {
287  			break
288  		}
289  	}
290  	_ = r.Close()
291  }
292  
293  // writeAndFanout writes chunk to the replay buffer and delivers it to the
294  // active per-connection pipe, atomically under outMu.
295  //
296  // Holding outMu across both operations closes the window where bytes written
297  // to replay after ReadFrom but before AttachOutput would be silently dropped.
298  // Lock order is always outMu → replay.mu (both paths), so no deadlock is possible.
299  func (s *ptySession) writeAndFanout(chunk []byte, isStdout bool) {
300  	s.outMu.Lock()
301  	s.replay.write(chunk) // acquires replay.mu inside (outMu → replay.mu)
302  	var w *io.PipeWriter
303  	if isStdout {
304  		w = s.stdoutW
305  	} else {
306  		w = s.stderrW
307  	}
308  	s.outMu.Unlock()
309  
310  	if w != nil {
311  		if _, err := w.Write(chunk); err != nil {
312  			// Pipe was closed (client detached) — ignore.
313  			log.Warning("pty fanout write: %v", err)
314  		}
315  	}
316  }
317  
318  // waitAndExit waits for the PTY-mode process and updates session state on exit.
319  func (s *ptySession) waitAndExit(cmd *exec.Cmd, ptmx *os.File) {
320  	_ = cmd.Wait()
321  
322  	// Close the PTY master to unblock the broadcast goroutine.
323  	_ = ptmx.Close()
324  
325  	s.mu.Lock()
326  	exitCode := 0
327  	if cmd.ProcessState != nil {
328  		exitCode = cmd.ProcessState.ExitCode()
329  	}
330  	s.lastExitCode = exitCode
331  	s.pid = 0
332  	doneCh := s.doneCh
333  	s.mu.Unlock()
334  
335  	close(doneCh)
336  }
337  
338  // waitAndExitPipe waits for the pipe-mode process and updates session state on exit.
339  func (s *ptySession) waitAndExitPipe(cmd *exec.Cmd, stdinW, stdoutR, stderrR *os.File) {
340  	_ = cmd.Wait()
341  
342  	// Close stdin write-end so the child (if still running) sees EOF.
343  	_ = stdinW.Close()
344  
345  	s.mu.Lock()
346  	exitCode := 0
347  	if cmd.ProcessState != nil {
348  		exitCode = cmd.ProcessState.ExitCode()
349  	}
350  	s.lastExitCode = exitCode
351  	s.pid = 0
352  	doneCh := s.doneCh
353  	s.mu.Unlock()
354  
355  	close(doneCh)
356  }
357  
358  // WriteStdin writes p to bash stdin (PTY master or pipe write-end).
359  func (s *ptySession) WriteStdin(p []byte) (int, error) {
360  	s.mu.Lock()
361  	w := s.stdin
362  	s.mu.Unlock()
363  	if w == nil {
364  		return 0, errors.New("session not started")
365  	}
366  	return w.Write(p)
367  }
368  
369  // AttachOutput creates a fresh per-connection io.Pipe and swaps it into the
370  // broadcast fanout path.
371  //
372  // Ordering guarantee (no duplicates on reconnect):
373  //   - Caller must snapshot the replay buffer BEFORE calling AttachOutput.
374  //   - Bytes produced between the snapshot and AttachOutput are delivered via
375  //     the live pipe only (not in the snapshot), so each byte arrives exactly once.
376  //
377  // Returns (stdout reader, stderr reader [nil in PTY mode], detach func).
378  // Calling detach() closes the writers, sending EOF to the readers and
379  // unblocking all pump goroutines.
380  func (s *ptySession) AttachOutput() (io.Reader, io.Reader, func()) {
381  	stdoutR, stdoutW := io.Pipe()
382  
383  	s.outMu.Lock()
384  	s.stdoutW = stdoutW
385  	s.outMu.Unlock()
386  
387  	if s.isPTY {
388  		detach := func() {
389  			s.outMu.Lock()
390  			s.stdoutW = nil
391  			s.outMu.Unlock()
392  			_ = stdoutW.Close()
393  		}
394  		return stdoutR, nil, detach
395  	}
396  
397  	// Pipe mode: also attach stderr.
398  	stderrR, stderrW := io.Pipe()
399  
400  	s.outMu.Lock()
401  	s.stderrW = stderrW
402  	s.outMu.Unlock()
403  
404  	detach := func() {
405  		s.outMu.Lock()
406  		s.stdoutW = nil
407  		s.stderrW = nil
408  		s.outMu.Unlock()
409  		_ = stdoutW.Close()
410  		_ = stderrW.Close()
411  	}
412  	return stdoutR, stderrR, detach
413  }
414  
415  // AttachOutputWithSnapshot atomically snapshots the replay buffer and attaches
416  // the per-connection output pipe, eliminating the output-loss window that exists
417  // when ReadFrom and AttachOutput are called separately.
418  //
419  // Must be used together with writeAndFanout (which holds outMu during both
420  // replay.write and the fanout pointer read).
421  //
422  // Lock order is always outMu → replay.mu (both paths), so no deadlock is possible.
423  //
424  // Returns (stdoutR, stderrR [nil in PTY mode], detach, snapshotBytes, snapshotOffset).
425  func (s *ptySession) AttachOutputWithSnapshot(since int64) (io.Reader, io.Reader, func(), []byte, int64) {
426  	stdoutR, stdoutW := io.Pipe()
427  	var stderrR io.Reader
428  	var stderrW *io.PipeWriter
429  	if !s.isPTY {
430  		stderrR, stderrW = io.Pipe()
431  	}
432  
433  	s.outMu.Lock()
434  	snapshotBytes, snapshotOffset := s.replay.ReadFrom(since) // acquires replay.mu inside
435  	s.stdoutW = stdoutW
436  	if stderrW != nil {
437  		s.stderrW = stderrW
438  	}
439  	s.outMu.Unlock()
440  
441  	detach := func() {
442  		s.outMu.Lock()
443  		s.stdoutW = nil
444  		if stderrW != nil {
445  			s.stderrW = nil
446  		}
447  		s.outMu.Unlock()
448  		_ = stdoutW.Close()
449  		if stderrW != nil {
450  			_ = stderrW.Close()
451  		}
452  	}
453  	return stdoutR, stderrR, detach, snapshotBytes, snapshotOffset
454  }
455  
456  // SendSignal sends the named signal to the process group.
457  // Recognised names: SIGINT, SIGTERM, SIGKILL, SIGQUIT, SIGHUP.
458  func (s *ptySession) SendSignal(name string) {
459  	s.mu.Lock()
460  	pid := s.pid
461  	s.mu.Unlock()
462  	if pid == 0 {
463  		return
464  	}
465  
466  	sig := parseSignalName(name)
467  	if sig == 0 {
468  		log.Warning("ptySession.SendSignal: unknown signal %q", name)
469  		return
470  	}
471  
472  	// In PTY mode (setsid), pgid == pid automatically.
473  	// In pipe mode (Setpgid), pgid is also == pid.
474  	// Either way, Kill(-pid, sig) sends to the process group.
475  	if err := syscall.Kill(-pid, sig); err != nil {
476  		log.Warning("ptySession.SendSignal kill(-%d, %v): %v", pid, sig, err)
477  	}
478  }
479  
// parseSignalName maps a signal name to its syscall.Signal value.
// Only the exact, upper-case names SIGINT, SIGTERM, SIGKILL, SIGQUIT and
// SIGHUP are recognised; any other input yields 0.
func parseSignalName(name string) syscall.Signal {
	known := [...]struct {
		label string
		sig   syscall.Signal
	}{
		{"SIGINT", syscall.SIGINT},
		{"SIGTERM", syscall.SIGTERM},
		{"SIGKILL", syscall.SIGKILL},
		{"SIGQUIT", syscall.SIGQUIT},
		{"SIGHUP", syscall.SIGHUP},
	}
	for _, entry := range known {
		if entry.label == name {
			return entry.sig
		}
	}
	return 0
}
496  
497  // ResizePTY updates the terminal window size (PTY mode only; no-op in pipe mode).
498  func (s *ptySession) ResizePTY(cols, rows uint16) error {
499  	s.mu.Lock()
500  	ptmx := s.ptmx
501  	s.mu.Unlock()
502  	if ptmx == nil {
503  		return nil // pipe mode or not started
504  	}
505  	return pty.Setsize(ptmx, &pty.Winsize{Cols: cols, Rows: rows})
506  }
507  
508  // close terminates the session and releases all resources.
509  // Safe to call multiple times.
510  func (s *ptySession) close() {
511  	s.mu.Lock()
512  	if s.closing {
513  		s.mu.Unlock()
514  		return
515  	}
516  	s.closing = true
517  	pid := s.pid
518  	ptmx := s.ptmx
519  	stdin := s.stdin
520  	s.mu.Unlock()
521  
522  	if pid != 0 {
523  		_ = syscall.Kill(-pid, syscall.SIGKILL)
524  	}
525  	if ptmx != nil {
526  		_ = ptmx.Close()
527  	} else if stdin != nil {
528  		_ = stdin.Close()
529  	}
530  
531  	// Detach any active WS output pipe so pump goroutines unblock.
532  	s.outMu.Lock()
533  	stdoutW := s.stdoutW
534  	stderrW := s.stderrW
535  	s.stdoutW = nil
536  	s.stderrW = nil
537  	s.outMu.Unlock()
538  	if stdoutW != nil {
539  		_ = stdoutW.Close()
540  	}
541  	if stderrW != nil {
542  		_ = stderrW.Close()
543  	}
544  }
545  
546  // CreatePTYSession creates a new PTY session and stores it in the map.
547  func (c *Controller) CreatePTYSession(id, cwd string) (PTYSession, error) {
548  	resolvedCwd, err := pathutil.ExpandPath(cwd)
549  	if err != nil {
550  		return nil, fmt.Errorf("error resolving PTY session work directory: %w", err)
551  	}
552  	if resolvedCwd != "" {
553  		err := os.MkdirAll(resolvedCwd, os.ModePerm)
554  		if err != nil {
555  			return nil, fmt.Errorf("error creating PTY session work directory: %w", err)
556  		}
557  	}
558  	s := newPTYSession(id, resolvedCwd)
559  	c.ptySessionMap.Store(id, s)
560  	log.Info("created pty session %s", id)
561  	return s, nil
562  }
563  
564  // getPTYSession looks up a PTY session by ID. Returns nil if not found.
565  // For internal use only; outside callers should use GetPTYSession.
566  func (c *Controller) getPTYSession(id string) *ptySession {
567  	if v, ok := c.ptySessionMap.Load(id); ok {
568  		if s, ok := v.(*ptySession); ok {
569  			return s
570  		}
571  	}
572  	return nil
573  }
574  
575  // GetPTYSession looks up a PTY session by ID. Returns nil if not found.
576  func (c *Controller) GetPTYSession(id string) PTYSession {
577  	s := c.getPTYSession(id)
578  	if s == nil {
579  		return nil
580  	}
581  	return s
582  }
583  
584  // DeletePTYSession terminates and removes a PTY session.
585  // Returns ErrContextNotFound if the session does not exist.
586  func (c *Controller) DeletePTYSession(id string) error {
587  	s := c.getPTYSession(id)
588  	if s == nil {
589  		return ErrContextNotFound
590  	}
591  	s.close()
592  	c.ptySessionMap.Delete(id)
593  	log.Info("deleted pty session %s", id)
594  	return nil
595  }
596  
597  // GetPTYSessionStatus returns status information for a PTY session.
598  func (c *Controller) GetPTYSessionStatus(id string) (running bool, outputOffset int64, err error) {
599  	s := c.getPTYSession(id)
600  	if s == nil {
601  		return false, 0, ErrContextNotFound
602  	}
603  	return s.IsRunning(), s.replay.Total(), nil
604  }