validator.py
 1  """
 2  Schema validation utilities for workflow node input/output.
 3  """
 4  
 5  import logging
 6  from typing import Any, Dict, List, Optional
 7  import jsonschema
 8  from jsonschema import ValidationError
 9  
10  log = logging.getLogger(__name__)
11  
12  
13  def validate_against_schema(
14      data: Any, schema: Dict[str, Any]
15  ) -> Optional[List[str]]:
16      """
17      Validate data against a JSON schema.
18      Returns a list of error messages if validation fails, or None if valid.
19      """
20      try:
21          jsonschema.validate(instance=data, schema=schema)
22          return None
23      except ValidationError as e:
24          # Extract a user-friendly error message
25          path = ".".join([str(p) for p in e.path]) if e.path else "root"
26          error_msg = f"Validation error at '{path}': {e.message}"
27          return [error_msg]
28      except Exception as e:
29          return [f"Schema validation failed with unexpected error: {str(e)}"]