/ src / solace_agent_mesh / common / utils / embeds / modifiers.py
modifiers.py
  1  """
  2  Defines modifier implementation functions and their contracts.
  3  """
  4  
  5  import logging
  6  import re
  7  from typing import Any, Callable, Dict, Optional, Tuple, List
  8  
  9  from .constants import EARLY_EMBED_TYPES, LATE_EMBED_TYPES
 10  
 11  log = logging.getLogger(__name__)
 12  
 13  from jsonpath_ng.ext import parse as jsonpath_parse
 14  import pystache
 15  
 16  from google.adk.artifacts import BaseArtifactService
 17  
 18  from .types import DataFormat, ResolutionMode
 19  
 20  
 21  def _apply_jsonpath(
 22      current_data: Any, expression: str, mime_type: Optional[str], log_id: str
 23  ) -> Tuple[Any, Optional[str], Optional[str]]:
 24      """
 25      Applies a JSONPath expression to parsed JSON data.
 26  
 27      Args:
 28          current_data: The input data (expected to be dict or list).
 29          expression: The JSONPath expression string.
 30          mime_type: The original mime type (passed through).
 31          log_id: Identifier for logging.
 32  
 33      Returns:
 34          Tuple: (result_data, original_mime_type, error_string)
 35                 result_data is typically a list of matched values.
 36      """
 37      if not isinstance(current_data, (dict, list)):
 38          return (
 39              current_data,
 40              mime_type,
 41              f"Input data for 'jsonpath' must be a JSON object or list, got {type(current_data).__name__}.",
 42          )
 43  
 44      try:
 45          jsonpath_expr = jsonpath_parse(expression)
 46          matches = [match.value for match in jsonpath_expr.find(current_data)]
 47          return matches, mime_type, None
 48      except Exception as e:
 49          return (
 50              current_data,
 51              mime_type,
 52              f"Error applying JSONPath expression '{expression}': {e}",
 53          )
 54  
 55  
 56  def _apply_select_cols(
 57      current_data: List[Dict], cols_str: str, mime_type: Optional[str], log_id: str
 58  ) -> Tuple[Any, Optional[str], Optional[str]]:
 59      """
 60      Selects specific columns from data represented as a list of dictionaries.
 61  
 62      Args:
 63          current_data: The input data (expected List[Dict]).
 64          cols_str: Comma-separated string of column names to keep.
 65          mime_type: The original mime type (passed through).
 66          log_id: Identifier for logging.
 67  
 68      Returns:
 69          Tuple: (result_data, original_mime_type, error_string)
 70                 result_data is List[Dict] containing only selected columns.
 71      """
 72      if not isinstance(current_data, list) or (
 73          current_data and not isinstance(current_data[0], dict)
 74      ):
 75          return (
 76              current_data,
 77              mime_type,
 78              f"Input data for 'select_cols' must be a list of dictionaries, got {type(current_data).__name__}.",
 79          )
 80  
 81      if not current_data:
 82          return [], mime_type, None
 83  
 84      try:
 85          header = list(current_data[0].keys())
 86          target_cols = [col.strip() for col in cols_str.split(",")]
 87          output_list = []
 88  
 89          for target_col in target_cols:
 90              if target_col not in header:
 91                  return (
 92                      current_data,
 93                      mime_type,
 94                      f"Column '{target_col}' not found in data keys: {header}",
 95                  )
 96  
 97          for row_dict in current_data:
 98              new_row = {col: row_dict.get(col) for col in target_cols}
 99              output_list.append(new_row)
