/ tests / structural / test_architecture.py
test_architecture.py
  1  """
  2  Structural tests that enforce architectural invariants.
  3  
  4  These tests verify module boundaries, file size limits, and naming conventions.
  5  They run on every PR and serve as the "custom linters with teaching error messages"
  6  recommended by the Harness Engineering methodology.
  7  
  8  Error messages are deliberately verbose — they become remediation context
  9  for the agent's next attempt when a test fails.
 10  """
 11  import ast
 12  import os
 13  import pytest
 14  
 15  PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
 16  SRC_DIR = os.path.join(PROJECT_ROOT, "src")
 17  TOOLS_DIR = os.path.join(PROJECT_ROOT, "tools")
 18  
 19  
 20  class TestModuleBoundaries:
 21      """Verify that module dependency directions are correct.
 22  
 23      Architecture rule:
 24      - src/api/ may import from src/services/, src/core/, src/config, src/db/
 25      - src/services/ may import from src/core/, src/config, src/db/
 26      - src/core/ may import from src/config, src/db/
 27      - tools/ag3ntum/ may import from src/core/, src/config, src/security/
 28      - NO reverse dependencies (core must not import from api, etc.)
 29      """
 30  
 31      FORBIDDEN_IMPORTS = [
 32          # (importing_module_prefix, forbidden_import_prefix, reason)
 33          ("src.core", "src.api",
 34           "Core must not import from API layer. Core is the foundation; "
 35           "API depends on Core, not the reverse. Move shared code to src/services/ "
 36           "or src/core/."),
 37          ("src.core", "src.services",
 38           "Core must not import from Services. Services orchestrate Core, "
 39           "not the reverse. If Core needs a service capability, define an "
 40           "interface/protocol in Core and implement it in Services."),
 41          ("src.services", "src.api",
 42           "Services must not import from API layer. Services are used BY the API, "
 43           "not the reverse. Move shared types to src/services/ or src/core/."),
 44          ("tools.ag3ntum", "src.api",
 45           "MCP tools must not import from API layer. Tools operate within the "
 46           "agent sandbox; they should only use src/core/ and src/config."),
 47          ("tools.ag3ntum", "src.services",
 48           "MCP tools must not import from Services layer. Tools should use "
 49           "src/core/ utilities directly."),
 50      ]
 51  
 52      # Known violations from pre-existing code — tracked as tech debt.
 53      # These are counted; new violations beyond this count will fail the test.
 54      # Reduce this number as violations are fixed.
 55      KNOWN_VIOLATION_COUNT = 12
 56  
 57      def _get_imports(self, filepath):
 58          """Extract all import statements from a Python file."""
 59          with open(filepath, "r") as f:
 60              try:
 61                  tree = ast.parse(f.read())
 62              except SyntaxError:
 63                  return []
 64  
 65          imports = []
 66          for node in ast.walk(tree):
 67              if isinstance(node, ast.Import):
 68                  for alias in node.names:
 69                      imports.append(alias.name)
 70              elif isinstance(node, ast.ImportFrom):
 71                  if node.module:
 72                      imports.append(node.module)
 73          return imports
 74  
 75      def _module_prefix(self, filepath):
 76          """Convert file path to module prefix."""
 77          rel = os.path.relpath(filepath, PROJECT_ROOT)
 78          return rel.replace(os.sep, ".").replace(".py", "")
 79  
 80      @pytest.mark.unit
 81      def test_no_new_forbidden_imports(self):
 82          """Verify no NEW module imports from a layer it shouldn't depend on.
 83  
 84          Existing violations are tracked as tech debt (KNOWN_VIOLATION_COUNT).
 85          This test fails only if NEW violations are introduced.
 86          """
 87          violations = []
 88  
 89          for scan_dir in [SRC_DIR, TOOLS_DIR]:
 90              for root, _, files in os.walk(scan_dir):
 91                  for f in files:
 92                      if not f.endswith(".py"):
 93                          continue
 94                      filepath = os.path.join(root, f)
 95                      module = self._module_prefix(filepath)
 96                      imports = self._get_imports(filepath)
 97  
 98                      for importing_prefix, forbidden_prefix, reason in self.FORBIDDEN_IMPORTS:
 99                          if not module.startswith(importing_prefix):
