/ src / core / sessions.py
sessions.py
  1  """
  2  Session directory management for Ag3ntum.
  3  
  4  Handles session directory creation and workspace setup.
  5  Each session has an isolated workspace with:
  6  - agent.jsonl - Claude SDK event log
  7  - workspace/ - Agent working directory with external mounts
  8  
  9  NOTE: All session metadata is stored in SQLite database (Session model),
 10  NOT in files. This module only manages directory structure.
 11  
 12  SECURITY: Session directories use ag3ntum group ownership model:
 13  - Permissions: 770 dirs / 660 files (owner + group, no world access)
 14  - Owner UID: sandbox user
 15  - Owner GID: ag3ntum group (always exists, all sandbox users are members)
 16  - PathValidator provides application-level cross-session/cross-user isolation
 17  """
 18  import grp
 19  import json
 20  import logging
 21  import os
 22  import shutil
 23  import subprocess
 24  import uuid
 25  from datetime import datetime
 26  from pathlib import Path
 27  from typing import Optional
 28  
 29  from .exceptions import DynamicMountError, SessionError
 30  from .path_validator import get_session_linux_uid
 31  
 32  logger = logging.getLogger(__name__)
 33  
 34  AG3NTUM_GROUP = "ag3ntum"
 35  
 36  
 37  def _get_ag3ntum_gid() -> int:
 38      """Return the GID of the ag3ntum group.
 39  
 40      The ag3ntum group is created by the Dockerfile and always exists.
 41      All sandbox users and ag3ntum_api are members, so using it as the
 42      group owner removes the fragile dependency on per-user groups.
 43      """
 44      return grp.getgrnam(AG3NTUM_GROUP).gr_gid
 45  
 46  
 47  def _sudo_chown(path: Path, uid: int) -> None:
 48      """
 49      Set ownership of a path using sudo chown.
 50  
 51      Sets owner to the sandbox user's UID and group to ag3ntum.
 52      The ag3ntum group (GID 1001) always exists (created by Dockerfile)
 53      and both ag3ntum_api and all sandbox users are members.
 54  
 55      Args:
 56          path: Path to change ownership of
 57          uid: UID to set as owner
 58      """
 59      try:
 60          result = subprocess.run(
 61              ["sudo", "/usr/bin/chown", f"{uid}:{AG3NTUM_GROUP}", str(path)],
 62              capture_output=True,
 63              text=True,
 64              timeout=10,
 65          )
 66          if result.returncode != 0:
 67              logger.warning(f"sudo chown failed for {path}: {result.stderr.strip()}")
 68      except subprocess.TimeoutExpired:
 69          logger.warning(f"sudo chown timed out for {path}")
 70      except Exception as e:
 71          logger.warning(f"Could not set ownership of {path} to {uid}: {e}")
 72  
 73  
 74  def _apply_shared_ownership(path: Path, owner_uid: Optional[int]) -> None:
 75      """Set owner to the session UID and group to ag3ntum.
 76  
 77      If direct chown fails under /users/, fall back to the controlled sudo path
 78      used elsewhere for session setup. This keeps session artifacts readable by
 79      both the sandbox user and the API process under the shared-GID model.
 80      """
 81      if owner_uid is None:
 82          return
 83  
 84      try:
 85          os.chown(path, owner_uid, _get_ag3ntum_gid())
 86          return
 87      except OSError as e:
 88          if str(path).startswith("/users/"):
 89              _sudo_chown(path, owner_uid)
 90              return
 91          logger.warning(f"Could not set ownership of {path} to {owner_uid}: {e}")
 92  
 93  
 94  def chown_to_session_user(path: Path, session_id: str) -> None:
 95      """
 96      Change ownership of a file/directory to the session's sandbox user.
 97  
 98      Called by Write/Edit/MultiEdit MCP tools after creating/modifying files.
 99      Sets UID to the sandbox user and GID to the ag3ntum group.
100      Since ag3ntum_api and all sandbox users are in the ag3ntum group,
101      both the API process and Bash sandbox can access files with 660/770 perms.
102  
103      Uses sudo chown for paths under /users/*. Silently skips for
104      other paths (e.g., /mounts/*) where ownership is managed by the host.
105  
106      Args:
107          path: Path to change ownership of
108          session_id: Session ID to look up the linux_uid
109      """
110      linux_uid = get_session_linux_uid(session_id)
111      if linux_uid is None:
112          return
113  
114      if str(path).startswith("/users/"):
115          _sudo_chown(path, linux_uid)
116  
117  
def sudo_chown_recursive(path: Path, uid: int) -> None:
    """
    Change the owner of a whole tree via ``sudo /usr/bin/chown -R``.

    Owner becomes ``uid`` and group becomes the ag3ntum group. The call is
    best-effort: failures, timeouts (30 seconds) and unexpected errors are
    logged as warnings and never raised; success is logged at info level.

    Args:
        path: Path to change ownership of (recursively)
        uid: UID to set as owner
    """
    try:
        completed = subprocess.run(
            ["sudo", "/usr/bin/chown", "-R", f"{uid}:{AG3NTUM_GROUP}", str(path)],
            capture_output=True,
            text=True,
            timeout=30,  # recursive runs get a longer budget than single-path chown
        )
        if completed.returncode == 0:
            logger.info(f"Set ownership of {path} (recursive) to {uid}:{AG3NTUM_GROUP}")
        else:
            logger.warning(f"sudo chown -R failed for {path}: {completed.stderr.strip()}")
    except subprocess.TimeoutExpired:
        logger.warning(f"sudo chown -R timed out for {path}")
    except Exception as e:
        logger.warning(f"Could not set ownership of {path} to {uid}: {e}")