100  
101          return output_list, mime_type, None
102  
103      except Exception as e:
104          return current_data, mime_type, f"Error selecting columns '{cols_str}': {e}"
105  
106  
def _apply_filter_rows_eq(
    current_data: List[Dict], filter_spec: str, mime_type: Optional[str], log_id: str
) -> Tuple[Any, Optional[str], Optional[str]]:
    """
    Keeps only the rows whose value in one column equals a given string.

    Args:
        current_data: The rows to filter (expected List[Dict]).
        filter_spec: Filter in the form 'column_name:value'. Only the first
                     colon separates the two parts.
        mime_type: Mime type of the source data, returned unchanged.
        log_id: Identifier for logging (not used by this modifier).

    Returns:
        Tuple: (result_data, original_mime_type, error_string)
               On success result_data is a List[Dict] of the matching rows.
               On failure the input is returned unchanged with an error
               message.
    """
    # Only the first element is inspected to decide whether the list holds dicts.
    looks_tabular = isinstance(current_data, list) and (
        not current_data or isinstance(current_data[0], dict)
    )
    if not looks_tabular:
        type_name = type(current_data).__name__
        return (
            current_data,
            mime_type,
            f"Input data for 'filter_rows_eq' must be a list of dictionaries, got {type_name}.",
        )

    if not current_data:
        return [], mime_type, None

    try:
        pieces = filter_spec.split(":", 1)
        if len(pieces) < 2:
            return (
                current_data,
                mime_type,
                f"Invalid filter format '{filter_spec}'. Expected 'column_name:value'.",
            )
        col_name = pieces[0].strip()
        expected = pieces[1].strip()

        # The first row's keys act as the table header.
        known_keys = list(current_data[0].keys())
        if col_name not in known_keys:
            return (
                current_data,
                mime_type,
                f"Filter column '{col_name}' not found in data keys: {known_keys}",
            )

        kept = []
        for record in current_data:
            # Cell values are compared through their string form.
            if str(record.get(col_name)) == expected:
                kept.append(record)

        return kept, mime_type, None

    except Exception as e:
        return current_data, mime_type, f"Error filtering rows by '{filter_spec}': {e}"
161  
162  
def _apply_slice_rows(
    current_data: List[Dict], slice_spec: str, mime_type: Optional[str], log_id: str
) -> Tuple[Any, Optional[str], Optional[str]]:
    """
    Returns a contiguous range of rows from a list.

    Args:
        current_data: The rows to slice (expected to be a list).
        slice_spec: Range in Python slice notation 'start:end'. Either side
                    may be left empty: a missing start means 0 and a missing
                    end means "through the last row".
        mime_type: Mime type of the source data, returned unchanged.
        log_id: Identifier for logging (not used by this modifier).

    Returns:
        Tuple: (result_data, original_mime_type, error_string)
               On success result_data is the sliced list. On failure the
               input is returned unchanged with an error message.
    """
    if not isinstance(current_data, list):
        type_name = type(current_data).__name__
        return (
            current_data,
            mime_type,
            f"Input data for 'slice_rows' must be a list, got {type_name}.",
        )

    try:
        if ":" not in slice_spec:
            return (
                current_data,
                mime_type,
                f"Invalid slice format '{slice_spec}'. Expected 'start:end'.",
            )

        # Split on the first colon only; anything after it is the end bound.
        lower_text, upper_text = (
            piece.strip() for piece in slice_spec.split(":", 1)
        )
        lower = 0 if not lower_text else int(lower_text)
        upper = None if not upper_text else int(upper_text)

        return current_data[lower:upper], mime_type, None

    except (ValueError, TypeError) as e:
        return current_data, mime_type, f"Invalid slice indices in '{slice_spec}': {e}"
    except Exception as e:
        return current_data, mime_type, f"Error slicing rows '{slice_spec}': {e}"