100                              continue
101                          for imp in imports:
102                              if imp.startswith(forbidden_prefix):
103                                  violations.append(
104                                      f"\n  VIOLATION: {module}\n"
105                                      f"  imports: {imp}\n"
106                                      f"  Rule: {reason}\n"
107                                  )
108  
109          assert len(violations) <= self.KNOWN_VIOLATION_COUNT, (
110              f"\n{'='*60}\n"
111              f"NEW ARCHITECTURE BOUNDARY VIOLATIONS ({len(violations)} total, "
112              f"{self.KNOWN_VIOLATION_COUNT} known)\n"
113              f"{'='*60}\n"
114              + "\n".join(violations)
115              + f"\n{'='*60}\n"
116              f"Fix: Move shared code to the appropriate layer. See CLAUDE.md "
117              f"'Key Paths' for the layer hierarchy.\n"
118              f"If fixing existing violations, reduce KNOWN_VIOLATION_COUNT "
119              f"in test_architecture.py.\n"
120          )
121  
122  
class TestFileSizeLimits:
    """Enforce file size limits to maintain agent-legible code.

    Large files are harder for agents to navigate and reason about.
    The Harness Engineering methodology recommends enforcing file size
    limits via lints.
    """

    MAX_PYTHON_LINES = 600
    MAX_SHELL_LINES = 2100
    MAX_TSX_LINES = 500
    MAX_CSS_LINES = 300

    # Per-file line limits that override the defaults above. Keys are paths
    # relative to PROJECT_ROOT, always written with forward slashes.
    WAIVERS = {
        # Shell scripts
        "run.sh": 2200,
        # Scripts
        "scripts/red-alert.py": 2500,
        "scripts/claude-code-security-env-check.py": 1500,
        # Core — large due to complexity, tracked for future splitting
        "src/core/agent_core.py": 2350,
        "src/core/path_validator.py": 1800,
        "src/core/trace_processor.py": 1500,
        "src/core/permission_config.py": 1300,
        "src/core/sandbox_path_resolver.py": 1100,
        "src/core/sessions.py": 900,
        "src/core/permission_profiles.py": 900,
        "src/core/hooks.py": 700,
        "src/core/sandbox.py": 650,
        "src/core/uid_security.py": 650,
        "src/core/command_security.py": 1200,
        # Tracers
        "src/core/tracers/cli.py": 1600,
        "src/core/tracers/eventing.py": 900,
        # API routes
        "src/api/routes/files.py": 1700,
        "src/api/routes/sessions.py": 1500,
        "src/api/routes/reseller.py": 2200,
        "src/api/routes/admin.py": 1200,
        "src/api/reseller_models.py": 800,
        "src/api/security_middleware.py": 700,
        # Models
        "src/db/models.py": 750,
        # Services
        "src/services/session_service.py": 1100,
        "src/services/event_service.py": 750,
        # Tools
        "tools/ag3ntum/bash_tool.py": 1200,
        "tools/ag3ntum/read_tool.py": 700,
        "tools/ag3ntum/edit_tool.py": 700,
        "tools/ag3ntum/multi_edit_tool.py": 700,
        # Tests — large test files are acceptable
        "tests/backend/test_waf_filter.py": 800,
        "tests/backend/test_ag3ntum_glob_grep_ls.py": 1000,
        "tests/backend/test_llm_proxy.py": 2200,
        "tests/backend/test_trace_processor.py": 1000,
        "tests/backend/test_sessions.py": 1100,
        "tests/backend/test_sensitive_data_scanner.py": 1000,
        "tests/backend/test_production_build_mode.py": 800,
        "tests/backend/test_auth.py": 900,
        "tests/backend/test_permission_config.py": 1000,
        "tests/backend/test_zzz_e2e_server.py": 1300,
        "tests/backend/test_tasks.py": 900,
        "tests/backend/test_hooks.py": 700,
        "tests/backend/test_ag3ntum_tools.py": 1800,
        "tests/backend/test_path_validator.py": 1000,
        "tests/backend/redis/test_redis_services.py": 700,
        "tests/backend/redis/test_redis_event_hub.py": 800,
        "tests/backend/redis/test_zzz_redis_sse_e2e.py": 700,
        "tests/backend/test_cross_tool_path_consistency.py": 800,
        "tests/backend/test_real_user_integration.py": 1700,
        "tests/backend/test_prompt_engine.py": 850,
        "tests/backend/test_structured_output.py": 650,
        "tests/backend/test_external_mounts.py": 1000,
        "tests/backend/test_ag3ntum_write.py": 850,
        "tests/backend/test_install_user_lifecycle.py": 750,
        "tests/backend/test_streaming.py": 1100,
        "tests/backend/test_sse_schemas.py": 1200,
        "tests/backend/test_files_api.py": 850,
        "tests/backend/test_queue_system.py": 900,
        "tests/backend/test_mount_e2e.py": 800,
        "tests/backend/test_session_service.py": 800,
        "tests/backend/test_ag3ntum_bash.py": 800,
        "tests/backend/test_ask_user_question.py": 1250,
        "tests/backend/test_dynamic_mounts.py": 1700,
        "tests/backend/test_user_service.py": 900,
        "tests/backend/test_reseller.py": 800,
        "tests/backend/test_admin.py": 650,
        "tests/backend/test_sandbox.py": 1100,
        "tests/backend/test_sandbox_path_resolver.py": 850,
        "tests/core-tests/test_prompt_engine.py": 850,
        "tests/core-tests/test_permission_profiles.py": 650,
        "tests/core-tests/test_task_runner.py": 650,
        "tests/security/test_command_security.py": 2000,
        "tests/security/test_security_prompts.py": 800,
        "tests/security/test_uid_mode_integration.py": 700,
        "tests/security/test_uid_security.py": 750,
        "tests/security/test_sandboxed_envs.py": 850,
        "tests/security/test_user_isolation.py": 1750,
        "tests/sandbox/security_tests.py": 1500,
        # Tools
        "tools/ag3ntum/ag3ntum_webfetch/tool.py": 750,
        "tools/ag3ntum/ag3ntum_ssh/tool.py": 2100,  # SSHWrite + batch mode + cleanup/rollback
        "src/core/ssh/ssh_command_filter.py": 1100,  # 4-tier profile system with shell/path/sudo gating
        "tests/backend/ssh/test_ssh_tools.py": 800,
        "tests/backend/ssh/test_ssh_write.py": 970,
        "tests/backend/ssh/test_ssh_command_filter.py": 2050,  # 270+ tests for 4-tier profile system
        "tests/backend/ssh/test_ssh_tool_hardening.py": 1100,
        "tests/backend/test_ssh_profiles_connection.py": 800,
        # Skills
        "skills/.claude/skills/create_image/image_gen.py": 800,
        # Services
        "src/services/user_service.py": 1200,
        "src/services/agent_runner.py": 860,  # usage recording + SSH context building
        "src/services/mount_service.py": 1200,
        # Core (additional)
        "src/core/output.py": 700,
        "src/core/skills.py": 750,
        # Large test files (comprehensive test suites)
        "tests/security/test_reseller_security.py": 650,
        "tests/core-tests/test_agent_core_unit.py": 1150,
        "tests/backend/test_ag3ntum_read_document.py": 1100,
        "tests/backend/test_ag3ntum_webfetch.py": 1600,
        # Structural tests themselves — grows as invariants are added
        "tests/structural/test_architecture.py": 800,
    }

    # Project directories to scan (avoids walking system paths in Docker)
    PROJECT_DIRS = ["src", "tools", "tests", "skills", "scripts"]

    # Directory names that are never descended into. Matched against the
    # directory's own name, not the full path, so a checkout located under
    # e.g. ".../venv/project" or ".../.github-runner/..." is still scanned.
    SKIPPED_DIR_NAMES = frozenset(
        {"node_modules", ".git", "__pycache__", ".venv", "venv"}
    )

    @staticmethod
    def _count_lines(filepath):
        """Return the number of lines in *filepath*.

        Decoding is pinned to UTF-8 with replacement so that neither the
        platform locale nor a stray invalid byte can abort the count.
        """
        with open(filepath, encoding="utf-8", errors="replace") as fh:
            return sum(1 for _ in fh)

    def _iter_files(self, suffix, include_root_files=False):
        """Yield paths of files ending in *suffix* inside the project directories.

        Args:
            suffix: File name suffix to select, e.g. ``".py"``.
            include_root_files: Also yield matching files located directly in
                PROJECT_ROOT (not in its subdirectories).
        """
        for subdir in self.PROJECT_DIRS:
            # os.walk yields nothing for a missing directory.
            for root, dirs, files in os.walk(os.path.join(PROJECT_ROOT, subdir)):
                # Prune in place so skipped trees are never traversed.
                dirs[:] = sorted(
                    d for d in dirs if d not in self.SKIPPED_DIR_NAMES
                )
                for name in sorted(files):
                    if name.endswith(suffix):
                        yield os.path.join(root, name)

        if include_root_files:
            for name in sorted(os.listdir(PROJECT_ROOT)):
                if name.endswith(suffix):
                    yield os.path.join(PROJECT_ROOT, name)

    def _find_oversized(self, suffix, default_limit, hint, include_root_files=False):
        """Return one report entry per file whose line count exceeds its limit.

        The limit is the file's WAIVERS entry when present, otherwise
        *default_limit*. *hint* is the remediation line appended to each entry.
        """
        oversized = []
        for filepath in self._iter_files(suffix, include_root_files):
            if not os.path.isfile(filepath):
                continue  # e.g. a dangling symlink
            # Normalise separators so WAIVERS keys also match on Windows.
            rel = os.path.relpath(filepath, PROJECT_ROOT).replace(os.sep, "/")
            limit = self.WAIVERS.get(rel, default_limit)
            count = self._count_lines(filepath)
            if count > limit:
                oversized.append(
                    f"  {rel}: {count} lines (limit: {limit})\n"
                    f"    -> {hint}"
                )
        return oversized

    @pytest.mark.unit
    def test_python_file_sizes(self):
        """No Python file should exceed the line limit."""
        oversized = self._find_oversized(
            ".py",
            self.MAX_PYTHON_LINES,
            "Split into smaller modules or extract helpers.",
        )
        assert not oversized, (
            f"\n{'='*60}\nFILES EXCEEDING SIZE LIMITS ({len(oversized)})\n{'='*60}\n"
            + "\n".join(oversized) + f"\n{'='*60}\n"
            f"Fix: Split large files. To add a waiver, update WAIVERS dict.\n"
        )

    @pytest.mark.unit
    def test_shell_script_sizes(self):
        """Shell scripts should not exceed size limits."""
        # Also scan root-level .sh files (run.sh, entrypoints)
        oversized = self._find_oversized(
            ".sh",
            self.MAX_SHELL_LINES,
            "Consider splitting into sourced scripts.",
            include_root_files=True,
        )

        assert not oversized, (
            f"\n{'='*60}\n"
            f"SHELL SCRIPTS EXCEEDING SIZE LIMITS ({len(oversized)})\n"
            f"{'='*60}\n"
            + "\n".join(oversized)
            + f"\n{'='*60}\n"
            f"Fix: Split large scripts. To add a waiver, update WAIVERS dict.\n"
        )
