/ agent-scan / utils / aig_logger.py
aig_logger.py
  1  # Copyright (c) 2024-2026 Tencent Zhuque Lab. All rights reserved.
  2  #
  3  # Licensed under the Apache License, Version 2.0 (the "License");
  4  # you may not use this file except in compliance with the License.
  5  # You may obtain a copy of the License at
  6  #
  7  #     http://www.apache.org/licenses/LICENSE-2.0
  8  #
  9  # Unless required by applicable law or agreed to in writing, software
 10  # distributed under the License is distributed on an "AS IS" BASIS,
 11  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 12  # See the License for the specific language governing permissions and
 13  # limitations under the License.
 14  #
 15  # Requirement: Any integration or derivative work must explicitly attribute
 16  # Tencent Zhuque Lab (https://github.com/Tencent/AI-Infra-Guard) in its
 17  # documentation or user interface, as detailed in the NOTICE file.
 18  
 19  """
 20  Agent scan logger module.
 21  
 22  Provides structured logging for the agent security scanning pipeline,
 23  including step progress, tool usage, and result reporting.
 24  """
 25  
import json
import logging
import time
from typing import Literal

from pydantic import BaseModel, Field
 31  
 32  
 33  class ContentSchema(BaseModel):
 34      """Base schema for log content with timestamp."""
 35      timestamp: str = str(time.time())
 36  
 37  
class NewPlanStep(ContentSchema):
    """Payload announcing a new pipeline step.

    Built by ``ScanLogger.new_plan_step`` and emitted under the message
    type ``"newPlanStep"``. Inherits ``timestamp`` from ``ContentSchema``.
    """
    stepId: str  # step identifier supplied by the caller
    title: str  # filled from the ``stepName`` argument of new_plan_step
 42  
 43  
class StatusUpdate(ContentSchema):
    """Payload reporting the status of a pipeline step.

    Built by ``ScanLogger.status_update`` and emitted under the message
    type ``"statusUpdate"``. Inherits ``timestamp`` from ``ContentSchema``.
    """
    stepId: str  # step identifier supplied by the caller
    brief: str  # short text supplied by the caller
    description: str  # longer text supplied by the caller
    # Only these three literal values pass validation.
    status: Literal["running", "completed", "failed"]
 50  
 51  
class ToolUsed(ContentSchema):
    """Payload describing a tool invocation.

    Built by ``ScanLogger.tool_used`` and emitted under the message type
    ``"toolUsed"``. Inherits ``timestamp`` from ``ContentSchema``.
    """
    stepId: str  # step identifier supplied by the caller
    tool_id: str  # tool identifier supplied by the caller
    tool_name: str | None = None  # optional; omitted callers get None
    brief: str  # short text supplied by the caller
    # Only these three literal values pass validation.
    status: Literal["todo", "doing", "done"]
    params: str  # required here; ScanLogger.tool_used passes "" by default
 60  
 61  
class ActionLog(ContentSchema):
    """Payload carrying a log line attributed to a tool.

    Built by ``ScanLogger.action_log`` and emitted under the message type
    ``"actionLog"``. Inherits ``timestamp`` from ``ContentSchema``.
    """
    tool_id: str  # tool identifier supplied by the caller
    tool_name: str  # required here, unlike the optional field on ToolUsed
    stepId: str  # step identifier supplied by the caller
    log: str  # the log text itself
 68  
 69  
class ErrorLog(ContentSchema):
    """Payload carrying an error message.

    Built by ``ScanLogger.error_log`` and emitted under the message type
    ``"error"``. Inherits ``timestamp`` from ``ContentSchema``.
    """
    msg: str  # error text supplied by the caller
 73  
 74  
class AgentMsg(BaseModel):
    """Envelope for every record written by ``ScanLogger``.

    ``ScanLogger._log`` wraps each payload in this model and writes the
    result of ``model_dump_json()`` to the underlying logger.
    """
    type: str  # message type tag, e.g. "newPlanStep", "toolUsed", "error"
    # Either the ``model_dump()`` of a payload model or the raw dict that
    # was handed to ``ScanLogger.result_update``.
    content: dict
 79  
 80  
 81  class ScanLogger:
 82      """
 83      Logger for agent security scanning pipeline.
 84      
 85      Provides structured JSON logging for:
 86      - Pipeline step progress
 87      - Tool usage tracking
 88      - Action logs
 89      - Result reporting
 90      - Error logging
 91      """
 92  
 93      def __init__(self):
 94          logger = logging.getLogger("scanLogger")
 95          logger.setLevel(logging.INFO)
 96          
 97          # Create console handler
 98          console_handler = logging.StreamHandler()
 99          console_handler.setLevel(logging.INFO)
100          
101          # Set log format
102          formatter = logging.Formatter("%(message)s")
103          console_handler.setFormatter(formatter)
104          logger.addHandler(console_handler)
105          self.logger = logger
106  
107      def _log(self, type: str, content: BaseModel | dict):
108          """Internal logging method."""
109          if isinstance(content, BaseModel):
110              content.timestamp = str(time.time())
111              content = content.model_dump()
112          self.logger.info(AgentMsg(type=type, content=content).model_dump_json())
113  
114      def new_plan_step(self, stepId: str, stepName: str):
115          """Log a new pipeline step."""
116          self._log("newPlanStep", NewPlanStep(stepId=stepId, title=stepName))
117  
118      def status_update(
119          self,
120          stepId: str,
121          brief: str,
122          description: str,
123          status: Literal["running", "completed", "failed"]
124      ):
125          """Log a step status update."""
126          self._log("statusUpdate", StatusUpdate(
127              stepId=stepId, brief=brief, description=description, status=status
128          ))
129  
130      def tool_used(
131          self,
132          stepId: str,
133          tool_id: str,
134          brief: str,
135          status: Literal["todo", "doing", "done"],
136          tool_name: str = None,
137          params: str = ""
138      ):
139          """Log tool usage."""
140          self._log("toolUsed", ToolUsed(
141              stepId=stepId, tool_id=tool_id, tool_name=tool_name,
142              brief=brief, status=status, params=params
143          ))
144  
145      def action_log(self, tool_id: str, tool_name: str, stepId: str, log: str):
146          """Log a tool action."""
147          self._log("actionLog", ActionLog(
148              tool_id=tool_id, tool_name=tool_name, stepId=stepId, log=log
149          ))
150  
151      def result_update(self, content: dict):
152          """Log scan result."""
153          self._log("resultUpdate", content)
154  
155      def error_log(self, msg: str):
156          """Log an error."""
157          self._log("error", ErrorLog(msg=msg))
158  
159  
# Module-level logger instance, created on import.
scanLogger = ScanLogger()


if __name__ == "__main__":
    # Manual smoke run: emit three "newPlanStep" records with ids "0".."2".
    for _index, _title in enumerate(("Step 1", "Step 2", "Step 3")):
        scanLogger.new_plan_step(stepId=str(_index), stepName=_title)