"""
Defines modifier implementation functions and their contracts.

Each modifier takes the current data, a modifier argument string, the original
mime type, and a logging identifier, and returns a uniform tuple:
``(result_data, original_mime_type, error_string)`` where ``error_string`` is
``None`` on success. On failure the input data is returned unchanged so the
caller can decide how to proceed.
"""

import logging
import re
from typing import Any, Callable, Dict, List, Optional, Tuple

from jsonpath_ng.ext import parse as jsonpath_parse
import pystache

from google.adk.artifacts import BaseArtifactService

from .constants import EARLY_EMBED_TYPES, LATE_EMBED_TYPES
from .types import DataFormat, ResolutionMode

log = logging.getLogger(__name__)


def _list_of_dicts_error(current_data: Any, modifier_name: str) -> Optional[str]:
    """
    Validates that data is a list of dictionaries.

    Only the first item is type-checked (lists are assumed homogeneous,
    matching the original validation behavior).

    Args:
        current_data: The data to validate.
        modifier_name: The modifier name to mention in the error message.

    Returns:
        An error string if validation fails, otherwise None.
    """
    if not isinstance(current_data, list) or (
        current_data and not isinstance(current_data[0], dict)
    ):
        return (
            f"Input data for '{modifier_name}' must be a list of dictionaries, "
            f"got {type(current_data).__name__}."
        )
    return None


def _parse_slice_spec(
    slice_spec: str,
) -> Tuple[Optional[int], Optional[int], Optional[str]]:
    """
    Parses a Python-style 'start:end' slice specifier into integer bounds.

    An empty start defaults to 0; an empty end defaults to None (slice to the
    end). Negative indices are allowed, as in normal Python slicing.

    Args:
        slice_spec: String in the format 'start:end'.

    Returns:
        Tuple: (start, end, error_string). On error both bounds are None and
        error_string describes the problem.
    """
    if ":" not in slice_spec:
        return None, None, f"Invalid slice format '{slice_spec}'. Expected 'start:end'."
    start_str, end_str = (part.strip() for part in slice_spec.split(":", 1))
    try:
        start = int(start_str) if start_str else 0
        end = int(end_str) if end_str else None
    except (ValueError, TypeError) as e:
        return None, None, f"Invalid slice indices in '{slice_spec}': {e}"
    return start, end, None


def _apply_jsonpath(
    current_data: Any, expression: str, mime_type: Optional[str], log_id: str
) -> Tuple[Any, Optional[str], Optional[str]]:
    """
    Applies a JSONPath expression to parsed JSON data.

    Args:
        current_data: The input data (expected to be dict or list).
        expression: The JSONPath expression string.
        mime_type: The original mime type (passed through).
        log_id: Identifier for logging.

    Returns:
        Tuple: (result_data, original_mime_type, error_string)
        result_data is typically a list of matched values.
    """
    if not isinstance(current_data, (dict, list)):
        return (
            current_data,
            mime_type,
            f"Input data for 'jsonpath' must be a JSON object or list, got {type(current_data).__name__}.",
        )

    try:
        jsonpath_expr = jsonpath_parse(expression)
        matches = [match.value for match in jsonpath_expr.find(current_data)]
        return matches, mime_type, None
    except Exception as e:
        # jsonpath_ng raises plain Exception subclasses for parse errors, so a
        # broad catch is required here.
        return (
            current_data,
            mime_type,
            f"Error applying JSONPath expression '{expression}': {e}",
        )


def _apply_select_cols(
    current_data: List[Dict], cols_str: str, mime_type: Optional[str], log_id: str
) -> Tuple[Any, Optional[str], Optional[str]]:
    """
    Selects specific columns from data represented as a list of dictionaries.

    Args:
        current_data: The input data (expected List[Dict]).
        cols_str: Comma-separated string of column names to keep.
        mime_type: The original mime type (passed through).
        log_id: Identifier for logging.

    Returns:
        Tuple: (result_data, original_mime_type, error_string)
        result_data is List[Dict] containing only selected columns.
    """
    error = _list_of_dicts_error(current_data, "select_cols")
    if error:
        return current_data, mime_type, error

    if not current_data:
        return [], mime_type, None

    try:
        # Column existence is validated against the first row's keys only;
        # missing values in later rows simply become None via .get().
        header = list(current_data[0].keys())
        target_cols = [col.strip() for col in cols_str.split(",")]

        for target_col in target_cols:
            if target_col not in header:
                return (
                    current_data,
                    mime_type,
                    f"Column '{target_col}' not found in data keys: {header}",
                )

        output_list = [
            {col: row_dict.get(col) for col in target_cols}
            for row_dict in current_data
        ]
        return output_list, mime_type, None

    except Exception as e:
        return current_data, mime_type, f"Error selecting columns '{cols_str}': {e}"


