/ src / solace_agent_mesh / agent / adk / artifacts / filesystem_artifact_service.py
filesystem_artifact_service.py
  1  """
  2  An ADK ArtifactService implementation using the local filesystem for storage.
  3  """
  4  
  5  import asyncio
  6  import json
  7  import logging
  8  import os
  9  import shutil
 10  import unicodedata
 11  
 12  from google.adk.artifacts import BaseArtifactService
 13  from google.adk.artifacts.base_artifact_service import ArtifactVersion
 14  from google.genai import types as adk_types
 15  from typing_extensions import override
 16  
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Suffix appended to a version number to form the name of that version's
# companion metadata file (see `_get_metadata_path`).
METADATA_FILE_SUFFIX = ".meta"
 20  
 21  
 22  class FilesystemArtifactService(BaseArtifactService):
 23      """
 24      An artifact service implementation using the local filesystem.
 25  
 26      Stores artifacts in a structured directory based on the effective app name
 27      (which represents the scope), user ID, session ID (or 'user' namespace),
 28      filename, and version. Metadata (like mime_type) is stored in a companion file.
 29      """
 30  
 31      def __init__(self, base_path: str):
 32          """
 33          Initializes the FilesystemArtifactService.
 34  
 35          Args:
 36              base_path: The root directory where all artifacts will be stored.
 37  
 38          Raises:
 39              ValueError: If base_path is not provided or cannot be created.
 40          """
 41          if not base_path:
 42              raise ValueError("base_path cannot be empty for FilesystemArtifactService")
 43  
 44          self.base_path = os.path.abspath(base_path)
 45  
 46          try:
 47              os.makedirs(self.base_path, exist_ok=True)
 48              logger.info(
 49                  "Initialized FilesystemArtifactService. Base path: %s",
 50                  self.base_path,
 51              )
 52          except OSError as e:
 53              logger.error(
 54                  "Failed to create base directory '%s': %s",
 55                  self.base_path,
 56                  e,
 57              )
 58              raise ValueError(
 59                  f"Could not create or access base_path '{self.base_path}': {e}"
 60              ) from e
 61  
 62      def _file_has_user_namespace(self, filename: str) -> bool:
 63          """Checks if the filename has a user namespace."""
 64          return filename.startswith("user:")
 65  
 66      def _get_artifact_dir(
 67          self, app_name: str, user_id: str, session_id: str, filename: str
 68      ) -> str:
 69          """
 70          Constructs the directory path for a specific artifact (all versions).
 71          The `app_name` is now the effective scope identifier, resolved by the caller.
 72          """
 73          app_name_sanitized = os.path.basename(app_name)
 74          user_id_sanitized = os.path.basename(user_id)
 75          session_id_sanitized = os.path.basename(session_id)
 76          filename_sanitized = os.path.basename(filename)
 77  
 78          if self._file_has_user_namespace(filename):
 79              filename_dir = os.path.basename(filename.split(":", 1)[1])
 80              return os.path.join(
 81                  self.base_path,
 82                  app_name_sanitized,
 83                  user_id_sanitized,
 84                  "user",
 85                  filename_dir,
 86              )
 87          else:
 88              return os.path.join(
 89                  self.base_path,
 90                  app_name_sanitized,
 91                  user_id_sanitized,
 92                  session_id_sanitized,
 93                  filename_sanitized,
 94              )
 95  
 96      def _get_version_path(self, artifact_dir: str, version: int) -> str:
 97          """Constructs the file path for a specific artifact version's data."""
 98          return os.path.join(artifact_dir, str(version))
 99  