209  
210  
def _apply_slice_lines(
    current_data: str, slice_spec: str, mime_type: Optional[str], log_id: str
) -> Tuple[Any, Optional[str], Optional[str]]:
    """
    Returns a contiguous range of lines from a block of text.

    Args:
        current_data: The text to slice (expected str).
        slice_spec: Range in Python slice notation 'start:end'. Either side
                    may be left empty: a missing start means 0 and a missing
                    end means "through the last line".
        mime_type: Mime type of the source data, returned unchanged.
        log_id: Identifier for logging (not used by this modifier).

    Returns:
        Tuple: (result_data, original_mime_type, error_string)
               On success result_data is a str made of the selected lines
               with their original line endings. On failure the input is
               returned unchanged with an error message.
    """
    if not isinstance(current_data, str):
        type_name = type(current_data).__name__
        return (
            current_data,
            mime_type,
            f"Input data for 'slice_lines' must be a string, got {type_name}.",
        )

    try:
        if ":" not in slice_spec:
            return (
                current_data,
                mime_type,
                f"Invalid slice format '{slice_spec}'. Expected 'start:end'.",
            )

        # Split on the first colon only; anything after it is the end bound.
        first_text, last_text = map(str.strip, slice_spec.split(":", 1))
        first = int(first_text) if first_text else 0
        last = int(last_text) if last_text else None

        # keepends=True preserves each line's own terminator when re-joining.
        selected = current_data.splitlines(keepends=True)[first:last]
        return "".join(selected), mime_type, None

    except (ValueError, TypeError) as e:
        return current_data, mime_type, f"Invalid slice indices in '{slice_spec}': {e}"
    except Exception as e:
        return current_data, mime_type, f"Error slicing text lines '{slice_spec}': {e}"
258  
259  
def _apply_grep(
    current_data: str, pattern: str, mime_type: Optional[str], log_id: str
) -> Tuple[Any, Optional[str], Optional[str]]:
    """
    Keeps only the lines of a text that match a regular expression.

    Args:
        current_data: The text to filter (expected str).
        pattern: Regular expression searched for within each line.
        mime_type: Mime type of the source data, returned unchanged.
        log_id: Identifier for logging (not used by this modifier).

    Returns:
        Tuple: (result_data, original_mime_type, error_string)
               On success result_data is a str made of the matching lines
               with their original line endings. On failure the input is
               returned unchanged with an error message.
    """
    if not isinstance(current_data, str):
        type_name = type(current_data).__name__
        return (
            current_data,
            mime_type,
            f"Input data for 'grep' must be a string, got {type_name}.",
        )

    try:
        line_matches = re.compile(pattern).search
        # keepends=True preserves each line's own terminator when re-joining.
        candidates = current_data.splitlines(keepends=True)
        return "".join(filter(line_matches, candidates)), mime_type, None
    except re.error as e:
        return current_data, mime_type, f"Invalid regex pattern '{pattern}': {e}"
    except Exception as e:
        return current_data, mime_type, f"Error applying grep pattern '{pattern}': {e}"
292  
293  
def _apply_head(
    current_data: str, n_str: str, mime_type: Optional[str], log_id: str
) -> Tuple[Any, Optional[str], Optional[str]]:
    """
    Keeps the first N lines of a text.

    Args:
        current_data: The text to truncate (expected str).
        n_str: The line count N as a string; surrounding whitespace is ignored.
        mime_type: Mime type of the source data, returned unchanged.
        log_id: Identifier for logging (not used by this modifier).

    Returns:
        Tuple: (result_data, original_mime_type, error_string)
               On success result_data is a str made of the first N lines
               with their original line endings. On failure (non-string
               input, non-integer or negative N) the input is returned
               unchanged with an error message.
    """
    if not isinstance(current_data, str):
        type_name = type(current_data).__name__
        return (
            current_data,
            mime_type,
            f"Input data for 'head' must be a string, got {type_name}.",
        )

    try:
        count = int(n_str.strip())
        if count >= 0:
            # keepends=True preserves each line's own terminator when re-joining.
            leading = current_data.splitlines(keepends=True)[:count]
            return "".join(leading), mime_type, None
        return current_data, mime_type, "Head count N cannot be negative."
    except (ValueError, TypeError) as e:
        return current_data, mime_type, f"Invalid head count N '{n_str}': {e}"
    except Exception as e:
        return current_data, mime_type, f"Error applying head '{n_str}': {e}"