322  
323  
class TestNamingConventions:
    """Enforce consistent naming across the codebase.

    Consistent naming helps agents navigate the codebase without
    needing to search for variations.
    """

    # Files that are helpers/utilities, not test files — exempt from test_ prefix
    HELPER_FILES = {
        "e2e_helpers.py",
        "security_tests.py",  # sandbox helper, not a pytest test file
    }

    @pytest.mark.unit
    def test_test_files_prefixed(self):
        """All test files must start with test_ prefix."""
        violations = []
        test_dirs = ["tests/backend", "tests/core-tests", "tests/security", "tests/sandbox"]
        # File names that are allowed to lack the test_ prefix.
        allowed = {"conftest.py", "__init__.py"} | set(self.HELPER_FILES)
        for test_dir in test_dirs:
            full_dir = os.path.join(PROJECT_ROOT, test_dir)
            if not os.path.exists(full_dir):
                continue
            for root, _, files in os.walk(full_dir):
                for f in files:
                    if not f.endswith(".py"):
                        continue
                    if f.startswith("test_") or f in allowed:
                        continue
                    violations.append(
                        f"  {os.path.relpath(os.path.join(root, f), PROJECT_ROOT)}\n"
                        f"    -> Rename to test_{f} or add to HELPER_FILES in test_architecture.py"
                    )

        assert not violations, (
            f"\nTest files must start with 'test_' prefix:\n"
            + "\n".join(violations)
        )

    @pytest.mark.unit
    def test_no_print_in_src(self):
        """Source code should use logging, not print().

        print() statements in source code bypass structured logging.
        Use the logging module or the tracer system instead.

        Only files under src/ are scanned (web_terminal_client is skipped).
        A line is exempt when it carries '# noqa: print-ok'; there is no
        automatic exemption for ``__main__`` blocks.

        This check is informational: it never fails. It is reported as
        skipped when more than 20 usages are found and passes otherwise.
        """
        import re

        # Match a call to the bare ``print`` name only. The look-behind
        # rejects longer identifiers (pprint, fingerprint) and attribute
        # access (console.print), which are not the builtin.
        print_call = re.compile(r"(?<![\w.])print\(")

        violations = []
        for root, _, files in os.walk(SRC_DIR):
            if "web_terminal_client" in root:
                continue
            for f in files:
                if not f.endswith(".py"):
                    continue
                filepath = os.path.join(root, f)
                # Pin the encoding so the scan does not depend on the locale.
                with open(filepath, encoding="utf-8", errors="replace") as fh:
                    for i, line in enumerate(fh, 1):
                        stripped = line.strip()
                        # Skip comments and lines that begin with a string literal
                        if stripped.startswith(("#", '"', "'")):
                            continue
                        if "# noqa: print-ok" in stripped:
                            continue
                        if print_call.search(stripped):
                            violations.append(
                                f"  {os.path.relpath(filepath, PROJECT_ROOT)}:{i}: {stripped[:80]}\n"
                                f"    -> Use logging.info/debug/warning() or tracer instead of print(). "
                                f"Add '# noqa: print-ok' if intentional."
                            )

        # Allow some prints — this is informational, not blocking
        if len(violations) > 20:
            pytest.skip(f"Too many print() usages ({len(violations)}) — fix incrementally")