143  
144  
145  class SessionManager:
146      """
147      Manages session directories and file paths.
148  
149      NOTE: Session metadata (status, cumulative stats, etc.) is stored in
150      the SQLite database, NOT in files. This class only handles:
151      - Directory structure creation
152      - File path resolution
153      - External mount symlinks
154      - Workspace cleanup
155  
156      The session directory contains:
157      - agent.jsonl - SDK event log (required by Claude SDK)
158      - workspace/ - Agent working directory
159      """
160  
161      def __init__(self, sessions_dir: Path) -> None:
162          """
163          Initialize the session manager.
164  
165          Args:
166              sessions_dir: Directory to store sessions.
167          """
168          self._sessions_dir = sessions_dir
169          # Note: Directory is created on-demand in create_session_directory(), not here.
170          # This allows lazy initialization for per-user session directories.
171  
172      def create_session_directory(
173          self,
174          session_id: str | None = None,
175          owner_uid: Optional[int] = None,
176      ) -> str:
177          """
178          Create the directory structure for a new session with shared access.
179  
180          Session directories use 770 permissions to allow both API (via sudo bwrap)
181          and sandbox processes to access files. Cross-session isolation is enforced
182          at the application level by PathValidator.
183  
184          Security Properties:
185          - Permissions: 770 (owner and group can read/write/execute)
186          - Owner: sandbox user's UID (owner_uid) if provided
187          - Group: same as owner UID (sandbox user's primary group)
188          - API accesses via sudo bwrap (root), sandbox runs as owner_uid
189          - Cross-user isolation: PathValidator blocks cross-user access
190          - Cross-session isolation: separate directories + PathValidator
191  
192          Args:
193              session_id: Optional session ID. If None, generates one.
194              owner_uid: UID to set as owner (for sandbox access).
195                         If None, directories owned by API process.
196  
197          Returns:
198              The session ID (generated or provided).
199          """
200          if session_id is None:
201              session_id = generate_session_id()
202  
203          session_dir = self.get_session_dir(session_id)
204  
205          # Create session directory with 770 permissions (owner + group access)
206          # This allows both API (via sudo) and sandbox (as owner) to access
207          self._create_session_directory_basic(session_dir, owner_uid)
208  
209          logger.info(f"Created session directory: {session_id} (owner_uid={owner_uid})")
210          return session_id
211  
212      def _create_session_directory_basic(
213          self,
214          session_dir: Path,
215          owner_uid: Optional[int] = None,
216      ) -> None:
217          """
218          Create session directory with 770 permissions (owner + group access).
219  
220          The 770 permissions allow:
221          - Owner (sandbox user): full read/write/execute access
222          - Group (sandbox user's group): full read/write/execute access
223          - API process: accesses via sudo bwrap (runs as root initially)
224  
225          This is necessary because:
226          - API creates directories but can't chown without CAP_CHOWN
227          - Sandbox (bubblewrap) uses sudo to gain root for file access
228          - Files created by sandbox commands are owned by sandbox user
229          - API needs to read these files for File Browser, etc.
230  
231          Args:
232              session_dir: Path to the session directory
233              owner_uid: Optional UID to set as owner (also used as GID)
234          """
235          session_dir.mkdir(parents=True, exist_ok=True)
236          try:
237              session_dir.chmod(0o770)
238          except PermissionError:
239              # Directory already exists and is owned by sandbox user - that's OK
240              pass
241  
242          workspace = session_dir / "workspace"
243          workspace.mkdir(exist_ok=True)
244          try:
245              # 777 permissions allow both API and sandbox to read/write
246              # Security is enforced by PathValidator and bubblewrap sandbox, not by permissions
247              workspace.chmod(0o777)
248          except PermissionError:
249              # Directory already exists and is owned by sandbox user - that's OK
250              pass
251  
252          # Note: Don't chown here - ownership is set after all directories are
253          # created (in setup_external_mounts) to avoid permission issues where
254          # the API can't create subdirectories after losing ownership
255  
256      def get_session_dir(self, session_id: str) -> Path:
257          """
258          Get the directory for a session.
259  
260          Args:
261              session_id: The session ID.
262  
263          Returns:
264              Path to the session directory.
265          """
266          return self._sessions_dir / session_id
267  
268      def get_log_file(self, session_id: str) -> Path:
269          """
270          Get the log file path for a session.
271  
272          Args:
273              session_id: The session ID.
274  
275          Returns:
276              Path to the agent.jsonl file.
277          """
278          return self.get_session_dir(session_id) / "agent.jsonl"
279  
280      def get_workspace_dir(self, session_id: str) -> Path:
281          """
282          Get the workspace directory for a session.
283  
284          The workspace is a sandboxed subdirectory where the agent can
285          write output. This is separate from the session directory to
286          prevent the agent from reading logs and other sensitive files.
287  
288          Args:
289              session_id: The session ID.
290  
291          Returns:
292              Path to the workspace directory.
293          """
294          workspace = self.get_session_dir(session_id) / "workspace"
295          workspace.mkdir(parents=True, exist_ok=True)
296          return workspace
297  
298      def setup_external_mounts(
299          self, session_id: str, username: str, owner_uid: Optional[int] = None
300      ) -> None:
301          """
302          Create symlinks for external mounts in the workspace.
303  
304          Creates the ./external/ directory structure with symlinks to:
305          - ./external/ro/{name} -> /mounts/{name} (global read-only mounts)
306          - ./external/rw/{name} -> /mounts/{name} (global read-write mounts)
307          - ./external/user-ro/{name} -> /mounts/{name} (per-user read-only mounts)
308          - ./external/user-rw/{name} -> /mounts/{name} (per-user read-write mounts)
309  
310          Also creates persistent storage symlink:
311          - ./persistent -> /persistent (sandbox mount target)
312  
313          This allows both the File Browser UI and agent tools to see the same files.
314  
315          Args:
316              session_id: The session ID.
317              username: The username for persistent storage path and per-user mounts.
318              owner_uid: UID to set as owner (for sandbox access). If None, uses default.
319          """
320          import yaml
321  
322          workspace = self.get_workspace_dir(session_id)
323          external_dir = workspace / "external"
324  
325          # Create base directories with 770 permissions for sandbox access
326          dirs_to_create = [
327              external_dir,
328              external_dir / "ro",
329              external_dir / "rw",
330              external_dir / "user-ro",
331              external_dir / "user-rw",
332          ]
333          for dir_path in dirs_to_create:
334              dir_path.mkdir(parents=True, exist_ok=True)
335              # 777 permissions allow both API and sandbox to read/write
336              # Security is enforced by PathValidator and bubblewrap sandbox
337              dir_path.chmod(0o777)
338  
339          # Load global mounts configuration from auto-generated-mounts.yaml (generated by run.sh)
340          mounts_file = Path("/auto-generated/auto-generated-mounts.yaml")
341  
342          if mounts_file.exists():
343              try:
344                  with open(mounts_file, "r", encoding="utf-8") as f:
345                      manifest = yaml.safe_load(f) or {}
346  
347                  mounts_data = manifest.get("mounts", {})
348  
349                  # Create RO mount symlinks (flattened: /mounts/{name})
350                  if isinstance(mounts_data.get("ro"), list):
351                      for mount in mounts_data["ro"]:
352                          if isinstance(mount, dict) and mount.get("name"):
353                              name = mount["name"]
354                              link = external_dir / "ro" / name
355                              # Use container_path from manifest if available, else flat path
356                              container_path = mount.get("container_path", f"/mounts/{name}")
357                              target = Path(container_path)
358  
359                              # Skip if mount doesn't exist in Docker
360                              if not target.exists():
361                                  logger.debug(f"Skipping RO mount '{name}': {target} does not exist")
362                                  continue
363  
364                              # Skip if symlink already exists and is valid
365                              if link.is_symlink() and link.exists():
366                                  continue
367  
368                              # Remove broken symlink if present
369                              if link.is_symlink():
370                                  try:
371                                      link.unlink()
372                                  except OSError as e:
373                                      logger.warning(f"Failed to remove broken RO symlink {name}: {e}")
374                                      continue
375  
376                              # Skip if regular file/directory exists
377                              if link.exists():
378                                  continue
379  
380                              try:
381                                  link.symlink_to(target)
382                                  logger.debug(f"Created RO mount symlink: {link} -> {target}")
383                              except OSError as e:
384                                  logger.warning(f"Failed to create RO symlink for {name}: {e}")
385  
386                  # Create RW mount symlinks (flattened: /mounts/{name})
387                  if isinstance(mounts_data.get("rw"), list):
388                      for mount in mounts_data["rw"]:
389                          if isinstance(mount, dict) and mount.get("name"):
390                              name = mount["name"]
391                              link = external_dir / "rw" / name
392                              # Use container_path from manifest if available, else flat path
393                              container_path = mount.get("container_path", f"/mounts/{name}")
394                              target = Path(container_path)
395  
396                              # Skip if mount doesn't exist in Docker
397                              if not target.exists():
398                                  logger.debug(f"Skipping RW mount '{name}': {target} does not exist")
399                                  continue
400  
401                              # Skip if symlink already exists and is valid
402                              if link.is_symlink() and link.exists():
403                                  continue
404  
405                              # Remove broken symlink if present
406                              if link.is_symlink():
407                                  try:
408                                      link.unlink()
409                                  except OSError as e:
410                                      logger.warning(f"Failed to remove broken RW symlink {name}: {e}")
411                                      continue
412  
413                              # Skip if regular file/directory exists
414                              if link.exists():
415                                  continue
416  
417                              try:
418                                  link.symlink_to(target)
419                                  logger.debug(f"Created RW mount symlink: {link} -> {target}")
420                              except OSError as e:
421                                  logger.warning(f"Failed to create RW symlink for {name}: {e}")
422  
423              except Exception as e:
424                  logger.warning(f"Failed to load mounts config: {e}")
425  
426          # Load per-user mounts from external-mounts.yaml
427          from ..services.mount_service import get_user_mounts
428  
429          try:
430              user_mounts = get_user_mounts(username)
431  
432              # Create per-user RO mount symlinks (flattened: /mounts/{name})
433              for mount_info in user_mounts.get("ro", []):
434                  name = mount_info["name"]
435                  link = external_dir / "user-ro" / name
436                  # Use container_path from mount_info if available, else flat path
437                  container_path = mount_info.get("container_path", f"/mounts/{name}")
438                  target = Path(container_path)
439  
440                  # Skip if mount doesn't exist and is required
441                  if not target.exists():
442                      is_optional = mount_info.get("optional", True)
443                      if not is_optional:
444                          logger.warning(f"Required per-user RO mount missing: {target}")
445                      continue
446  
447                  if not link.exists() and not link.is_symlink():
448                      try:
449                          link.symlink_to(target)
450                          logger.debug(f"Created user RO mount symlink: {link} -> {target}")
451                      except OSError as e:
452                          logger.warning(f"Failed to create user RO symlink for {name}: {e}")
453  
454              # Create per-user RW mount symlinks (flattened: /mounts/{name})
455              for mount_info in user_mounts.get("rw", []):
456                  name = mount_info["name"]
457                  link = external_dir / "user-rw" / name
458                  # Use container_path from mount_info if available, else flat path
459                  container_path = mount_info.get("container_path", f"/mounts/{name}")
460                  target = Path(container_path)
461  
462                  # Skip if mount doesn't exist and is required
463                  if not target.exists():
464                      is_optional = mount_info.get("optional", True)
465                      if not is_optional:
466                          logger.warning(f"Required per-user RW mount missing: {target}")
467                      continue
468  
469                  if not link.exists() and not link.is_symlink():
470                      try:
471                          link.symlink_to(target)
472                          logger.debug(f"Created user RW mount symlink: {link} -> {target}")
473                      except OSError as e:
474                          logger.warning(f"Failed to create user RW symlink for {name}: {e}")
475  
476          except Exception as e:
477              logger.warning(f"Failed to load per-user mounts: {e}")
478  
479          # Create persistent storage symlink at workspace root (not under external/)
480          # The persistent directory is at {user_home}/ag3ntum/persistent in Docker.
481          # The symlink points to the Docker path so that ALL tools work correctly:
482          #   - LS/Glob/Grep: traverse directories without broken symlinks
483          #   - Read/Write/Edit: PathValidator translates to Docker path (same result)
484          #   - Bash (bwrap): agent_core.py adds a bwrap mount at the Docker path
485          #     so the symlink resolves inside the sandbox too
486          # This follows the same pattern as external mount symlinks which point
487          # to Docker paths (/mounts/{name}), not sandbox paths.
488          persistent_link = workspace / "persistent"
489          user_home = self._sessions_dir.parent  # e.g., /users/{username}/sessions -> /users/{username}
490          persistent_dir_host = user_home / "ag3ntum" / "persistent"  # Docker path (symlink target)
491  
492          # Ensure the persistent directory exists
493          # FAIL FAST: If we can't create it, the session should fail - not silently skip
494          if not persistent_dir_host.exists():
495              try:
496                  persistent_dir_host.mkdir(parents=True, exist_ok=True)
497                  logger.info(f"Created persistent storage directory: {persistent_dir_host}")
498              except OSError as e:
499                  raise SessionError(
500                      f"Failed to create persistent storage directory {persistent_dir_host}: {e}. "
501                      "This directory is required for bwrap sandbox mounts."
502                  )
503  
504          # Create or fix the symlink (points to Docker path)
505          try:
506              if persistent_link.is_symlink():
507                  current_target = persistent_link.readlink()
508                  if current_target != persistent_dir_host:
509                      persistent_link.unlink()
510                      persistent_link.symlink_to(persistent_dir_host)
511                      logger.debug(f"Fixed persistent symlink: {persistent_link} -> {persistent_dir_host}")
512              elif not persistent_link.exists():
513                  persistent_link.symlink_to(persistent_dir_host)
514                  logger.debug(f"Created persistent symlink: {persistent_link} -> {persistent_dir_host}")
515          except OSError as e:
516              raise SessionError(
517                  f"Failed to create persistent storage symlink: {e}"
518              )
519  
520          # Note: We use 777 permissions instead of chown to allow both API and sandbox access
521          # Security is enforced by PathValidator (blocks cross-session access) and
522          # bubblewrap sandbox (isolates subprocess execution)
523  
524          logger.info(f"Set up external mounts for session {session_id}")
525  
526      def setup_dynamic_mounts(
527          self,
528          session_id: str,
529          username: str,
530          mount_requests: list,
531          owner_uid: Optional[int] = None,
532      ) -> list:
533          """
534          Set up dynamic mounts for a session.
535  
536          Creates symlinks at workspace root (e.g., workspace/{alias}) pointing to
537          container mount paths (/mounts/{base}/{subpath}).
538          Returns list of mount info for PathValidator and Bubblewrap configuration.
539  
540          Args:
541              session_id: The session ID.
542              username: The username (from JWT token) for authorization.
543              mount_requests: List of DynamicMountRequest objects.
544              owner_uid: UID to set as symlink owner.
545  
546          Returns:
547              List of DynamicMountInfo objects describing the mounted paths.
548  
549          Raises:
550              DynamicMountError: If mount validation fails or setup encounters an error.
551          """
552          from src.api.models import DynamicMountInfo, DynamicMountRequest
553          from src.services.mount_service import get_dynamic_mount_service
554  
555          workspace = self.get_workspace_dir(session_id)
556  
557          # Reserved workspace paths that cannot be used as mount aliases
558          reserved_paths = {"external", "persistent", ".claude", "output.yaml"}
559  
560          mount_service = get_dynamic_mount_service()
561          max_mounts = mount_service.security.get("max_mounts_per_session", 10)
562  
563          if len(mount_requests) > max_mounts:
564              raise DynamicMountError(
565                  f"Too many mounts requested ({len(mount_requests)}). Maximum: {max_mounts}"
566              )
567  
568          mounted: list[DynamicMountInfo] = []
569          seen_aliases: set[str] = set()
570  
571          for request in mount_requests:
572              # Check for duplicate aliases
573              if request.alias in seen_aliases:
574                  raise DynamicMountError(f"Duplicate mount alias: {request.alias}")
575              seen_aliases.add(request.alias)
576  
577              # Check for reserved paths
578              if request.alias in reserved_paths:
579                  raise DynamicMountError(
580                      f"Mount alias '{request.alias}' is reserved and cannot be used"
581                  )
582  
583              # Validate mount request
584              validation = mount_service.validate_mount_request(request, username)
585  
586              if not validation.is_valid:
587                  logger.warning(
588                      f"DYNAMIC_MOUNT_DENIED: session={session_id}, user={username}, "
589                      f"base={request.base}, subpath={request.subpath}, "
590                      f"reason={validation.denial_code}"
591                  )
592                  raise DynamicMountError(
593                      f"Mount '{request.alias}' denied: {validation.error}"
594                  )
595  
596              # Create symlink at workspace root (same level as external/, persistent/)
597              link_path = workspace / request.alias
598              target_path = Path(validation.resolved_container_path)
599  
600              if link_path.exists() or link_path.is_symlink():
601                  raise DynamicMountError(f"Mount alias '{request.alias}' already exists")
602  
603              try:
604                  link_path.symlink_to(target_path)
605              except OSError as e:
606                  raise DynamicMountError(
607                      f"Failed to create symlink for '{request.alias}': {e}"
608                  )
609  
610              # Set ownership if provided (use lchown for symlinks)
611              if owner_uid is not None:
612                  try:
613                      os.lchown(link_path, owner_uid, _get_ag3ntum_gid())
614                  except OSError as e:
615                      logger.warning(f"Could not set ownership of {link_path}: {e}")
616  
617              logger.info(
618                  f"DYNAMIC_MOUNT_CREATED: session={session_id}, user={username}, "
619                  f"alias={request.alias}, target={target_path}, mode={validation.resolved_mode}"
620              )
621  
622              # Resolve host_path for display (base host_path + subpath)
623              base_obj = mount_service.bases.get(request.base)
624              host_path = None
625              if base_obj:
626                  hp = base_obj.host_path.replace("{username}", username)
627                  if request.subpath:
628                      hp = f"{hp}/{request.subpath}"
629                  host_path = hp
630  
631              mounted.append(DynamicMountInfo(
632                  alias=request.alias,
633                  workspace_path=f"./{request.alias}",
634                  mode=validation.resolved_mode,
635                  source_base=request.base,
636                  source_subpath=request.subpath,
637                  host_path=host_path,
638              ))
639  
640          # Persist dynamic mount metadata for File Explorer
641          session_dir = self.get_session_dir(session_id)
642          meta_file = session_dir / ".dynamic-mounts.json"
643          meta = {
644              m.alias: {
645                  "mode": m.mode,
646                  "source_base": m.source_base,
647                  "source_subpath": m.source_subpath,
648                  "host_path": m.host_path,
649              }
650              for m in mounted
651          }
652          try:
653              meta_file.write_text(json.dumps(meta))
654          except OSError as e:
655              logger.warning(
656                  f"Failed to write dynamic mount metadata for session {session_id}: {e}"
657              )
658  
659          logger.info(
660              f"Set up {len(mounted)} dynamic mounts for session {session_id}"
661          )
662          return mounted
663  
664      def load_dynamic_mount_info(self, session_id: str) -> list:
665          """
666          Reload persisted dynamic mount metadata from a session directory.
667  
668          Used when resuming a session to reconstruct DynamicMountInfo objects
669          without re-creating symlinks (they already exist from the original run).
670  
671          Args:
672              session_id: The session ID.
673  
674          Returns:
675              List of DynamicMountInfo objects, or empty list if no metadata exists.
676          """
677          from src.api.models import DynamicMountInfo
678  
679          session_dir = self.get_session_dir(session_id)
680          meta_file = session_dir / ".dynamic-mounts.json"
681  
682          if not meta_file.exists():
683              return []
684  
685          try:
686              raw = json.loads(meta_file.read_text())
687          except (json.JSONDecodeError, OSError) as e:
688              logger.warning(
689                  f"Failed to read dynamic mount metadata for session {session_id}: {e}"
690              )
691              return []
692  
693          if not isinstance(raw, dict) or not raw:
694              return []
695  
696          mounts = []
697          for alias, info in raw.items():
698              if not isinstance(info, dict):
699                  continue
700              try:
701                  mounts.append(DynamicMountInfo(
702                      alias=alias,
703                      workspace_path=f"./{alias}",
704                      mode=info.get("mode", "ro"),
705                      source_base=info.get("source_base", ""),
706                      source_subpath=info.get("source_subpath"),
707                      host_path=info.get("host_path"),
708                  ))
709              except Exception as e:
710                  logger.warning(
711                      f"Skipping invalid dynamic mount entry '{alias}' "
712                      f"for session {session_id}: {e}"
713                  )
714  
715          if mounts:
716              logger.info(
717                  f"Reloaded {len(mounts)} dynamic mounts for session {session_id}"
718              )
719          return mounts
720  
721      def get_original_path_mounts(
722          self,
723          username: str,
724      ) -> list:
725          """
726          Get original-path mounts available to a user.
727  
728          Original-path mounts allow accessing paths like /var/log at their
729          original locations within the sandbox. These are configured in
730          external-mounts.yaml under original_paths.
731  
732          Args:
733              username: The username (from JWT token) for authorization.
734  
735          Returns:
736              List of OriginalPathMount objects describing the available mounts.
737          """
738          from src.services.mount_service import (
739              get_original_path_mount_service,
740              OriginalPathMount,
741          )
742  
743          mount_service = get_original_path_mount_service()
744          return mount_service.get_mounts_for_user(username)
745  
746      def cleanup_workspace_skills(self, session_id: str) -> None:
747          """
748          Remove the skills folder from a session's workspace.
749  
750          Called after agent run completes to clean up merged skills symlinks.
751          The .claude/skills/ directory contains symlinks to actual skill sources.
752          Workspace files are preserved.
753  
754          Args:
755              session_id: The session ID.
756          """
757          # New structure: .claude/skills/ contains symlinks
758          claude_skills_dir = (
759              self.get_session_dir(session_id) / "workspace" / ".claude" / "skills"
760          )
761  
762          if claude_skills_dir.exists():
763              try:
764                  # Remove the directory with all its symlinks
765                  shutil.rmtree(claude_skills_dir)
766                  logger.info(
767                      f"Cleaned up workspace/.claude/skills/ for session {session_id}"
768                  )
769              except Exception as e:
770                  logger.warning(
771                      f"Failed to cleanup workspace skills for session {session_id}: {e}"
772                  )
773  
774          # Also clean old-style skills/ symlink for backward compatibility
775          old_skills_link = self.get_session_dir(session_id) / "workspace" / "skills"
776          if old_skills_link.is_symlink():
777              try:
778                  old_skills_link.unlink()
779                  logger.debug(f"Removed legacy skills symlink for session {session_id}")
780              except Exception as e:
781                  logger.warning(f"Failed to remove legacy skills symlink: {e}")
782  
783      def session_dir_exists(self, session_id: str) -> bool:
784          """
785          Check if a session directory exists.
786  
787          Args:
788              session_id: The session ID.
789  
790          Returns:
791              True if the session directory exists, False otherwise.
792          """
793          return self.get_session_dir(session_id).exists()
794  
795  
def generate_session_id() -> str:
    """Return a new session ID shaped like ``YYYYMMDD_HHMMSS_<8 hex chars>``.

    The prefix is the current local time (naive ``datetime.now()``); the
    suffix is the first eight hex digits of a random UUID4.
    """
    timestamp = f"{datetime.now():%Y%m%d_%H%M%S}"
    random_suffix = uuid.uuid4().hex[:8]
    return "_".join((timestamp, random_suffix))