329  
330  
def _apply_tail(
    current_data: str, n_str: str, mime_type: Optional[str], log_id: str
) -> Tuple[Any, Optional[str], Optional[str]]:
    """
    Keeps the last N lines of a text.

    Args:
        current_data: The text to truncate (expected str).
        n_str: The line count N as a string; surrounding whitespace is ignored.
        mime_type: Mime type of the source data, returned unchanged.
        log_id: Identifier for logging (not used by this modifier).

    Returns:
        Tuple: (result_data, original_mime_type, error_string)
               On success result_data is a str made of the last N lines
               with their original line endings. On failure (non-string
               input, non-integer or negative N) the input is returned
               unchanged with an error message.
    """
    if not isinstance(current_data, str):
        type_name = type(current_data).__name__
        return (
            current_data,
            mime_type,
            f"Input data for 'tail' must be a string, got {type_name}.",
        )

    try:
        count = int(n_str.strip())
        if count < 0:
            return current_data, mime_type, "Tail count N cannot be negative."
        if not count:
            # A [-0:] slice would select every line, so zero is answered directly.
            return "", mime_type, None

        # keepends=True preserves each line's own terminator when re-joining.
        all_lines = current_data.splitlines(keepends=True)
        return "".join(all_lines[-count:]), mime_type, None
    except (ValueError, TypeError) as e:
        return current_data, mime_type, f"Invalid tail count N '{n_str}': {e}"
    except Exception as e:
        return current_data, mime_type, f"Error applying tail '{n_str}': {e}"
368  
369  
def _apply_select_fields(
    current_data: List[Dict], fields_str: str, mime_type: Optional[str], log_id: str
) -> Tuple[Any, Optional[str], Optional[str]]:
    """
    Selects specific fields from a list of dictionaries.

    Unlike a strict column selection, a requested field that is absent from
    an item is simply left out of that item's result rather than reported
    as an error.

    Args:
        current_data: The input data (expected List[Dict]).
        fields_str: Comma-separated string of field names to keep. Whitespace
                    around names is ignored and blank names (e.g. from a
                    trailing comma) are dropped.
        mime_type: The original mime type (passed through).
        log_id: Identifier for logging.

    Returns:
        Tuple: (result_data, original_mime_type, error_string)
               result_data is List[Dict] containing only selected fields.
               If no usable field name is given, the input is returned
               unchanged with an error message.
    """
    # Only the first element is inspected here; stray non-dict items further
    # down the list are skipped (with a warning) in the loop below.
    if not isinstance(current_data, list) or (
        current_data and not isinstance(current_data[0], dict)
    ):
        return (
            current_data,
            mime_type,
            f"Input data for 'select_fields' must be a list of dictionaries, got {type(current_data).__name__}.",
        )

    # "".split(",") yields [""], so blank names must be filtered out for the
    # emptiness check below to ever trigger.
    stripped_names = (raw_name.strip() for raw_name in fields_str.split(","))
    target_fields = [name for name in stripped_names if name]
    if not target_fields:
        return current_data, mime_type, "No fields specified for 'select_fields'."

    output_list = []
    try:
        for item in current_data:
            if not isinstance(item, dict):
                log.warning(
                    "%s Skipping non-dictionary item in list during select_fields.",
                    log_id,
                )
                continue
            output_list.append(
                {field: item[field] for field in target_fields if field in item}
            )
        return output_list, mime_type, None
    except Exception as e:
        return current_data, mime_type, f"Error selecting fields '{fields_str}': {e}"
416  
417  
def _build_template_render_context(
    current_data: Any, mime_type: Optional[str], log_id: str
) -> Dict[str, Any]:
    """
    Maps the modifier input onto the dictionary handed to the Mustache renderer.

    - list + CSV mime type: {"headers": [...], "data_rows": [[...], ...]} when
      every item is a dict, otherwise {"items": <list>}.
    - any other list: {"items": <list>}.
    - dict: used directly, so its keys are available in the template.
    - str: {"text": <str>}.
    - anything else: {"value": str(<data>)}.
    """
    if isinstance(current_data, list):
        if mime_type and "csv" in mime_type.lower():
            log.info(
                "%s [apply_to_template] Input is a list and original MIME is CSV. Structuring context with 'headers' and 'data_rows'.",
                log_id,
            )
            if not current_data:
                return {"headers": [], "data_rows": []}
            if all(isinstance(item, dict) for item in current_data):
                # Headers come from the first row; each data row is that
                # row's values in its own key order.
                return {
                    "headers": list(current_data[0].keys()),
                    "data_rows": [list(row.values()) for row in current_data],
                }
            log.warning(
                "%s [apply_to_template] Input is list from CSV, but items are not all dictionaries. Falling back to 'items' context.",
                log_id,
            )
            return {"items": current_data}

        log.info(
            "%s [apply_to_template] Input is a list (from JSON/YAML). Data available under 'items' key.",
            log_id,
        )
        return {"items": current_data}

    if isinstance(current_data, dict):
        log.info(
            "%s [apply_to_template] Input is dict. Keys directly available in template.",
            log_id,
        )
        return current_data

    if isinstance(current_data, str):
        log.info(
            "%s [apply_to_template] Input is string. Data available under 'text' key.",
            log_id,
        )
        return {"text": current_data}

    log.warning(
        "%s [apply_to_template] Input is unexpected type %s. Converting to string and placing under 'value' key.",
        log_id,
        type(current_data).__name__,
    )
    return {"value": str(current_data)}