396  
397  
class TestPromptTemplateIntegrity:
    """Verify prompt templates are internally consistent.

    Catches broken includes before runtime.
    """

    PROMPTS_DIR = os.path.join(PROJECT_ROOT, "prompts")

    @pytest.mark.unit
    def test_include_targets_exist(self):
        """All {% include 'file' %} targets must exist.

        Every ``.md`` file under prompts/ is scanned. Include targets are
        resolved relative to the prompts/ directory itself, not relative to
        the including file.
        """
        import re

        # The optional -/+ next to the delimiters are whitespace-control
        # markers; tags written as {%- include "x" -%} must be checked too.
        include_pattern = re.compile(
            r'\{%[-+]?\s*include\s+["\']([^"\']+)["\']\s*[-+]?%\}'
        )
        missing = []

        if not os.path.exists(self.PROMPTS_DIR):
            pytest.skip("prompts/ directory not found")

        for root, _, files in os.walk(self.PROMPTS_DIR):
            for f in files:
                if not f.endswith(".md"):
                    continue
                filepath = os.path.join(root, f)
                # Read as UTF-8 explicitly; the platform default encoding may
                # be unable to decode non-ASCII prompt text.
                with open(filepath, encoding="utf-8") as fh:
                    content = fh.read()
                for match in include_pattern.finditer(content):
                    include_target = match.group(1)
                    # Resolve relative to prompts dir
                    target_path = os.path.join(self.PROMPTS_DIR, include_target)
                    if not os.path.exists(target_path):
                        rel = os.path.relpath(filepath, PROJECT_ROOT)
                        missing.append(
                            f"  {rel}: includes '{include_target}' — FILE NOT FOUND\n"
                            f"    -> Create the file or fix the include path."
                        )

        assert not missing, (
            f"\nBroken prompt includes found:\n"
            + "\n".join(missing)
        )