def _apply_filter_rows_eq(
    current_data: List[Dict], filter_spec: str, mime_type: Optional[str], log_id: str
) -> Tuple[Any, Optional[str], Optional[str]]:
    """
    Filters a list of dictionaries based on a column's value equality.

    Values are compared as strings (str(value) == filter value).

    Args:
        current_data: The input data (expected List[Dict]).
        filter_spec: String in the format 'column_name:value'.
        mime_type: The original mime type (passed through).
        log_id: Identifier for logging.

    Returns:
        Tuple: (result_data, original_mime_type, error_string)
        result_data is List[Dict] containing only filtered rows.
    """
    error = _list_of_dicts_error(current_data, "filter_rows_eq")
    if error:
        return current_data, mime_type, error

    if not current_data:
        return [], mime_type, None

    try:
        parts = filter_spec.split(":", 1)
        if len(parts) != 2:
            return (
                current_data,
                mime_type,
                f"Invalid filter format '{filter_spec}'. Expected 'column_name:value'.",
            )
        col_name, filter_value = parts[0].strip(), parts[1].strip()

        # As with select_cols, only the first row's keys are checked.
        header = list(current_data[0].keys())
        if col_name not in header:
            return (
                current_data,
                mime_type,
                f"Filter column '{col_name}' not found in data keys: {header}",
            )

        output_list = [
            row for row in current_data if str(row.get(col_name)) == filter_value
        ]
        return output_list, mime_type, None

    except Exception as e:
        return current_data, mime_type, f"Error filtering rows by '{filter_spec}': {e}"


def _apply_slice_rows(
    current_data: List[Dict], slice_spec: str, mime_type: Optional[str], log_id: str
) -> Tuple[Any, Optional[str], Optional[str]]:
    """
    Selects a slice of rows from a list of dictionaries.

    Args:
        current_data: The input data (expected List[Dict]).
        slice_spec: String in Python slice format 'start:end'.
        mime_type: The original mime type (passed through).
        log_id: Identifier for logging.

    Returns:
        Tuple: (result_data, original_mime_type, error_string)
        result_data is List[Dict] containing the sliced rows.
    """
    if not isinstance(current_data, list):
        return (
            current_data,
            mime_type,
            f"Input data for 'slice_rows' must be a list, got {type(current_data).__name__}.",
        )

    start, end, error = _parse_slice_spec(slice_spec)
    if error:
        return current_data, mime_type, error

    try:
        return current_data[start:end], mime_type, None
    except Exception as e:
        return current_data, mime_type, f"Error slicing rows '{slice_spec}': {e}"


def _apply_slice_lines(
    current_data: str, slice_spec: str, mime_type: Optional[str], log_id: str
) -> Tuple[Any, Optional[str], Optional[str]]:
    """
    Selects a slice of lines from text data.

    Args:
        current_data: The input data (expected str).
        slice_spec: String in Python slice format 'start:end'.
        mime_type: The original mime type (passed through).
        log_id: Identifier for logging.

    Returns:
        Tuple: (result_data, original_mime_type, error_string)
        result_data is str containing the sliced lines.
    """
    if not isinstance(current_data, str):
        return (
            current_data,
            mime_type,
            f"Input data for 'slice_lines' must be a string, got {type(current_data).__name__}.",
        )

    start, end, error = _parse_slice_spec(slice_spec)
    if error:
        return current_data, mime_type, error

    try:
        # keepends=True so rejoining preserves the original line endings.
        lines = current_data.splitlines(keepends=True)
        return "".join(lines[start:end]), mime_type, None
    except Exception as e:
        return current_data, mime_type, f"Error slicing text lines '{slice_spec}': {e}"