async def _apply_template(
    current_data: Any,
    template_spec: str,
    mime_type: Optional[str],
    log_id: str,
    context: Any,
) -> Tuple[Any, Optional[str], Optional[str]]:
    """
    Applies a Mustache template loaded from an artifact to the input data.
    This version first renders the template, then resolves embeds on the result.

    Steps:
      1. Validate the input type, the template spec and the context.
      2. Load the template artifact (the highest listed version when none is
         given) and decode it as UTF-8.
      3. Build the render context from the input data and render the template.
      4. Resolve embeds found in the rendered output.

    Args:
        current_data: The input data (expected dict, list, or str).
        template_spec: String 'template_filename[:version]'.
        mime_type: The original mime type (passed through).
        log_id: Identifier for logging.
        context: Dictionary providing 'artifact_service', 'session_context'
                 (with 'app_name', 'user_id' and 'session_id') and,
                 optionally, 'config' for embed resolution.

    Returns:
        Tuple: (result_data, original_mime_type, error_string)
               result_data is the rendered and resolved string. On any
               failure the input data is returned unchanged together with
               an error message.
    """
    if not isinstance(current_data, (dict, list, str)):
        return (
            current_data,
            mime_type,
            f"Input data for 'apply_to_template' must be dict, list, or string, got {type(current_data).__name__}.",
        )

    parts = template_spec.strip().split(":", 1)
    template_filename = parts[0]
    template_version_str = parts[1] if len(parts) > 1 else None
    template_version = None

    if not template_filename:
        return current_data, mime_type, "Template filename cannot be empty."

    if not isinstance(context, dict):
        return current_data, mime_type, "Invalid context for template loading."
    artifact_service: Optional[BaseArtifactService] = context.get("artifact_service")
    session_context = context.get("session_context")
    if not artifact_service or not session_context:
        return (
            current_data,
            mime_type,
            "ArtifactService or session context not available for template loading.",
        )

    app_name = session_context.get("app_name")
    user_id = session_context.get("user_id")
    session_id = session_context.get("session_id")
    if not all([app_name, user_id, session_id]):
        return (
            current_data,
            mime_type,
            "Missing required session identifiers in context for template loading.",
        )

    try:
        if template_version_str:
            template_version = int(template_version_str)
        else:
            # No explicit version: fall back to the highest one listed.
            versions = await artifact_service.list_versions(
                app_name=app_name,
                user_id=user_id,
                session_id=session_id,
                filename=template_filename,
            )
            if not versions:
                return (
                    current_data,
                    mime_type,
                    f"Template artifact '{template_filename}' (latest) not found.",
                )
            template_version = max(versions)

        template_part = await artifact_service.load_artifact(
            app_name=app_name,
            user_id=user_id,
            session_id=session_id,
            filename=template_filename,
            version=template_version,
        )

        if not template_part or not template_part.inline_data:
            return (
                current_data,
                mime_type,
                f"Template artifact '{template_filename}' v{template_version} not found or empty.",
            )

        template_bytes = template_part.inline_data.data
        try:
            raw_template_string = template_bytes.decode("utf-8")
        except UnicodeDecodeError:
            return (
                current_data,
                mime_type,
                f"Cannot render non-UTF-8 decodable binary template '{template_filename}' v{template_version}.",
            )

    except FileNotFoundError:
        return (
            current_data,
            mime_type,
            f"Template artifact '{template_filename}' v{template_version_str or 'latest'} not found.",
        )
    except ValueError as e:
        return (
            current_data,
            mime_type,
            f"Invalid version specified for template: '{template_version_str}' or other value error: {e}",
        )
    except Exception as e:
        return (
            current_data,
            mime_type,
            f"Error loading template artifact '{template_filename}' v{template_version_str or 'latest'}: {e}",
        )

    try:
        log.info(
            "%s [apply_to_template] Preparing render context. Input data type: %s, Original MIME: %s",
            log_id,
            type(current_data).__name__,
            mime_type,
        )
        render_context = _build_template_render_context(
            current_data, mime_type, log_id
        )

        log.info(
            "%s [apply_to_template] Render context keys: %s",
            log_id,
            list(render_context.keys()),
        )
        if "items" in render_context and isinstance(render_context["items"], list):
            log.info(
                "%s [apply_to_template] Render context 'items' length: %d",
                log_id,
                len(render_context["items"]),
            )

        intermediate_rendered_string = pystache.render(
            raw_template_string, render_context
        )
        log.debug(
            "%s [apply_to_template] Intermediate rendered string: %s",
            log_id,
            intermediate_rendered_string[:200] + "...",
        )

    except Exception as e:
        return (
            current_data,
            mime_type,
            f"Error preparing context or rendering template '{template_filename}' v{template_version}: {e}",
        )

    # Function-level import, deferred until embed resolution is actually
    # needed so that the validation/early-exit paths above do not depend on it.
    # NOTE(review): presumably kept local to avoid a circular import with
    # the resolver module - confirm before moving it to module level.
    from .resolver import resolve_embeds_recursively_in_string, evaluate_embed

    try:
        log.debug(
            "%s [apply_to_template] Resolving embeds on rendered template output.",
            log_id,
        )
        # `or {}` also covers a 'config' key that is present but None, which
        # would otherwise break the .get() lookups below right after the
        # "Using defaults" warning.
        resolver_config = context.get("config") or {}
        if not resolver_config:
            log.warning(
                "%s 'config' not found in context for template embed resolution. Using defaults.",
                log_id,
            )

        final_rendered_string = await resolve_embeds_recursively_in_string(
            text=intermediate_rendered_string,
            context=context,
            resolver_func=evaluate_embed,
            types_to_resolve=EARLY_EMBED_TYPES.union(LATE_EMBED_TYPES),
            resolution_mode=ResolutionMode.RECURSIVE_ARTIFACT_CONTENT,
            log_identifier=f"{log_id}[TemplateEmbeds]",
            config=resolver_config,
            max_depth=resolver_config.get("gateway_recursive_embed_depth", 12),
            current_depth=0,
            visited_artifacts=set(),
            accumulated_size=0,
            max_total_size=resolver_config.get(
                "gateway_max_artifact_resolve_size_bytes", -1
            ),
        )
        log.debug(
            "%s [apply_to_template] Final rendered string after embed resolution: %s",
            log_id,
            final_rendered_string[:200] + "...",
        )
    except Exception as recurse_err:
        log.exception(
            "%s Error during recursive resolution of rendered template: %s",
            log_id,
            recurse_err,
        )
        return (
            current_data,
            mime_type,
            f"Error resolving embeds within rendered template: {recurse_err}",
        )

    return final_rendered_string, mime_type, None