438  
439  
class TestCoverageMapping:
    """Verify that key source modules have corresponding test files.

    This ensures new modules don't get added without corresponding tests.
    Matching is by file name only: a module counts as covered when its name
    appears as a substring of some ``test_*.py`` file name.
    """

    @staticmethod
    def _test_file_names(test_dir_names):
        """Return the names of all ``test_*.py`` files under the given directories.

        Args:
            test_dir_names: Directories relative to PROJECT_ROOT. Each is
                searched recursively; missing directories contribute nothing.
        """
        names = set()
        for test_dir_name in test_dir_names:
            test_dir = os.path.join(PROJECT_ROOT, test_dir_name)
            for _root, _dirs, files in os.walk(test_dir):
                names.update(
                    f for f in files
                    if f.startswith("test_") and f.endswith(".py")
                )
        return names

    @pytest.mark.unit
    def test_route_files_have_tests(self):
        """Every src/api/routes/*.py (except __init__.py) should have a test file.

        Route files define API endpoints. Each should have at least one
        corresponding test file in tests/backend/ (subdirectories included).
        """
        routes_dir = os.path.join(SRC_DIR, "api", "routes")

        if not os.path.exists(routes_dir):
            pytest.skip("src/api/routes/ not found")

        # Sorted so the failure report is stable between runs.
        route_files = sorted(
            f for f in os.listdir(routes_dir)
            if f.endswith(".py") and not f.startswith("_")
        )

        # Searched recursively, like the tool check below, so tests kept in
        # subpackages of tests/backend/ are recognised.
        test_files = self._test_file_names(["tests/backend"])

        missing = []
        for route_file in route_files:
            # Drop only the trailing extension, never an inner ".py".
            route_name = os.path.splitext(route_file)[0]
            # Look for any test file that contains the route name
            has_test = any(route_name in tf for tf in test_files)
            if not has_test:
                missing.append(
                    f"  src/api/routes/{route_file} -> no test file "
                    f"matching '*{route_name}*' in tests/backend/\n"
                    f"    -> Create tests/backend/test_{route_name}.py"
                )

        assert not missing, (
            f"\n{'='*60}\n"
            f"ROUTE FILES WITHOUT TESTS ({len(missing)})\n"
            f"{'='*60}\n"
            + "\n".join(missing)
            + f"\n{'='*60}\n"
            f"Fix: Create test files for untested routes.\n"
        )

    @pytest.mark.unit
    def test_tool_modules_have_tests(self):
        """Every tools/ag3ntum/ag3ntum_* directory should have a test.

        Tool modules are critical for agent functionality. Each should
        have at least one test file in the test suite.
        """
        tools_base = os.path.join(TOOLS_DIR, "ag3ntum")
        prefix = "ag3ntum_"

        if not os.path.exists(tools_base):
            pytest.skip("tools/ag3ntum/ not found")

        # Sorted so the failure report is stable between runs.
        tool_dirs = sorted(
            d for d in os.listdir(tools_base)
            if d.startswith(prefix) and os.path.isdir(os.path.join(tools_base, d))
        )

        # Get all test files across all test directories (recursive)
        test_files = self._test_file_names(
            ["tests/backend", "tests/security", "tests/core-tests"]
        )

        missing = []
        for tool_dir in tool_dirs:
            # Extract tool name: ag3ntum_bash -> bash, ag3ntum_read -> read.
            # Slice off the leading prefix only; a later "ag3ntum_" inside the
            # name is part of the tool name.
            tool_name = tool_dir[len(prefix):]
            # Look for test files that reference this tool
            has_test = any(tool_name in tf for tf in test_files)
            if not has_test:
                missing.append(
                    f"  tools/ag3ntum/{tool_dir}/ -> no test file "
                    f"matching '*{tool_name}*'\n"
                    f"    -> Create a test file covering {tool_dir}"
                )

        assert not missing, (
            f"\n{'='*60}\n"
            f"TOOL MODULES WITHOUT TESTS ({len(missing)})\n"
            f"{'='*60}\n"
            + "\n".join(missing)
            + f"\n{'='*60}\n"
            f"Fix: Create test files for untested tools.\n"
        )