def _apply_grep(
    current_data: str, pattern: str, mime_type: Optional[str], log_id: str
) -> Tuple[Any, Optional[str], Optional[str]]:
    """
    Filters lines matching a regex pattern from text data.

    Args:
        current_data: The input data (expected str).
        pattern: The regex pattern string.
        mime_type: The original mime type (passed through).
        log_id: Identifier for logging.

    Returns:
        Tuple: (result_data, original_mime_type, error_string)
        result_data is str containing only matching lines.
    """
    if not isinstance(current_data, str):
        return (
            current_data,
            mime_type,
            f"Input data for 'grep' must be a string, got {type(current_data).__name__}.",
        )

    try:
        regex = re.compile(pattern)
        lines = current_data.splitlines(keepends=True)
        filtered_lines = [line for line in lines if regex.search(line)]
        return "".join(filtered_lines), mime_type, None
    except re.error as e:
        return current_data, mime_type, f"Invalid regex pattern '{pattern}': {e}"
    except Exception as e:
        return current_data, mime_type, f"Error applying grep pattern '{pattern}': {e}"


def _apply_head(
    current_data: str, n_str: str, mime_type: Optional[str], log_id: str
) -> Tuple[Any, Optional[str], Optional[str]]:
    """
    Returns the first N lines of text data.

    Args:
        current_data: The input data (expected str).
        n_str: String representing the number of lines (N).
        mime_type: The original mime type (passed through).
        log_id: Identifier for logging.

    Returns:
        Tuple: (result_data, original_mime_type, error_string)
        result_data is str containing the first N lines.
    """
    if not isinstance(current_data, str):
        return (
            current_data,
            mime_type,
            f"Input data for 'head' must be a string, got {type(current_data).__name__}.",
        )

    try:
        n = int(n_str.strip())
        if n < 0:
            return current_data, mime_type, "Head count N cannot be negative."

        lines = current_data.splitlines(keepends=True)
        return "".join(lines[:n]), mime_type, None
    except (ValueError, TypeError) as e:
        return current_data, mime_type, f"Invalid head count N '{n_str}': {e}"
    except Exception as e:
        return current_data, mime_type, f"Error applying head '{n_str}': {e}"


def _apply_tail(
    current_data: str, n_str: str, mime_type: Optional[str], log_id: str
) -> Tuple[Any, Optional[str], Optional[str]]:
    """
    Returns the last N lines of text data.

    Args:
        current_data: The input data (expected str).
        n_str: String representing the number of lines (N).
        mime_type: The original mime type (passed through).
        log_id: Identifier for logging.

    Returns:
        Tuple: (result_data, original_mime_type, error_string)
        result_data is str containing the last N lines.
    """
    if not isinstance(current_data, str):
        return (
            current_data,
            mime_type,
            f"Input data for 'tail' must be a string, got {type(current_data).__name__}.",
        )

    try:
        n = int(n_str.strip())
        if n < 0:
            return current_data, mime_type, "Tail count N cannot be negative."
        if n == 0:
            # Explicit guard: lines[-0:] would return ALL lines, not none.
            return "", mime_type, None

        lines = current_data.splitlines(keepends=True)
        return "".join(lines[-n:]), mime_type, None
    except (ValueError, TypeError) as e:
        return current_data, mime_type, f"Invalid tail count N '{n_str}': {e}"
    except Exception as e:
        return current_data, mime_type, f"Error applying tail '{n_str}': {e}"


def _apply_select_fields(
    current_data: List[Dict], fields_str: str, mime_type: Optional[str], log_id: str
) -> Tuple[Any, Optional[str], Optional[str]]:
    """
    Selects specific fields from a list of dictionaries.

    Fields absent from an item are silently omitted from that item's output
    (unlike select_cols, which errors on unknown columns).

    Args:
        current_data: The input data (expected List[Dict]).
        fields_str: Comma-separated string of field names to keep.
        mime_type: The original mime type (passed through).
        log_id: Identifier for logging.

    Returns:
        Tuple: (result_data, original_mime_type, error_string)
        result_data is List[Dict] containing only selected fields.
    """
    error = _list_of_dicts_error(current_data, "select_fields")
    if error:
        return current_data, mime_type, error

    # Blank names are dropped so an empty/whitespace-only spec is reported as
    # an error instead of silently producing empty dictionaries. (Previously
    # splitting always yielded at least [''], so this check never fired.)
    target_fields = [field.strip() for field in fields_str.split(",") if field.strip()]
    if not target_fields:
        return current_data, mime_type, "No fields specified for 'select_fields'."

    output_list = []
    try:
        for item in current_data:
            if isinstance(item, dict):
                new_item = {
                    field: item.get(field) for field in target_fields if field in item
                }
                output_list.append(new_item)
            else:
                # Mixed lists are tolerated: only the first item was
                # type-checked above, so later non-dict items are skipped.
                log.warning(
                    "%s Skipping non-dictionary item in list during select_fields.",
                    log_id,
                )
                continue
        return output_list, mime_type, None
    except Exception as e:
        return current_data, mime_type, f"Error selecting fields '{fields_str}': {e}"