801  
802  
def secure_file_write(
    file_path: Path,
    content: bytes | str,
    owner_uid: Optional[int] = None,
    mode: int = 0o660,
) -> None:
    """
    Write a file with shared access permissions.

    This function is used for writing files in session directories
    (like agent.jsonl) with permissions that allow both owner and group access.

    Security Properties:
    - File permissions: 660 by default (owner + group read/write)
    - No world access
    - A newly created file is created with ``mode`` (further narrowed by the
      process umask), so it never exists with wider permissions than
      requested while the content is being written
    - Ownership: owner_uid:ag3ntum if provided

    Args:
        file_path: Path to write to
        content: Content to write (bytes or str)
        owner_uid: UID to set as owner (optional, group is always ag3ntum)
        mode: File permission mode (default: 660 = owner + group read/write)

    Raises:
        OSError: If the file cannot be opened, written or chmod'ed.
    """
    def _open_with_mode(path, flags):
        # Hand the requested mode to the kernel at creation time instead of
        # creating the file with umask defaults and tightening it afterwards.
        return os.open(path, flags, mode)

    # str goes through text mode (default encoding), bytes through binary mode.
    open_mode = "w" if isinstance(content, str) else "wb"
    with open(file_path, open_mode, opener=_open_with_mode) as handle:
        handle.write(content)

    # Still needed: the umask may have narrowed the creation mode, and a
    # pre-existing file keeps its old bits. Done BEFORE the ownership change
    # in case we lose access after chown.
    file_path.chmod(mode)

    # Set ownership if provided (owner=sandbox user, group=ag3ntum)
    _apply_shared_ownership(file_path, owner_uid)