547  
548  
class TestCISecurityInvariants:
    """Verify CI pipeline security invariants.

    These tests prevent regression of security hardening measures
    in the GitHub Actions workflows and Docker build configuration.
    """

    WORKFLOWS_DIR = os.path.join(PROJECT_ROOT, ".github", "workflows")
    DOCKERIGNORE = os.path.join(PROJECT_ROOT, ".dockerignore")

    @staticmethod
    def _dockerignore_excludes(content, path):
        """Return True if the .dockerignore text *content* excludes *path*.

        Blank lines and ``#`` comment lines are ignored. Among the remaining
        rules that mention *path*, the last one decides: a rule starting with
        ``!`` re-includes the path and therefore does not count as excluded.
        """
        excluded = False
        for raw_line in content.splitlines():
            rule = raw_line.strip()
            if not rule or rule.startswith("#"):
                continue
            if path in rule:
                excluded = not rule.startswith("!")
        return excluded

    @pytest.mark.unit
    def test_secrets_yaml_excluded_from_docker_build(self):
        """config/secrets.yaml must be in .dockerignore.

        The CI pipeline writes API keys to config/secrets.yaml before
        docker build. Without .dockerignore exclusion, COPY . / bakes
        the key into image layers. (EXT-23)
        """
        assert os.path.exists(self.DOCKERIGNORE), (
            f".dockerignore not found at {self.DOCKERIGNORE}"
        )
        with open(self.DOCKERIGNORE, encoding="utf-8") as f:
            content = f.read()

        # A bare substring test would also accept a commented-out entry or a
        # "!config/secrets.yaml" exception, neither of which excludes the file.
        assert self._dockerignore_excludes(content, "config/secrets.yaml"), (
            f"\n{'='*60}\n"
            f"SECURITY: config/secrets.yaml NOT in .dockerignore\n"
            f"{'='*60}\n"
            f"CI writes API keys to config/secrets.yaml before docker build.\n"
            f"Without .dockerignore exclusion, COPY . / bakes the secret\n"
            f"into Docker image layers.\n"
            f"\n"
            f"Fix: Add 'config/secrets.yaml' to .dockerignore\n"
            f"{'='*60}\n"
        )

    @pytest.mark.unit
    def test_ci_workflows_clean_secrets_file(self):
        """CI workflows that create secrets.yaml must clean it up.

        Self-hosted runner workspaces persist between runs. Any workflow
        that writes config/secrets.yaml must have an if:always() step
        to remove it, preventing secrets from lingering on disk.

        The check is textual: a workflow is treated as creating the file when
        it mentions 'config/secrets.yaml' and contains 'echo ', and as cleaning
        up when it contains 'rm -f config/secrets.yaml'. The presence of
        'if: always()' on that step is not verified here.
        """
        if not os.path.isdir(self.WORKFLOWS_DIR):
            pytest.skip(".github/workflows/ not found")

        violations = []
        for f in sorted(os.listdir(self.WORKFLOWS_DIR)):
            if not f.endswith((".yml", ".yaml")):
                continue
            filepath = os.path.join(self.WORKFLOWS_DIR, f)
            with open(filepath, encoding="utf-8") as fh:
                content = fh.read()

            creates_secrets = "config/secrets.yaml" in content and "echo " in content
            if not creates_secrets:
                continue

            # This workflow creates secrets.yaml — verify cleanup
            if "rm -f config/secrets.yaml" not in content:
                violations.append(
                    f"  .github/workflows/{f}: creates config/secrets.yaml "
                    f"but has no 'rm -f config/secrets.yaml' cleanup step\n"
                    f"    -> Add: - name: Remove secrets file\\n"
                    f"             if: always()\\n"
                    f"             run: rm -f config/secrets.yaml"
                )

        assert not violations, (
            f"\n{'='*60}\n"
            f"SECURITY: CI workflows missing secrets cleanup ({len(violations)})\n"
            f"{'='*60}\n"
            + "\n".join(violations)
            + f"\n{'='*60}\n"
            f"Fix: Add an if:always() step to remove config/secrets.yaml.\n"
            f"{'='*60}\n"
        )