async def _apply_template(
    current_data: Any,
    template_spec: str,
    mime_type: Optional[str],
    log_id: str,
    context: Any,
) -> Tuple[Any, Optional[str], Optional[str]]:
    """
    Applies a Mustache template loaded from an artifact to the input data.
    This version first renders the template, then resolves embeds on the result.

    Args:
        current_data: The input data (expected dict, list, or str).
        template_spec: String 'template_filename[:version]'.
        mime_type: The original mime type (passed through).
        log_id: Identifier for logging.
        context: The Gateway context dictionary containing artifact_service and session_context.

    Returns:
        Tuple: (result_data, original_mime_type, error_string)
        result_data is the rendered and resolved string.
    """
    # Local import — NOTE(review): presumably avoids a circular import between
    # this module and the resolver; confirm before hoisting to module level.
    from .resolver import resolve_embeds_recursively_in_string, evaluate_embed

    if not isinstance(current_data, (dict, list, str)):
        return (
            current_data,
            mime_type,
            f"Input data for 'apply_to_template' must be dict, list, or string, got {type(current_data).__name__}.",
        )

    parts = template_spec.strip().split(":", 1)
    template_filename = parts[0]
    template_version_str = parts[1] if len(parts) > 1 else None
    template_version = None

    if not template_filename:
        return current_data, mime_type, "Template filename cannot be empty."

    if not isinstance(context, dict):
        return current_data, mime_type, "Invalid context for template loading."
    artifact_service: Optional[BaseArtifactService] = context.get("artifact_service")
    session_context = context.get("session_context")
    if not artifact_service or not session_context:
        return (
            current_data,
            mime_type,
            "ArtifactService or session context not available for template loading.",
        )

    app_name = session_context.get("app_name")
    user_id = session_context.get("user_id")
    session_id = session_context.get("session_id")
    if not all([app_name, user_id, session_id]):
        return (
            current_data,
            mime_type,
            "Missing required session identifiers in context for template loading.",
        )

    # --- Load and decode the template artifact -----------------------------
    try:
        if template_version_str:
            template_version = int(template_version_str)
        else:
            # No explicit version: resolve "latest" from the version list.
            versions = await artifact_service.list_versions(
                app_name=app_name,
                user_id=user_id,
                session_id=session_id,
                filename=template_filename,
            )
            if not versions:
                return (
                    current_data,
                    mime_type,
                    f"Template artifact '{template_filename}' (latest) not found.",
                )
            template_version = max(versions)

        template_part = await artifact_service.load_artifact(
            app_name=app_name,
            user_id=user_id,
            session_id=session_id,
            filename=template_filename,
            version=template_version,
        )

        if not template_part or not template_part.inline_data:
            return (
                current_data,
                mime_type,
                f"Template artifact '{template_filename}' v{template_version} not found or empty.",
            )

        template_bytes = template_part.inline_data.data
        try:
            raw_template_string = template_bytes.decode("utf-8")
        except UnicodeDecodeError:
            return (
                current_data,
                mime_type,
                f"Cannot render non-UTF-8 decodable binary template '{template_filename}' v{template_version}.",
            )

    except FileNotFoundError:
        return (
            current_data,
            mime_type,
            f"Template artifact '{template_filename}' v{template_version_str or 'latest'} not found.",
        )
    except ValueError as e:
        return (
            current_data,
            mime_type,
            f"Invalid version specified for template: '{template_version_str}' or other value error: {e}",
        )
    except Exception as e:
        return (
            current_data,
            mime_type,
            f"Error loading template artifact '{template_filename}' v{template_version_str or 'latest'}: {e}",
        )

    # --- Build the render context and render the template -------------------
    try:
        log.info(
            "%s [apply_to_template] Preparing render context. Input data type: %s, Original MIME: %s",
            log_id,
            type(current_data).__name__,
            mime_type,
        )
        render_context: Dict[str, Any]

        if isinstance(current_data, list):
            if mime_type and "csv" in mime_type.lower():
                # CSV-derived lists get a tabular context so templates can
                # iterate headers and rows separately.
                log.info(
                    "%s [apply_to_template] Input is a list and original MIME is CSV. Structuring context with 'headers' and 'data_rows'.",
                    log_id,
                )
                if not current_data:
                    render_context = {"headers": [], "data_rows": []}
                else:
                    if all(isinstance(item, dict) for item in current_data):
                        headers = list(current_data[0].keys()) if current_data else []
                        data_rows = [list(row.values()) for row in current_data]
                        render_context = {"headers": headers, "data_rows": data_rows}
                    else:
                        log.warning(
                            "%s [apply_to_template] Input is list from CSV, but items are not all dictionaries. Falling back to 'items' context.",
                            log_id,
                        )
                        render_context = {"items": current_data}
            else:
                log.info(
                    "%s [apply_to_template] Input is a list (from JSON/YAML). Data available under 'items' key.",
                    log_id,
                )
                render_context = {"items": current_data}
        elif isinstance(current_data, dict):
            render_context = current_data
            log.info(
                "%s [apply_to_template] Input is dict. Keys directly available in template.",
                log_id,
            )
        elif isinstance(current_data, str):
            render_context = {"text": current_data}
            log.info(
                "%s [apply_to_template] Input is string. Data available under 'text' key.",
                log_id,
            )
        else:
            # Defensive: unreachable given the isinstance check at the top,
            # but kept so unexpected types still render something sensible.
            log.warning(
                "%s [apply_to_template] Input is unexpected type %s. Converting to string and placing under 'value' key.",
                log_id,
                type(current_data).__name__,
            )
            render_context = {"value": str(current_data)}

        log.info(
            "%s [apply_to_template] Render context keys: %s",
            log_id,
            list(render_context.keys()),
        )
        if "items" in render_context and isinstance(render_context["items"], list):
            log.info(
                "%s [apply_to_template] Render context 'items' length: %d",
                log_id,
                len(render_context["items"]),
            )

        intermediate_rendered_string = pystache.render(
            raw_template_string, render_context
        )
        log.debug(
            "%s [apply_to_template] Intermediate rendered string: %s",
            log_id,
            intermediate_rendered_string[:200] + "...",
        )

    except Exception as e:
        return (
            current_data,
            mime_type,
            f"Error preparing context or rendering template '{template_filename}' v{template_version}: {e}",
        )

    # --- Resolve embeds inside the rendered output ---------------------------
    try:
        log.debug(
            "%s [apply_to_template] Resolving embeds on rendered template output.",
            log_id,
        )
        resolver_config = context.get("config", {})
        if not resolver_config:
            log.warning(
                "%s 'config' not found in context for template embed resolution. Using defaults.",
                log_id,
            )

        final_rendered_string = await resolve_embeds_recursively_in_string(
            text=intermediate_rendered_string,
            context=context,
            resolver_func=evaluate_embed,
            types_to_resolve=EARLY_EMBED_TYPES.union(LATE_EMBED_TYPES),
            resolution_mode=ResolutionMode.RECURSIVE_ARTIFACT_CONTENT,
            log_identifier=f"{log_id}[TemplateEmbeds]",
            config=resolver_config,
            max_depth=resolver_config.get("gateway_recursive_embed_depth", 12),
            current_depth=0,
            visited_artifacts=set(),
            accumulated_size=0,
            max_total_size=resolver_config.get(
                "gateway_max_artifact_resolve_size_bytes", -1
            ),
        )
        log.debug(
            "%s [apply_to_template] Final rendered string after embed resolution: %s",
            log_id,
            final_rendered_string[:200] + "...",
        )
    except Exception as recurse_err:
        log.exception(
            "%s Error during recursive resolution of rendered template: %s",
            log_id,
            recurse_err,
        )
        return (
            current_data,
            mime_type,
            f"Error resolving embeds within rendered template: {recurse_err}",
        )

    return final_rendered_string, mime_type, None