837  
838  
def _secure_path(path: Path, mode: int, owner_uid: Optional[int]) -> None:
    """Best-effort: set permissions, then shared ownership, on one path.

    The chmod runs first; if it raises OSError the ownership step is skipped.
    An OSError from either step is logged at debug level and swallowed so
    that one problematic entry does not abort a bulk operation.

    Args:
        path: Path to secure.
        mode: Permission bits to apply with chmod.
        owner_uid: UID passed on to the ownership helper (may be None).
    """
    try:
        path.chmod(mode)
        _apply_shared_ownership(path, owner_uid)
    except OSError as e:
        # Non-fatal by design, but leave a trace so permission problems can
        # be diagnosed instead of disappearing silently.
        logger.debug(f"Could not secure {path} (mode={mode:o}): {e}")
846  
847  
def ensure_secure_session_files(
    session_dir: Path,
    owner_uid: Optional[int] = None,
) -> None:
    """
    Ensure all session files have appropriate permissions after agent run.

    Session files use shared access permissions (770/660) so that both the
    API and sandbox processes can access them. Per this module's ownership
    model, the group owner is the ag3ntum group, of which ag3ntum_api and all
    sandbox users are members, so 660/770 is sufficient and no world-readable
    permissions are needed.

    Cross-session isolation is enforced by PathValidator at the application level.

    Security Properties:
    - Directories: 770 (owner rwx + group rwx, no world)
    - Files: 660 (owner rw + group rw, no world)
    - Sensitive files (agent.jsonl, .claude.json): 660
    - Ownership: uid:ag3ntum (sandbox user UID, ag3ntum GID)

    Securing is best-effort per entry: it goes through ``_secure_path``, so a
    failure on one entry does not stop the rest. Does nothing if
    ``session_dir`` does not exist.

    Args:
        session_dir: Path to the session directory
        owner_uid: UID to set as owner (optional)
    """
    if not session_dir.exists():
        return

    # Secure the session root before walking it, so a root that is not yet
    # listable can still be traversed once its mode is fixed.
    if session_dir.is_dir():
        _secure_path(session_dir, 0o770, owner_uid)

    # Recursively secure the entire session tree. This covers:
    # - root session metadata (.claude.json, agent.jsonl)
    # - workspace artifacts created by tools
    # - Claude project JSONL resume files under projects/
    # - debug/todos directories created by the SDK wrapper
    #
    # Symlinks are skipped to avoid mutating external mount targets.
    for root, dirs, files in os.walk(session_dir, followlinks=False):
        root_path = Path(root)

        # Each sub-directory is secured exactly once, here, while its parent
        # is being processed. In a top-down walk that happens before os.walk
        # lists the sub-directory, so it need not be secured again when it
        # later becomes the walk root.
        for dirname in dirs:
            dir_path = root_path / dirname
            if dir_path.is_symlink():
                continue
            _secure_path(dir_path, 0o770, owner_uid)

        for filename in files:
            file_path = root_path / filename
            if file_path.is_symlink():
                continue
            _secure_path(file_path, 0o660, owner_uid)

    logger.debug(f"Secured session files: {session_dir} (owner_uid={owner_uid})")