/ restai / llms / tools / data_parser.py
data_parser.py
  1  from typing import Optional
  2  
  3  
  4  def data_parser(
  5      data: str,
  6      format: str = "json",
  7      query: Optional[str] = None,
  8  ) -> str:
  9      """
 10      Parse and query JSON or CSV data. Extract specific fields, filter rows, or summarize structured data.
 11  
 12      Args:
 13          data (str): The raw JSON or CSV string to parse.
 14          format (str): Data format — "json" or "csv". Default: "json".
 15          query (Optional[str]): Dot-path query for JSON (e.g. "users.0.name", "items.*.price") or column name for CSV (e.g. "name" to extract that column, "name=John" to filter rows where name equals John).
 16      """
 17      import json
 18      import csv
 19      from io import StringIO
 20  
 21      try:
 22          if format == "json":
 23              return _query_json(json.loads(data), query)
 24          elif format == "csv":
 25              return _query_csv(data, query)
 26          else:
 27              return f"Error: Unknown format '{format}'. Use 'json' or 'csv'."
 28      except json.JSONDecodeError as e:
 29          return f"Error parsing JSON: {e}"
 30      except Exception as e:
 31          return f"Error: {e}"
 32  
 33  
 34  def _query_json(data, query):
 35      import json
 36  
 37      if not query:
 38          if isinstance(data, list):
 39              return f"{len(data)} items. Keys: {list(data[0].keys()) if data and isinstance(data[0], dict) else 'N/A'}"
 40          elif isinstance(data, dict):
 41              return f"Object with keys: {list(data.keys())}"
 42          return json.dumps(data, indent=2, default=str)
 43  
 44      parts = query.split(".")
 45      current = data
 46      for part in parts:
 47          if part == "*":
 48              if isinstance(current, list):
 49                  remaining = ".".join(parts[parts.index(part) + 1:])
 50                  if remaining:
 51                      results = []
 52                      for item in current:
 53                          r = _query_json(item, remaining)
 54                          results.append(r)
 55                      return "\n".join(results)
 56                  return json.dumps(current, indent=2, default=str)
 57              return f"Error: '*' only works on arrays, got {type(current).__name__}"
 58          elif part.isdigit():
 59              idx = int(part)
 60              if isinstance(current, list) and idx < len(current):
 61                  current = current[idx]
 62              else:
 63                  return f"Error: Index {idx} out of range"
 64          elif isinstance(current, dict) and part in current:
 65              current = current[part]
 66          elif isinstance(current, dict):
 67              return f"Error: Key '{part}' not found. Available: {list(current.keys())}"
 68          else:
 69              return f"Error: Cannot access '{part}' on {type(current).__name__}"
 70  
 71      if isinstance(current, (dict, list)):
 72          return json.dumps(current, indent=2, default=str)
 73      return str(current)
 74  
 75  
 76  def _query_csv(data, query):
 77      import csv
 78      from io import StringIO
 79  
 80      reader = csv.DictReader(StringIO(data))
 81      rows = list(reader)
 82  
 83      if not rows:
 84          return "Empty CSV (no rows)"
 85  
 86      if not query:
 87          return f"{len(rows)} rows. Columns: {list(rows[0].keys())}"
 88  
 89      # Filter: column=value
 90      if "=" in query:
 91          col, val = query.split("=", 1)
 92          col = col.strip()
 93          val = val.strip()
 94          if col not in rows[0]:
 95              return f"Error: Column '{col}' not found. Available: {list(rows[0].keys())}"
 96          filtered = [r for r in rows if r.get(col, "").strip() == val]
 97          if not filtered:
 98              return f"No rows where {col} = {val}"
 99          lines = [",".join(filtered[0].keys())]
100          for r in filtered:
101              lines.append(",".join(r.values()))
102          return "\n".join(lines)
103  
104      # Extract column
105      col = query.strip()
106      if col not in rows[0]:
107          return f"Error: Column '{col}' not found. Available: {list(rows[0].keys())}"
108      values = [r[col] for r in rows]
109      return "\n".join(values)