100      def _get_metadata_path(self, artifact_dir: str, version: int) -> str:
101          """Constructs the file path for a specific artifact version's metadata."""
102          return os.path.join(artifact_dir, f"{version}{METADATA_FILE_SUFFIX}")
103  
104      @override
105      async def save_artifact(
106          self,
107          *,
108          app_name: str,
109          user_id: str,
110          session_id: str,
111          filename: str,
112          artifact: adk_types.Part,
113      ) -> int:
114          log_prefix = "[FSArtifact:Save] "
115  
116          filename = self._normalize_filename_unicode(filename)
117          artifact_dir = self._get_artifact_dir(app_name, user_id, session_id, filename)
118          try:
119              await asyncio.to_thread(os.makedirs, artifact_dir, exist_ok=True)
120          except OSError as e:
121              logger.error(
122                  "%sFailed to create artifact directory '%s': %s",
123                  log_prefix,
124                  artifact_dir,
125                  e,
126              )
127              raise OSError(f"Could not create artifact directory: {e}") from e
128  
129          versions = await self.list_versions(
130              app_name=app_name,
131              user_id=user_id,
132              session_id=session_id,
133              filename=filename,
134          )
135          version = 0 if not versions else max(versions) + 1
136  
137          version_path = self._get_version_path(artifact_dir, version)
138          metadata_path = self._get_metadata_path(artifact_dir, version)
139  
140          try:
141              if not artifact.inline_data or artifact.inline_data.data is None:
142                  raise ValueError("Artifact Part has no inline_data to save.")
143  
144              metadata = {"mime_type": artifact.inline_data.mime_type}
145  
146              def _write_data_file():
147                  """Write artifact data and fsync to disk."""
148                  with open(version_path, "wb") as f:
149                      f.write(artifact.inline_data.data)
150                      f.flush()
151                      os.fsync(f.fileno())
152                  logger.debug("%sWrote data to %s", log_prefix, version_path)
153  
154              def _write_metadata_file():
155                  """Write artifact metadata and fsync to disk."""
156                  with open(metadata_path, "w", encoding="utf-8") as f:
157                      json.dump(metadata, f)
158                      f.flush()
159                      os.fsync(f.fileno())
160                  logger.debug("%sWrote metadata to %s", log_prefix, metadata_path)
161  
162              # Run file writes concurrently and wait for both to complete
163              await asyncio.gather(
164                  asyncio.to_thread(_write_data_file),
165                  asyncio.to_thread(_write_metadata_file),
166              )
167  
168              logger.info(
169                  "%sSaved artifact '%s' version %d successfully.",
170                  log_prefix,
171                  filename,
172                  version,
173              )
174              return version
175          except (OSError, ValueError, TypeError) as e:
176              logger.error(
177                  "%sFailed to save artifact '%s' version %d: %s",
178                  log_prefix,
179                  filename,
180                  version,
181                  e,
182              )
183              if await asyncio.to_thread(os.path.exists, version_path):
184                  await asyncio.to_thread(os.remove, version_path)
185              if await asyncio.to_thread(os.path.exists, metadata_path):
186                  await asyncio.to_thread(os.remove, metadata_path)
187              raise OSError(f"Failed to save artifact version {version}: {e}") from e
188  
189      @override
190      async def load_artifact(
191          self,
192          *,
193          app_name: str,
194          user_id: str,
195          session_id: str,
196          filename: str,
197          version: int | None = None,
198      ) -> adk_types.Part | None:
199          log_prefix = f"[FSArtifact:Load:{filename}] "
200          filename = self._normalize_filename_unicode(filename)
201          artifact_dir = self._get_artifact_dir(app_name, user_id, session_id, filename)
202  
203          if not await asyncio.to_thread(os.path.isdir, artifact_dir):
204              logger.debug("%sArtifact directory not found: %s", log_prefix, artifact_dir)
205              return None
206  
207          load_version = version
208          if load_version is None:
209              versions = await self.list_versions(
210                  app_name=app_name,
211                  user_id=user_id,
212                  session_id=session_id,
213                  filename=filename,
214              )
215              if not versions:
216                  logger.debug("%sNo versions found for artifact.", log_prefix)
217                  return None
218              load_version = max(versions)
219              logger.debug("%sLoading latest version: %d", log_prefix, load_version)
220          else:
221              logger.debug("%sLoading specified version: %d", log_prefix, load_version)
222  
223          version_path = self._get_version_path(artifact_dir, load_version)
224          metadata_path = self._get_metadata_path(artifact_dir, load_version)
225  
226          if not await asyncio.to_thread(
227              os.path.exists, version_path
228          ) or not await asyncio.to_thread(os.path.exists, metadata_path):
229              logger.warning(
230                  "%sData or metadata file missing for version %d.",
231                  log_prefix,
232                  load_version,
233              )
234              return None
235  
236          try:
237  
238              def _read_metadata_file():
239                  with open(metadata_path, encoding="utf-8") as f:
240                      return json.load(f)
241  
242              metadata = await asyncio.to_thread(_read_metadata_file)
243              mime_type = metadata.get("mime_type", "application/octet-stream")
244  
245              def _read_data_file():
246                  with open(version_path, "rb") as f:
247                      return f.read()
248  
249              data_bytes = await asyncio.to_thread(_read_data_file)
250  
251              artifact_part = adk_types.Part.from_bytes(
252                  data=data_bytes, mime_type=mime_type
253              )
254              logger.info(
255                  "%sLoaded artifact '%s' version %d successfully (%d bytes, %s).",
256                  log_prefix,
257                  filename,
258                  load_version,
259                  len(data_bytes),
260                  mime_type,
261              )
262              return artifact_part
263  
264          except (OSError, json.JSONDecodeError) as e:
265              logger.error(
266                  "%sFailed to load artifact '%s' version %d: %s",
267                  log_prefix,
268                  filename,
269                  load_version,
270                  e,
271              )
272              return None
273  
274      @override
275      async def list_artifact_keys(
276          self, *, app_name: str, user_id: str, session_id: str
277      ) -> list[str]:
278          log_prefix = "[FSArtifact:ListKeys] "
279          filenames = set()
280          app_name_sanitized = os.path.basename(app_name)
281          user_id_sanitized = os.path.basename(user_id)
282          session_id_sanitized = os.path.basename(session_id)
283  
284          session_base_dir = os.path.join(
285              self.base_path, app_name_sanitized, user_id_sanitized, session_id_sanitized
286          )
287          if await asyncio.to_thread(os.path.isdir, session_base_dir):
288              try:
289                  for item in await asyncio.to_thread(os.listdir, session_base_dir):
290                      item_path = os.path.join(session_base_dir, item)
291                      if await asyncio.to_thread(os.path.isdir, item_path):
292                          filenames.add(item)
293              except OSError as e:
294                  logger.warning(
295                      "%sError listing session directory '%s': %s",
296                      log_prefix,
297                      session_base_dir,
298                      e,
299                  )
300  
301          user_base_dir = os.path.join(
302              self.base_path, app_name_sanitized, user_id_sanitized, "user"
303          )
304          if await asyncio.to_thread(os.path.isdir, user_base_dir):
305              try:
306                  for item in await asyncio.to_thread(os.listdir, user_base_dir):
307                      item_path = os.path.join(user_base_dir, item)
308                      if await asyncio.to_thread(os.path.isdir, item_path):
309                          filenames.add(f"user:{item}")
310              except OSError as e:
311                  logger.warning(
312                      "%sError listing user directory '%s': %s",
313                      log_prefix,
314                      user_base_dir,
315                      e,
316                  )
317  
318          sorted_filenames = sorted(list(filenames))
319          logger.debug("%sFound %d artifact keys.", log_prefix, len(sorted_filenames))
320          return sorted_filenames
321  
322      @override
323      async def delete_artifact(
324          self, *, app_name: str, user_id: str, session_id: str, filename: str
325      ) -> None:
326          log_prefix = "[FSArtifact:Delete] "
327          artifact_dir = self._get_artifact_dir(app_name, user_id, session_id, filename)
328  
329          if not await asyncio.to_thread(os.path.isdir, artifact_dir):
330              logger.debug("%sArtifact directory not found: %s", log_prefix, artifact_dir)
331              return
332  
333          try:
334              await asyncio.to_thread(shutil.rmtree, artifact_dir)
335              logger.info(
336                  "%sRemoved artifact directory and all its contents: %s",
337                  log_prefix,
338                  artifact_dir,
339              )
340          except OSError as e:
341              logger.error(
342                  "%sError deleting artifact directory '%s'",
343                  log_prefix,
344                  e,
345              )
346  
347      @override
348      async def list_versions(
349          self, *, app_name: str, user_id: str, session_id: str, filename: str
350      ) -> list[int]:
351          log_prefix = f"[FSArtifact:ListVersions:{filename}] "
352          artifact_dir = self._get_artifact_dir(app_name, user_id, session_id, filename)
353          versions = []
354  
355          if not await asyncio.to_thread(os.path.isdir, artifact_dir):
356              logger.debug("%sArtifact directory not found: %s", log_prefix, artifact_dir)
357              return []
358  
359          try:
360              for item in await asyncio.to_thread(os.listdir, artifact_dir):
361                  if (
362                      await asyncio.to_thread(
363                          os.path.isfile, os.path.join(artifact_dir, item)
364                      )
365                      and item.isdigit()
366                  ):
367                      versions.append(int(item))
368          except OSError as e:
369              logger.error("%sError listing versions in directory '%s'", log_prefix, e)
370              return []
371  
372          sorted_versions = sorted(versions)
373          logger.debug("%sFound versions: %s", log_prefix, sorted_versions)
374          return sorted_versions
375  
376      @override
377      async def list_artifact_versions(
378          self,
379          *,
380          app_name: str,
381          user_id: str,
382          filename: str,
383          session_id: str,
384      ) -> list[ArtifactVersion]:
385          """Lists all versions and their metadata for a specific artifact."""
386          log_prefix = f"[FSArtifact:ListArtifactVersions:{filename}] "
387          filename = self._normalize_filename_unicode(filename)
388          artifact_dir = self._get_artifact_dir(app_name, user_id, session_id, filename)
389          artifact_versions = []
390  
391          if not await asyncio.to_thread(os.path.isdir, artifact_dir):
392              logger.debug("%sArtifact directory not found: %s", log_prefix, artifact_dir)
393              return []
394  
395          try:
396              for item in await asyncio.to_thread(os.listdir, artifact_dir):
397                  item_path = os.path.join(artifact_dir, item)
398                  if await asyncio.to_thread(os.path.isfile, item_path) and item.isdigit():
399                      version_num = int(item)
400                      version_path = self._get_version_path(artifact_dir, version_num)
401                      metadata_path = self._get_metadata_path(artifact_dir, version_num)
402  
403                      # Read metadata
404                      try:
405  
406                          def _read_metadata():
407                              with open(metadata_path, encoding="utf-8") as f:
408                                  return json.load(f)
409  
410                          metadata = await asyncio.to_thread(_read_metadata)
411                          mime_type = metadata.get("mime_type", "application/octet-stream")
412  
413                          # Get file creation time
414                          stat_info = await asyncio.to_thread(os.stat, version_path)
415                          create_time = stat_info.st_ctime
416  
417                          # Create ArtifactVersion object
418                          artifact_version = ArtifactVersion(
419                              version=version_num,
420                              canonical_uri=f"file://{version_path}",
421                              mime_type=mime_type,
422                              create_time=create_time,
423                              custom_metadata={},
424                          )
425                          artifact_versions.append(artifact_version)
426  
427                      except (OSError, json.JSONDecodeError) as e:
428                          logger.warning(
429                              "%sFailed to read metadata for version %d: %s",
430                              log_prefix,
431                              version_num,
432                              e,
433                          )
434                          continue
435  
436          except OSError as e:
437              logger.error(
438                  "%sError listing versions in directory '%s': %s",
439                  log_prefix,
440                  artifact_dir,
441                  e,
442              )
443              return []
444  
445          # Sort by version number
446          artifact_versions.sort(key=lambda av: av.version)
447          logger.debug("%sFound %d artifact versions", log_prefix, len(artifact_versions))
448          return artifact_versions
449  
450      @override
451      async def get_artifact_version(
452          self,
453          *,
454          app_name: str,
455          user_id: str,
456          filename: str,
457          session_id: str,
458          version: int | None = None,
459      ) -> ArtifactVersion | None:
460          """Gets the metadata for a specific version of an artifact."""
461          log_prefix = f"[FSArtifact:GetArtifactVersion:{filename}] "
462          filename = self._normalize_filename_unicode(filename)
463          artifact_dir = self._get_artifact_dir(app_name, user_id, session_id, filename)
464  
465          if not await asyncio.to_thread(os.path.isdir, artifact_dir):
466              logger.debug("%sArtifact directory not found: %s", log_prefix, artifact_dir)
467              return None
468  
469          # Determine which version to load
470          load_version = version
471          if load_version is None:
472              versions = await self.list_versions(
473                  app_name=app_name,
474                  user_id=user_id,
475                  session_id=session_id,
476                  filename=filename,
477              )
478              if not versions:
479                  logger.debug("%sNo versions found for artifact.", log_prefix)
480                  return None
481              load_version = max(versions)
482              logger.debug("%sGetting latest version: %d", log_prefix, load_version)
483          else:
484              logger.debug("%sGetting specified version: %d", log_prefix, load_version)
485  
486          version_path = self._get_version_path(artifact_dir, load_version)
487          metadata_path = self._get_metadata_path(artifact_dir, load_version)
488  
489          if not await asyncio.to_thread(
490              os.path.exists, version_path
491          ) or not await asyncio.to_thread(os.path.exists, metadata_path):
492              logger.warning(
493                  "%sData or metadata file missing for version %d.",
494                  log_prefix,
495                  load_version,
496              )
497              return None
498  
499          try:
500              # Read metadata
501              def _read_metadata():
502                  with open(metadata_path, encoding="utf-8") as f:
503                      return json.load(f)
504  
505              metadata = await asyncio.to_thread(_read_metadata)
506              mime_type = metadata.get("mime_type", "application/octet-stream")
507  
508              # Get file creation time
509              stat_info = await asyncio.to_thread(os.stat, version_path)
510              create_time = stat_info.st_ctime
511  
512              # Create and return ArtifactVersion object
513              artifact_version = ArtifactVersion(
514                  version=load_version,
515                  canonical_uri=f"file://{version_path}",
516                  mime_type=mime_type,
517                  create_time=create_time,
518                  custom_metadata={},
519              )
520  
521              logger.info(
522                  "%sRetrieved metadata for artifact '%s' version %d",
523                  log_prefix,
524                  filename,
525                  load_version,
526              )
527              return artifact_version
528  
529          except (OSError, json.JSONDecodeError) as e:
530              logger.error(
531                  "%sFailed to get metadata for artifact '%s' version %d: %s",
532                  log_prefix,
533                  filename,
534                  load_version,
535                  e,
536              )
537              return None
538  
539      def _normalize_filename_unicode(self, filename: str) -> str:
540          """
541          Normalizes Unicode characters in a filename to their standard form.
542          Specifically targets compatibility characters like non-breaking spaces (\u202f)
543          and converts them to their regular ASCII equivalents (a standard space).
544          """
545          return unicodedata.normalize("NFKC", filename)