668  
669  
# Contract for every modifier, keyed by the prefix used in an embed chain:
#   "function" - the implementation to call,
#   "accepts"  - the DataFormat values the implementation takes as input,
#   "produces" - the DataFormat of its result.
MODIFIER_DEFINITIONS: Dict[str, Dict[str, Any]] = {
    "jsonpath": dict(
        function=_apply_jsonpath,
        accepts=[DataFormat.JSON_OBJECT],
        produces=DataFormat.JSON_OBJECT,
    ),
    "select_cols": dict(
        function=_apply_select_cols,
        accepts=[DataFormat.LIST_OF_DICTS],
        produces=DataFormat.LIST_OF_DICTS,
    ),
    "filter_rows_eq": dict(
        function=_apply_filter_rows_eq,
        accepts=[DataFormat.LIST_OF_DICTS],
        produces=DataFormat.LIST_OF_DICTS,
    ),
    "slice_rows": dict(
        function=_apply_slice_rows,
        accepts=[DataFormat.LIST_OF_DICTS],
        produces=DataFormat.LIST_OF_DICTS,
    ),
    "slice_lines": dict(
        function=_apply_slice_lines,
        accepts=[DataFormat.STRING],
        produces=DataFormat.STRING,
    ),
    "grep": dict(
        function=_apply_grep,
        accepts=[DataFormat.STRING],
        produces=DataFormat.STRING,
    ),
    "head": dict(
        function=_apply_head,
        accepts=[DataFormat.STRING],
        produces=DataFormat.STRING,
    ),
    "tail": dict(
        function=_apply_tail,
        accepts=[DataFormat.STRING],
        produces=DataFormat.STRING,
    ),
    "select_fields": dict(
        function=_apply_select_fields,
        accepts=[DataFormat.LIST_OF_DICTS],
        produces=DataFormat.LIST_OF_DICTS,
    ),
    # The only coroutine function in the table: _apply_template is declared
    # with `async def` and takes an extra `context` argument.
    "apply_to_template": dict(
        function=_apply_template,
        accepts=[
            DataFormat.JSON_OBJECT,
            DataFormat.LIST_OF_DICTS,
            DataFormat.STRING,
        ],
        produces=DataFormat.STRING,
    ),
}