# NOTE: 'apply_to_template' is an async coroutine function (its call returns a
# coroutine that must be awaited), so the value type is intentionally
# permissive rather than claiming a synchronous Tuple return for every entry.
MODIFIER_IMPLEMENTATIONS: Dict[str, Callable[..., Any]] = {
    "jsonpath": _apply_jsonpath,
    "select_cols": _apply_select_cols,
    "filter_rows_eq": _apply_filter_rows_eq,
    "slice_rows": _apply_slice_rows,
    "slice_lines": _apply_slice_lines,
    "grep": _apply_grep,
    "head": _apply_head,
    "tail": _apply_tail,
    "select_fields": _apply_select_fields,
    "apply_to_template": _apply_template,
}

# Declarative contracts: which data formats each modifier accepts and produces.
MODIFIER_DEFINITIONS: Dict[str, Dict[str, Any]] = {
    "jsonpath": {
        "function": _apply_jsonpath,
        "accepts": [DataFormat.JSON_OBJECT],
        "produces": DataFormat.JSON_OBJECT,
    },
    "select_cols": {
        "function": _apply_select_cols,
        "accepts": [DataFormat.LIST_OF_DICTS],
        "produces": DataFormat.LIST_OF_DICTS,
    },
    "filter_rows_eq": {
        "function": _apply_filter_rows_eq,
        "accepts": [DataFormat.LIST_OF_DICTS],
        "produces": DataFormat.LIST_OF_DICTS,
    },
    "slice_rows": {
        "function": _apply_slice_rows,
        "accepts": [DataFormat.LIST_OF_DICTS],
        "produces": DataFormat.LIST_OF_DICTS,
    },
    "slice_lines": {
        "function": _apply_slice_lines,
        "accepts": [DataFormat.STRING],
        "produces": DataFormat.STRING,
    },
    "grep": {
        "function": _apply_grep,
        "accepts": [DataFormat.STRING],
        "produces": DataFormat.STRING,
    },
    "head": {
        "function": _apply_head,
        "accepts": [DataFormat.STRING],
        "produces": DataFormat.STRING,
    },
    "tail": {
        "function": _apply_tail,
        "accepts": [DataFormat.STRING],
        "produces": DataFormat.STRING,
    },
    "select_fields": {
        "function": _apply_select_fields,
        "accepts": [DataFormat.LIST_OF_DICTS],
        "produces": DataFormat.LIST_OF_DICTS,
    },
    "apply_to_template": {
        "function": _apply_template,
        "accepts": [
            DataFormat.JSON_OBJECT,
            DataFormat.LIST_OF_DICTS,
            DataFormat.STRING,
        ],
        "produces": DataFormat.STRING,
    },
}


