test_sql_injection.py
"""Tests that verify SQL injection mitigations in insights and state modules."""

import re

from agent.insights import InsightsEngine


def test_session_cols_no_injection_chars():
    """The _SESSION_COLS string must be free of common SQL injection markers."""
    column_list = InsightsEngine._SESSION_COLS
    # Statement terminator, comment opener and string delimiter, in that order.
    for marker in (";", "--", "'"):
        assert marker not in column_list
    # Keyword check is done case-insensitively by upper-casing the column list.
    assert "DROP" not in column_list.upper()


def test_get_sessions_all_query_is_parameterized():
    """The _GET_SESSIONS_ALL query must bind its cutoff through a ? placeholder."""
    sql = InsightsEngine._GET_SESSIONS_ALL
    for required in ("?", "started_at >= ?"):
        assert required in sql
    # A brace would mean runtime content is interpolated into the query text.
    assert "{" not in sql


def test_get_sessions_with_source_query_is_parameterized():
    """The _GET_SESSIONS_WITH_SOURCE query must bind both values via ? placeholders."""
    sql = InsightsEngine._GET_SESSIONS_WITH_SOURCE
    placeholder_total = sql.count("?")
    assert placeholder_total == 2
    for required in ("started_at >= ?", "source = ?"):
        assert required in sql
    # A brace would mean runtime content is interpolated into the query text.
    assert "{" not in sql


def test_session_col_names_are_safe_identifiers():
    """Each comma-separated entry of _SESSION_COLS must be a plain identifier."""
    # Leading letter or underscore, then letters, digits or underscores only.
    identifier_pattern = r"^[a-zA-Z_][a-zA-Z0-9_]*$"
    for col in map(str.strip, InsightsEngine._SESSION_COLS.split(",")):
        assert re.match(identifier_pattern, col), (
            f"Column name {col!r} is not a safe SQL identifier"
        )