# Plain name -> implementation lookup, derived from the contracts above so
# both tables always list the same modifiers in the same order.
MODIFIER_IMPLEMENTATIONS: Dict[
    str, Callable[..., Tuple[Any, Optional[str], Optional[str]]]
] = {
    modifier_name: contract["function"]
    for modifier_name, contract in MODIFIER_DEFINITIONS.items()
}
741  
742  
def _parse_modifier_chain(
    expression: str,
) -> Tuple[str, List[Tuple[str, str]], Optional[str]]:
    """
    Parses the expression part of an artifact_content embed.

    Separates the artifact specifier, modifier chain, and final format specifier.
    The expression is split on EMBED_CHAIN_DELIMITER: the first segment is the
    artifact specifier and every later segment is a 'prefix:value' step. Only
    the last segment may be the 'format:<name>' step. Blank segments are
    skipped, and malformed steps are logged and ignored.

    Args:
        expression: The full expression string after 'artifact_content:'.

    Returns:
        A tuple containing:
        - artifact_spec (str): The filename and optional version (e.g., "data.csv:1").
        - modifiers (List[Tuple[str, str]]): A list of (prefix, value) tuples for modifiers.
        - output_format (Optional[str]): The final output format string (e.g., "text", "json").
                                          None when the last step is not a 'format:' step.
    """
    from .constants import EMBED_CHAIN_DELIMITER

    # str.split() with an explicit separator always returns at least one
    # element, so the first segment can be unpacked unconditionally.
    artifact_part, *step_parts = expression.split(EMBED_CHAIN_DELIMITER)
    artifact_spec = artifact_part.strip()
    modifiers: List[Tuple[str, str]] = []
    output_format: Optional[str] = None

    last_index = len(step_parts) - 1
    for index, raw_step in enumerate(step_parts):
        step = raw_step.strip()
        if not step:
            continue

        # A 'format:' step is only recognised in the final position; anywhere
        # else it falls through and is treated like an ordinary modifier.
        if index == last_index:
            format_match = re.match(r"format:(.*)", step, re.DOTALL)
            if format_match:
                output_format = format_match.group(1).strip()
                continue

        # Split on the first colon only so values may themselves contain ':'.
        prefix, separator, value = step.partition(":")
        prefix, value = prefix.strip(), value.strip()
        if separator and prefix and value:
            modifiers.append((prefix, value))
        else:
            log.warning("Ignoring invalid modifier step format: '%s'", step)

    return artifact_spec, modifiers, output_format