624  
625  
class TestCodeConventions:
    """Checks that code across the codebase follows agreed conventions."""

    @pytest.mark.unit
    def test_impl_functions_exist_for_tools(self):
        """Each tool's tool.py should contain an ``_impl(`` function.

        Per CLAUDE.md gotcha #19, MCP tools keep their core logic in
        extracted _impl functions so it can be unit tested without the MCP
        wrapper (session context, etc.).

        Only ``ag3ntum_*`` directories under tools/ag3ntum/ that contain a
        tool.py are inspected; the tools in the exemption set are skipped.
        The test is skipped entirely when tools/ag3ntum/ does not exist.
        """
        tools_root = os.path.join(TOOLS_DIR, "ag3ntum")
        if not os.path.exists(tools_root):
            pytest.skip("tools/ag3ntum/ not found")

        # Tools that follow a different pattern or are simple wrappers and
        # therefore need no _impl function.
        exempt_tools = {
            "ag3ntum_ask",          # Simple user interaction, no complex impl
            "ag3ntum_webfetch",     # Delegates to external fetch logic
            "ag3ntum_read_document",  # Has its own module structure
            "ag3ntum_multiedit",    # Composes edit operations
        }

        missing = []
        for entry in os.listdir(tools_root):
            if not entry.startswith("ag3ntum_"):
                continue
            if not os.path.isdir(os.path.join(tools_root, entry)):
                continue
            if entry in exempt_tools:
                continue

            tool_module = os.path.join(tools_root, entry, "tool.py")
            if not os.path.exists(tool_module):
                continue

            with open(tool_module) as handle:
                source_text = handle.read()
            if "_impl(" in source_text:
                continue

            missing.append(
                f"  tools/ag3ntum/{entry}/tool.py -> no _impl function\n"
                f"    -> Extract core logic to an async _<name>_impl() function "
                f"for testability"
            )

        separator = f"{'='*60}"
        assert not missing, (
            f"\n{separator}\n"
            f"TOOLS WITHOUT _impl FUNCTIONS ({len(missing)})\n"
            f"{separator}\n"
            + "\n".join(missing)
            + f"\n{separator}\n"
            f"Fix: Extract tool logic to _*_impl() functions.\n"
            f"See CLAUDE.md gotcha #19.\n"
        )
