validator.py
"""
Schema validation utilities for workflow node input/output.
"""

import logging
from typing import Any, Dict, List, Optional

import jsonschema
from jsonschema import ValidationError

log = logging.getLogger(__name__)


def validate_against_schema(
    data: Any, schema: Dict[str, Any]
) -> Optional[List[str]]:
    """
    Validate data against a JSON schema.

    Args:
        data: The instance to validate.
        schema: The JSON schema to validate ``data`` against.

    Returns:
        None if ``data`` is valid. Otherwise a single-element list holding
        a human-readable message: either the location and description of
        the validation error raised by ``jsonschema.validate``, or a
        generic message if validation failed for any other reason.

    This function does not raise; every exception from the validation
    call is converted into a returned message.
    """
    try:
        jsonschema.validate(instance=data, schema=schema)
    except ValidationError as e:
        # Use the absolute path: `e.path` is relative to the parent error,
        # so a sub-error raised from inside a combinator (anyOf/oneOf/...)
        # would otherwise lose the location of the enclosing element.
        location = ".".join(str(part) for part in e.absolute_path) or "root"
        return [f"Validation error at '{location}': {e.message}"]
    except Exception as e:
        # Deliberate best-effort: report the failure to the caller instead
        # of raising, but keep the traceback in the log for diagnosis.
        log.exception("Unexpected error during schema validation")
        return [f"Schema validation failed with unexpected error: {e}"]
    return None