/ tests / test_sql_injection.py
test_sql_injection.py
 1  """Tests that verify SQL injection mitigations in insights and state modules."""
 2  
 3  import re
 4  
 5  from agent.insights import InsightsEngine
 6  
 7  
 8  def test_session_cols_no_injection_chars():
 9      """_SESSION_COLS must not contain SQL injection vectors."""
10      cols = InsightsEngine._SESSION_COLS
11      assert ";" not in cols
12      assert "--" not in cols
13      assert "'" not in cols
14      assert "DROP" not in cols.upper()
15  
16  
17  def test_get_sessions_all_query_is_parameterized():
18      """_GET_SESSIONS_ALL must use a ? placeholder for the cutoff value."""
19      query = InsightsEngine._GET_SESSIONS_ALL
20      assert "?" in query
21      assert "started_at >= ?" in query
22      # Must not embed any runtime-variable content via brace interpolation
23      assert "{" not in query
24  
25  
26  def test_get_sessions_with_source_query_is_parameterized():
27      """_GET_SESSIONS_WITH_SOURCE must use ? placeholders for both parameters."""
28      query = InsightsEngine._GET_SESSIONS_WITH_SOURCE
29      assert query.count("?") == 2
30      assert "started_at >= ?" in query
31      assert "source = ?" in query
32      assert "{" not in query
33  
34  
35  def test_session_col_names_are_safe_identifiers():
36      """Every column name listed in _SESSION_COLS must be a simple identifier."""
37      cols = InsightsEngine._SESSION_COLS
38      identifiers = [c.strip() for c in cols.split(",")]
39      safe_identifier = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_]*$")
40      for col in identifiers:
41          assert safe_identifier.match(col), (
42              f"Column name {col!r} is not a safe SQL identifier"
43          )