# data_parser.py
"""Parse JSON or CSV text and answer simple path / column queries on it."""

import csv
import json
from io import StringIO
from typing import Optional


def data_parser(
    data: str,
    format: str = "json",
    query: Optional[str] = None,
) -> str:
    """
    Parse and query JSON or CSV data. Extract specific fields, filter rows, or summarize structured data.

    Args:
        data (str): The raw JSON or CSV string to parse.
        format (str): Data format — "json" or "csv" (case-insensitive). Default: "json".
        query (Optional[str]): Dot-path query for JSON (e.g. "users.0.name", "items.*.price") or column name for CSV (e.g. "name" to extract that column, "name=John" to filter rows where name equals John).

    Returns:
        str: The query result, a short summary when no query is given, or a
        message starting with "Error" when parsing or the query fails. This
        function never raises; every failure is reported in the returned text.
    """
    # Accept "JSON", " csv ", etc.; non-string values fall through to the
    # "Unknown format" branch exactly as before.
    fmt = format.strip().lower() if isinstance(format, str) else format

    try:
        if fmt == "json":
            return _query_json(json.loads(data), query)
        if fmt == "csv":
            return _query_csv(data, query)
        return f"Error: Unknown format '{format}'. Use 'json' or 'csv'."
    except json.JSONDecodeError as e:
        return f"Error parsing JSON: {e}"
    except Exception as e:
        # Top-level boundary: callers expect a string, never an exception.
        return f"Error: {e}"


def _query_json(data, query):
    """Resolve a dot-path *query* against already-decoded JSON *data*.

    Path segments are separated by ".". On a list, a segment must be a
    non-negative decimal index or "*" (apply the rest of the path to every
    element, one result per line). On a dict, a segment is looked up as a key,
    including keys that consist only of digits. With an empty query a short
    summary of the top-level value is returned instead.

    Returns the selected value as text: containers are rendered as indented
    JSON, scalars with ``str()``. Problems are reported as "Error: ..." strings.
    """
    if not query:
        if isinstance(data, list):
            keys = list(data[0].keys()) if data and isinstance(data[0], dict) else "N/A"
            return f"{len(data)} items. Keys: {keys}"
        if isinstance(data, dict):
            return f"Object with keys: {list(data.keys())}"
        return json.dumps(data, indent=2, default=str)

    parts = query.split(".")
    current = data
    for position, part in enumerate(parts):
        if part == "*":
            if not isinstance(current, list):
                return f"Error: '*' only works on arrays, got {type(current).__name__}"
            # Everything after the wildcard is applied to each element.
            remaining = ".".join(parts[position + 1:])
            if not remaining:
                return json.dumps(current, indent=2, default=str)
            return "\n".join(_query_json(item, remaining) for item in current)

        if isinstance(current, list):
            # isdecimal() only accepts characters that int() can convert.
            if not part.isdecimal():
                return f"Error: Cannot access '{part}' on {type(current).__name__}"
            idx = int(part)
            if idx >= len(current):
                return f"Error: Index {idx} out of range"
            current = current[idx]
        elif isinstance(current, dict):
            # Checked before any numeric interpretation so keys like "0" work.
            if part not in current:
                return f"Error: Key '{part}' not found. Available: {list(current.keys())}"
            current = current[part]
        else:
            return f"Error: Cannot access '{part}' on {type(current).__name__}"

    if isinstance(current, (dict, list)):
        return json.dumps(current, indent=2, default=str)
    return str(current)


def _query_csv(data, query):
    """Summarize, filter, or extract a column from CSV text *data*.

    The first line is the header. *query* may be empty (row count and column
    names), "column" (that column's values, one per line), or "column=value"
    (header plus every row whose stripped cell equals *value*, as CSV text).

    Rows shorter than the header are padded with empty cells; fields beyond
    the header are ignored. Problems are reported as "Error: ..." strings.
    """
    rows = list(csv.DictReader(StringIO(data)))
    if not rows:
        return "Empty CSV (no rows)"

    # DictReader stores overflow fields under the key None; that is not a column.
    columns = [name for name in rows[0] if name is not None]

    if not query:
        return f"{len(rows)} rows. Columns: {columns}"

    # Filter: column=value
    if "=" in query:
        col, val = (piece.strip() for piece in query.split("=", 1))
        if col not in columns:
            return f"Error: Column '{col}' not found. Available: {columns}"
        # Short rows carry None for missing cells; compare them as empty.
        filtered = [r for r in rows if (r.get(col) or "").strip() == val]
        if not filtered:
            return f"No rows where {col} = {val}"
        # csv.writer quotes cells containing commas, quotes or newlines so the
        # output is itself valid CSV.
        buffer = StringIO()
        writer = csv.writer(buffer, lineterminator="\n")
        writer.writerow(columns)
        for r in filtered:
            writer.writerow(["" if r.get(c) is None else r.get(c) for c in columns])
        return buffer.getvalue().rstrip("\n")

    # Extract column
    col = query.strip()
    if col not in columns:
        return f"Error: Column '{col}' not found. Available: {columns}"
    return "\n".join("" if r.get(col) is None else r.get(col) for r in rows)