686  
687  
class TestPermissionsYamlSSHTools:
    """
    Verify permissions.yaml includes SSH tools.

    Regression guard: SSH tools must be listed in both tools.enabled and
    session_workspace.allow so the SDK permission system does not block them.
    Without this, SSH tool calls fail with "permission denied" even when the
    user has SSH enabled via feature flags.
    """

    PERMISSIONS_PATH = os.path.join(
        PROJECT_ROOT, "config", "security", "permissions.yaml"
    )

    SSH_TOOL_NAMES = [
        "mcp__ag3ntum__SSHConnect",
        "mcp__ag3ntum__SSHExec",
        "mcp__ag3ntum__SSHRead",
    ]

    @pytest.fixture(autouse=True)
    def _load_permissions(self):
        """Load and parse permissions.yaml before each test in this class."""
        import yaml
        with open(self.PERMISSIONS_PATH, encoding="utf-8") as f:
            self._config = yaml.safe_load(f)

    def _config_list(self, section, key):
        """Return ``config[section][key]`` as a list, or ``[]`` when absent.

        An empty YAML document, a section that is missing or not a mapping,
        and a key whose value is null all yield an empty list. This lets the
        assertions below fail with their explanatory message instead of an
        AttributeError/TypeError raised while reading the config.
        """
        config = self._config if isinstance(self._config, dict) else {}
        section_data = config.get(section)
        if not isinstance(section_data, dict):
            return []
        return section_data.get(key) or []

    @pytest.mark.unit
    def test_ssh_tools_in_enabled_list(self):
        """SSH tools must be in tools.enabled so they're available to the agent."""
        enabled = self._config_list("tools", "enabled")

        missing = [t for t in self.SSH_TOOL_NAMES if t not in enabled]
        assert not missing, (
            f"\n{'='*60}\n"
            f"SSH TOOLS MISSING FROM permissions.yaml tools.enabled\n"
            f"{'='*60}\n"
            f"Missing: {missing}\n"
            f"\n"
            f"Without these entries, SSH tools are not registered as available\n"
            f"tools and the SDK will deny all SSH operations.\n"
            f"\n"
            f"Fix: Add the missing tools to config/security/permissions.yaml\n"
            f"under tools.enabled.\n"
            f"{'='*60}\n"
        )

    @pytest.mark.unit
    def test_ssh_tools_not_in_disabled_list(self):
        """SSH tools must NOT be in tools.disabled."""
        disabled = self._config_list("tools", "disabled")

        blocked = [t for t in self.SSH_TOOL_NAMES if t in disabled]
        assert not blocked, (
            f"\n{'='*60}\n"
            f"SSH TOOLS FOUND IN permissions.yaml tools.disabled\n"
            f"{'='*60}\n"
            f"Blocked: {blocked}\n"
            f"\n"
            f"SSH tools have their own security layers (command filter,\n"
            f"host blocking, rate limiting). They must not be disabled here.\n"
            f"Per-user access is controlled by feature flags, not by disabling.\n"
            f"\n"
            f"Fix: Remove these tools from tools.disabled in\n"
            f"config/security/permissions.yaml.\n"
            f"{'='*60}\n"
        )

    @pytest.mark.unit
    def test_ssh_tools_in_allow_patterns(self):
        """SSH tools must have allow patterns in session_workspace.allow."""
        allow = self._config_list("session_workspace", "allow")

        # Each tool is expected as a literal "<tool>(*)" entry.
        expected_patterns = [f"{t}(*)" for t in self.SSH_TOOL_NAMES]
        missing = [p for p in expected_patterns if p not in allow]
        assert not missing, (
            f"\n{'='*60}\n"
            f"SSH TOOL PATTERNS MISSING FROM session_workspace.allow\n"
            f"{'='*60}\n"
            f"Missing: {missing}\n"
            f"\n"
            f"Without allow patterns, the permission callback denies SSH\n"
            f"tool calls even when the tools are registered as available.\n"
            f"\n"
            f"Fix: Add the missing patterns to session_workspace.allow in\n"
            f"config/security/permissions.yaml.\n"
            f"{'='*60}\n"
        )