def _parse_modifier_chain(
    expression: str,
) -> Tuple[str, List[Tuple[str, str]], Optional[str]]:
    """
    Parses the expression part of an artifact_content embed.

    Separates the artifact specifier, modifier chain, and final format specifier.

    Args:
        expression: The full expression string after 'artifact_content:'.

    Returns:
        A tuple containing:
        - artifact_spec (str): The filename and optional version (e.g., "data.csv:1").
        - modifiers (List[Tuple[str, str]]): A list of (prefix, value) tuples for modifiers.
        - output_format (Optional[str]): The final output format string (e.g., "text", "json").
          Returns None if the format step is missing or invalid.
    """
    # Local import — NOTE(review): original kept this function-scoped;
    # presumably to avoid an import cycle. Confirm before hoisting.
    from .constants import EMBED_CHAIN_DELIMITER

    # str.split always returns at least one element, so parts[0] is safe.
    parts = expression.split(EMBED_CHAIN_DELIMITER)
    artifact_spec = parts[0].strip()
    modifiers: List[Tuple[str, str]] = []
    output_format: Optional[str] = None

    for i in range(1, len(parts)):
        part = parts[i].strip()
        if not part:
            continue

        # Only the LAST step may be a format specifier; a 'format:' step
        # earlier in the chain is treated as an ordinary modifier.
        if i == len(parts) - 1:
            format_match = re.match(r"format:(.*)", part, re.DOTALL)
            if format_match:
                output_format = format_match.group(1).strip()
                continue

        modifier_parts = part.split(":", 1)
        if len(modifier_parts) == 2:
            prefix = modifier_parts[0].strip()
            value = modifier_parts[1].strip()
            if prefix and value:
                modifiers.append((prefix, value))
            else:
                log.warning("Ignoring invalid modifier step format: '%s'", part)
        else:
            log.warning("Ignoring invalid modifier step format: '%s'", part)

    return artifact_spec, modifiers, output_format