# filesystem_artifact_service.py
"""
An ADK ArtifactService implementation using the local filesystem for storage.
"""

import asyncio
import json
import logging
import os
import shutil
import unicodedata

from google.adk.artifacts import BaseArtifactService
from google.adk.artifacts.base_artifact_service import ArtifactVersion
from google.genai import types as adk_types
from typing_extensions import override

logger = logging.getLogger(__name__)

# Companion file suffix for the per-version JSON metadata (e.g. "3.meta").
METADATA_FILE_SUFFIX = ".meta"


class FilesystemArtifactService(BaseArtifactService):
    """
    An artifact service implementation using the local filesystem.

    Stores artifacts in a structured directory based on the effective app name
    (which represents the scope), user ID, session ID (or 'user' namespace),
    filename, and version. Each version is stored as a raw data file named by
    its integer version number, with metadata (like mime_type) in a companion
    "<version>.meta" JSON file.

    Layout:
        <base_path>/<app>/<user>/<session>/<filename>/<version>
        <base_path>/<app>/<user>/user/<filename>/<version>   (for "user:" keys)
    """

    def __init__(self, base_path: str):
        """
        Initializes the FilesystemArtifactService.

        Args:
            base_path: The root directory where all artifacts will be stored.

        Raises:
            ValueError: If base_path is not provided or cannot be created.
        """
        if not base_path:
            raise ValueError("base_path cannot be empty for FilesystemArtifactService")

        self.base_path = os.path.abspath(base_path)

        try:
            os.makedirs(self.base_path, exist_ok=True)
            logger.info(
                "Initialized FilesystemArtifactService. Base path: %s",
                self.base_path,
            )
        except OSError as e:
            logger.error(
                "Failed to create base directory '%s': %s",
                self.base_path,
                e,
            )
            raise ValueError(
                f"Could not create or access base_path '{self.base_path}': {e}"
            ) from e

    def _file_has_user_namespace(self, filename: str) -> bool:
        """Checks if the filename has a user namespace ("user:" prefix)."""
        return filename.startswith("user:")

    def _get_artifact_dir(
        self, app_name: str, user_id: str, session_id: str, filename: str
    ) -> str:
        """
        Constructs the directory path for a specific artifact (all versions).

        The `app_name` is the effective scope identifier, resolved by the
        caller. Each path component is passed through os.path.basename to
        strip any directory separators and prevent path traversal.
        """
        app_name_sanitized = os.path.basename(app_name)
        user_id_sanitized = os.path.basename(user_id)
        session_id_sanitized = os.path.basename(session_id)
        filename_sanitized = os.path.basename(filename)

        if self._file_has_user_namespace(filename):
            # "user:" artifacts live in a shared per-user namespace instead of
            # a per-session directory; strip the prefix for the on-disk name.
            filename_dir = os.path.basename(filename.split(":", 1)[1])
            return os.path.join(
                self.base_path,
                app_name_sanitized,
                user_id_sanitized,
                "user",
                filename_dir,
            )
        else:
            return os.path.join(
                self.base_path,
                app_name_sanitized,
                user_id_sanitized,
                session_id_sanitized,
                filename_sanitized,
            )

    def _get_version_path(self, artifact_dir: str, version: int) -> str:
        """Constructs the file path for a specific artifact version's data."""
        return os.path.join(artifact_dir, str(version))

    def _get_metadata_path(self, artifact_dir: str, version: int) -> str:
        """Constructs the file path for a specific artifact version's metadata."""
        return os.path.join(artifact_dir, f"{version}{METADATA_FILE_SUFFIX}")

    @override
    async def save_artifact(
        self,
        *,
        app_name: str,
        user_id: str,
        session_id: str,
        filename: str,
        artifact: adk_types.Part,
    ) -> int:
        """
        Saves a new version of an artifact to disk.

        Writes the raw bytes and a companion metadata JSON file, fsyncing
        both. On any failure, partially written files for this version are
        removed before re-raising.

        Returns:
            The integer version number assigned to the saved artifact.

        Raises:
            OSError: If the artifact directory or files cannot be written.
        """
        log_prefix = "[FSArtifact:Save] "

        filename = self._normalize_filename_unicode(filename)
        artifact_dir = self._get_artifact_dir(app_name, user_id, session_id, filename)
        try:
            await asyncio.to_thread(os.makedirs, artifact_dir, exist_ok=True)
        except OSError as e:
            logger.error(
                "%sFailed to create artifact directory '%s': %s",
                log_prefix,
                artifact_dir,
                e,
            )
            raise OSError(f"Could not create artifact directory: {e}") from e

        # NOTE(review): version allocation via list_versions is not atomic —
        # two concurrent savers could compute the same next version. Acceptable
        # for single-writer use; confirm callers do not save concurrently.
        versions = await self.list_versions(
            app_name=app_name,
            user_id=user_id,
            session_id=session_id,
            filename=filename,
        )
        version = 0 if not versions else max(versions) + 1

        version_path = self._get_version_path(artifact_dir, version)
        metadata_path = self._get_metadata_path(artifact_dir, version)

        try:
            if not artifact.inline_data or artifact.inline_data.data is None:
                raise ValueError("Artifact Part has no inline_data to save.")

            metadata = {"mime_type": artifact.inline_data.mime_type}

            def _write_data_file():
                """Write artifact data and fsync to disk."""
                with open(version_path, "wb") as f:
                    f.write(artifact.inline_data.data)
                    f.flush()
                    os.fsync(f.fileno())
                logger.debug("%sWrote data to %s", log_prefix, version_path)

            def _write_metadata_file():
                """Write artifact metadata and fsync to disk."""
                with open(metadata_path, "w", encoding="utf-8") as f:
                    json.dump(metadata, f)
                    f.flush()
                    os.fsync(f.fileno())
                logger.debug("%sWrote metadata to %s", log_prefix, metadata_path)

            # Run file writes concurrently and wait for both to complete
            await asyncio.gather(
                asyncio.to_thread(_write_data_file),
                asyncio.to_thread(_write_metadata_file),
            )

            logger.info(
                "%sSaved artifact '%s' version %d successfully.",
                log_prefix,
                filename,
                version,
            )
            return version
        except (OSError, ValueError, TypeError) as e:
            logger.error(
                "%sFailed to save artifact '%s' version %d: %s",
                log_prefix,
                filename,
                version,
                e,
            )
            # Best-effort cleanup so a half-written version is never visible.
            if await asyncio.to_thread(os.path.exists, version_path):
                await asyncio.to_thread(os.remove, version_path)
            if await asyncio.to_thread(os.path.exists, metadata_path):
                await asyncio.to_thread(os.remove, metadata_path)
            raise OSError(f"Failed to save artifact version {version}: {e}") from e

    @override
    async def load_artifact(
        self,
        *,
        app_name: str,
        user_id: str,
        session_id: str,
        filename: str,
        version: int | None = None,
    ) -> adk_types.Part | None:
        """
        Loads an artifact version from disk.

        Args:
            version: Specific version to load; when None, loads the latest.

        Returns:
            The artifact as a Part, or None if the artifact/version is
            missing or unreadable.
        """
        filename = self._normalize_filename_unicode(filename)
        # FIX: log prefix previously carried a literal "(unknown)" placeholder.
        log_prefix = f"[FSArtifact:Load:{filename}] "
        artifact_dir = self._get_artifact_dir(app_name, user_id, session_id, filename)

        if not await asyncio.to_thread(os.path.isdir, artifact_dir):
            logger.debug("%sArtifact directory not found: %s", log_prefix, artifact_dir)
            return None

        load_version = version
        if load_version is None:
            versions = await self.list_versions(
                app_name=app_name,
                user_id=user_id,
                session_id=session_id,
                filename=filename,
            )
            if not versions:
                logger.debug("%sNo versions found for artifact.", log_prefix)
                return None
            load_version = max(versions)
            logger.debug("%sLoading latest version: %d", log_prefix, load_version)
        else:
            logger.debug("%sLoading specified version: %d", log_prefix, load_version)

        version_path = self._get_version_path(artifact_dir, load_version)
        metadata_path = self._get_metadata_path(artifact_dir, load_version)

        if not await asyncio.to_thread(
            os.path.exists, version_path
        ) or not await asyncio.to_thread(os.path.exists, metadata_path):
            logger.warning(
                "%sData or metadata file missing for version %d.",
                log_prefix,
                load_version,
            )
            return None

        try:

            def _read_metadata_file():
                with open(metadata_path, encoding="utf-8") as f:
                    return json.load(f)

            metadata = await asyncio.to_thread(_read_metadata_file)
            mime_type = metadata.get("mime_type", "application/octet-stream")

            def _read_data_file():
                with open(version_path, "rb") as f:
                    return f.read()

            data_bytes = await asyncio.to_thread(_read_data_file)

            artifact_part = adk_types.Part.from_bytes(
                data=data_bytes, mime_type=mime_type
            )
            logger.info(
                "%sLoaded artifact '%s' version %d successfully (%d bytes, %s).",
                log_prefix,
                filename,
                load_version,
                len(data_bytes),
                mime_type,
            )
            return artifact_part

        except (OSError, json.JSONDecodeError) as e:
            logger.error(
                "%sFailed to load artifact '%s' version %d: %s",
                log_prefix,
                filename,
                load_version,
                e,
            )
            return None

    @override
    async def list_artifact_keys(
        self, *, app_name: str, user_id: str, session_id: str
    ) -> list[str]:
        """
        Lists all artifact filenames visible to a session.

        Returns session-scoped names as-is and user-scoped names with a
        "user:" prefix, sorted alphabetically.
        """
        log_prefix = "[FSArtifact:ListKeys] "
        filenames = set()
        app_name_sanitized = os.path.basename(app_name)
        user_id_sanitized = os.path.basename(user_id)
        session_id_sanitized = os.path.basename(session_id)

        session_base_dir = os.path.join(
            self.base_path, app_name_sanitized, user_id_sanitized, session_id_sanitized
        )
        if await asyncio.to_thread(os.path.isdir, session_base_dir):
            try:
                for item in await asyncio.to_thread(os.listdir, session_base_dir):
                    item_path = os.path.join(session_base_dir, item)
                    if await asyncio.to_thread(os.path.isdir, item_path):
                        filenames.add(item)
            except OSError as e:
                logger.warning(
                    "%sError listing session directory '%s': %s",
                    log_prefix,
                    session_base_dir,
                    e,
                )

        user_base_dir = os.path.join(
            self.base_path, app_name_sanitized, user_id_sanitized, "user"
        )
        if await asyncio.to_thread(os.path.isdir, user_base_dir):
            try:
                for item in await asyncio.to_thread(os.listdir, user_base_dir):
                    item_path = os.path.join(user_base_dir, item)
                    if await asyncio.to_thread(os.path.isdir, item_path):
                        filenames.add(f"user:{item}")
            except OSError as e:
                logger.warning(
                    "%sError listing user directory '%s': %s",
                    log_prefix,
                    user_base_dir,
                    e,
                )

        sorted_filenames = sorted(filenames)
        logger.debug("%sFound %d artifact keys.", log_prefix, len(sorted_filenames))
        return sorted_filenames

    @override
    async def delete_artifact(
        self, *, app_name: str, user_id: str, session_id: str, filename: str
    ) -> None:
        """
        Deletes an artifact and all of its versions.

        A missing artifact directory is not an error; deletion failures are
        logged but not raised (best-effort semantics).
        """
        log_prefix = "[FSArtifact:Delete] "
        # FIX: normalize like save/load so saved artifacts can be deleted
        # regardless of the Unicode form the caller passes.
        filename = self._normalize_filename_unicode(filename)
        artifact_dir = self._get_artifact_dir(app_name, user_id, session_id, filename)

        if not await asyncio.to_thread(os.path.isdir, artifact_dir):
            logger.debug("%sArtifact directory not found: %s", log_prefix, artifact_dir)
            return

        try:
            await asyncio.to_thread(shutil.rmtree, artifact_dir)
            logger.info(
                "%sRemoved artifact directory and all its contents: %s",
                log_prefix,
                artifact_dir,
            )
        except OSError as e:
            # FIX: format string previously had one '%s' fed by the exception,
            # so the directory was never logged and the error was mis-slotted.
            logger.error(
                "%sError deleting artifact directory '%s': %s",
                log_prefix,
                artifact_dir,
                e,
            )

    @override
    async def list_versions(
        self, *, app_name: str, user_id: str, session_id: str, filename: str
    ) -> list[int]:
        """
        Lists the available version numbers for an artifact, sorted ascending.

        Returns an empty list if the artifact does not exist or the directory
        cannot be read. Only plain numeric filenames count as versions, which
        excludes the "<version>.meta" companion files.
        """
        # FIX: normalize for consistency with save/load (NFKC is idempotent,
        # so double-normalization from save_artifact's call path is harmless).
        filename = self._normalize_filename_unicode(filename)
        log_prefix = f"[FSArtifact:ListVersions:{filename}] "
        artifact_dir = self._get_artifact_dir(app_name, user_id, session_id, filename)
        versions = []

        if not await asyncio.to_thread(os.path.isdir, artifact_dir):
            logger.debug("%sArtifact directory not found: %s", log_prefix, artifact_dir)
            return []

        try:
            for item in await asyncio.to_thread(os.listdir, artifact_dir):
                if (
                    await asyncio.to_thread(
                        os.path.isfile, os.path.join(artifact_dir, item)
                    )
                    and item.isdigit()
                ):
                    versions.append(int(item))
        except OSError as e:
            # FIX: format string previously mis-slotted the exception into the
            # directory placeholder; log both the directory and the error.
            logger.error(
                "%sError listing versions in directory '%s': %s",
                log_prefix,
                artifact_dir,
                e,
            )
            return []

        sorted_versions = sorted(versions)
        logger.debug("%sFound versions: %s", log_prefix, sorted_versions)
        return sorted_versions

    @override
    async def list_artifact_versions(
        self,
        *,
        app_name: str,
        user_id: str,
        filename: str,
        session_id: str,
    ) -> list[ArtifactVersion]:
        """Lists all versions and their metadata for a specific artifact."""
        filename = self._normalize_filename_unicode(filename)
        # FIX: log prefix previously carried a literal "(unknown)" placeholder.
        log_prefix = f"[FSArtifact:ListArtifactVersions:{filename}] "
        artifact_dir = self._get_artifact_dir(app_name, user_id, session_id, filename)
        artifact_versions = []

        if not await asyncio.to_thread(os.path.isdir, artifact_dir):
            logger.debug("%sArtifact directory not found: %s", log_prefix, artifact_dir)
            return []

        try:
            for item in await asyncio.to_thread(os.listdir, artifact_dir):
                item_path = os.path.join(artifact_dir, item)
                if await asyncio.to_thread(os.path.isfile, item_path) and item.isdigit():
                    version_num = int(item)
                    version_path = self._get_version_path(artifact_dir, version_num)
                    metadata_path = self._get_metadata_path(artifact_dir, version_num)

                    # Read metadata; skip versions whose metadata is broken.
                    try:

                        def _read_metadata():
                            with open(metadata_path, encoding="utf-8") as f:
                                return json.load(f)

                        metadata = await asyncio.to_thread(_read_metadata)
                        mime_type = metadata.get("mime_type", "application/octet-stream")

                        # NOTE(review): st_ctime is inode-change time on POSIX,
                        # not creation time — close enough here since version
                        # files are written once and never modified.
                        stat_info = await asyncio.to_thread(os.stat, version_path)
                        create_time = stat_info.st_ctime

                        artifact_version = ArtifactVersion(
                            version=version_num,
                            canonical_uri=f"file://{version_path}",
                            mime_type=mime_type,
                            create_time=create_time,
                            custom_metadata={},
                        )
                        artifact_versions.append(artifact_version)

                    except (OSError, json.JSONDecodeError) as e:
                        logger.warning(
                            "%sFailed to read metadata for version %d: %s",
                            log_prefix,
                            version_num,
                            e,
                        )
                        continue

        except OSError as e:
            logger.error(
                "%sError listing versions in directory '%s': %s",
                log_prefix,
                artifact_dir,
                e,
            )
            return []

        # Sort by version number
        artifact_versions.sort(key=lambda av: av.version)
        logger.debug("%sFound %d artifact versions", log_prefix, len(artifact_versions))
        return artifact_versions

    @override
    async def get_artifact_version(
        self,
        *,
        app_name: str,
        user_id: str,
        filename: str,
        session_id: str,
        version: int | None = None,
    ) -> ArtifactVersion | None:
        """
        Gets the metadata for a specific version of an artifact.

        Args:
            version: Specific version; when None, uses the latest.

        Returns:
            The ArtifactVersion, or None if the artifact/version is missing
            or its metadata cannot be read.
        """
        filename = self._normalize_filename_unicode(filename)
        # FIX: log prefix previously carried a literal "(unknown)" placeholder.
        log_prefix = f"[FSArtifact:GetArtifactVersion:{filename}] "
        artifact_dir = self._get_artifact_dir(app_name, user_id, session_id, filename)

        if not await asyncio.to_thread(os.path.isdir, artifact_dir):
            logger.debug("%sArtifact directory not found: %s", log_prefix, artifact_dir)
            return None

        # Determine which version to load
        load_version = version
        if load_version is None:
            versions = await self.list_versions(
                app_name=app_name,
                user_id=user_id,
                session_id=session_id,
                filename=filename,
            )
            if not versions:
                logger.debug("%sNo versions found for artifact.", log_prefix)
                return None
            load_version = max(versions)
            logger.debug("%sGetting latest version: %d", log_prefix, load_version)
        else:
            logger.debug("%sGetting specified version: %d", log_prefix, load_version)

        version_path = self._get_version_path(artifact_dir, load_version)
        metadata_path = self._get_metadata_path(artifact_dir, load_version)

        if not await asyncio.to_thread(
            os.path.exists, version_path
        ) or not await asyncio.to_thread(os.path.exists, metadata_path):
            logger.warning(
                "%sData or metadata file missing for version %d.",
                log_prefix,
                load_version,
            )
            return None

        try:

            def _read_metadata():
                with open(metadata_path, encoding="utf-8") as f:
                    return json.load(f)

            metadata = await asyncio.to_thread(_read_metadata)
            mime_type = metadata.get("mime_type", "application/octet-stream")

            # NOTE(review): st_ctime is inode-change time on POSIX, not
            # creation time — acceptable because versions are write-once.
            stat_info = await asyncio.to_thread(os.stat, version_path)
            create_time = stat_info.st_ctime

            artifact_version = ArtifactVersion(
                version=load_version,
                canonical_uri=f"file://{version_path}",
                mime_type=mime_type,
                create_time=create_time,
                custom_metadata={},
            )

            logger.info(
                "%sRetrieved metadata for artifact '%s' version %d",
                log_prefix,
                filename,
                load_version,
            )
            return artifact_version

        except (OSError, json.JSONDecodeError) as e:
            logger.error(
                "%sFailed to get metadata for artifact '%s' version %d: %s",
                log_prefix,
                filename,
                load_version,
                e,
            )
            return None

    def _normalize_filename_unicode(self, filename: str) -> str:
        """
        Normalizes Unicode characters in a filename to their standard form.

        Applies NFKC normalization, which maps compatibility characters such
        as NARROW NO-BREAK SPACE (U+202F) to their regular ASCII equivalents
        (a standard space). Idempotent, so repeated calls are safe.
        """
        return unicodedata.normalize("NFKC", filename)