# logextractor.py
  1  import json
  2  import os
  3  import sys
  4  import traceback
  5  
  6  from concurrent.futures import ProcessPoolExecutor, as_completed
  7  from ctypes import ArgumentError
  8  from dataclasses import asdict, dataclass
  9  from pathlib import Path
 10  from typing import Optional
 11  
 12  from scanf import scanf_compile
 13  
 14  import clang.cindex as ci
 15  from clang.cindex import CompilationDatabase, CompilationDatabaseError, TranslationUnit
 16  
 17  
 18  def print_ast(node: ci.Cursor, indent=0):
 19      prefix = "  " * indent
 20      print(
 21          f"{prefix}{node.kind} : '{node.spelling}' "
 22          f"(Ref: '{node.referenced.spelling if node.referenced else 'None'}')")
 23  
 24      # Recurse
 25      for child in node.get_children():
 26          print_ast(child, indent + 2)
 27  
 28  
 29  def fmt_to_regex(fmt_str):
 30      """
 31      Converts a printf/scanf string to a Python Regex.
 32      Returns the regex string and a list of inferred types, just a wrapper
 33      for the python scanf package's scanf_compile.
 34      """
 35  
 36      return scanf_compile(fmt_str, collapseWhitespace=False)
 37  
 38  
 39  # Global that will be initialized once in each process
 40  _PROCESS_LOCAL_INDEX = None
 41  
 42  
 43  def worker_init():
 44      """
 45      Called once per process to init the global per-process index.
 46      """
 47      global _PROCESS_LOCAL_INDEX
 48      _PROCESS_LOCAL_INDEX = ci.Index.create()
 49  
 50  
 51  def worker_entrypoint(filename: str, args: list[str], root_dir: str):
 52      """
 53      Wrapper that initializes / retrieves the global _PROCESS_LOCAL_INDEX and
 54      invokes parse_file.
 55      """
 56      global _PROCESS_LOCAL_INDEX
 57      if _PROCESS_LOCAL_INDEX is None:
 58          _PROCESS_LOCAL_INDEX = ci.Index.create()
 59  
 60      return parse_file(filename, args, root_dir, _PROCESS_LOCAL_INDEX)
 61  
 62  
# Names of the logging macros/functions whose call sites we extract from the
# token stream.
# Todo: handle other weird cases!
LOG_FUNCS = {
    'LogDebug', 'LogTrace',
    'LogPrintf',
    'LogInfo', 'LogError', 'LogWarning',
    'LogPrintFormatInternal'
}
 70  
 71  
@dataclass(frozen=True, slots=True)
class LogMessage:
    """One extracted log call site; serialised to JSON via asdict()."""
    fmt: str                # the format string (without surrounding quotes)
    regex: str              # regex string for the fmt, generated by scanf package
    regex_types: list[str]  # inferred types, because of tfm, don't trust these too much
    args: list[str]         # Arguments passed to the print
    file: Optional[str]     # source file name (None for built‑ins)
    line: int               # line number of the macro call
    column: int             # column number of the macro call
    macro: str              # which Log* macro was used
    category: Optional[str] # optional category argument
 83  
 84  
@dataclass
class ParseResult:
    """Intermediate record for one Log* call found in the token stream."""
    macro: str              # which Log* name triggered the match
    args: list[str]         # raw argument texts (tokens concatenated)
    loc: ci.SourceLocation  # location of the macro-name token
 91  
 92  # Parse the tokens of a tu into ParseResults, each of which is passed to
 93  # process_log to create a LogMessage object from.
 94  def parse_tokens(tu: TranslationUnit, root_dir: str) -> list[LogMessage]:
 95      results: list[LogMessage] = []
 96      # skip files outside of our dir
 97      if not tu.spelling.startswith(root_dir):
 98          return []
 99  
100      @dataclass
101      class ParseState:
102          in_log: bool = False
103          seen_open_paren: bool = False
104          current_arg: str = ""
105          paren_depth: int = 0
106  
107      parse_state = ParseState()
108      parse_result = None
109  
110      if tu.cursor or tu.cursor.extent is None:
111          return []
112      for token in tu.get_tokens(extent=tu.cursor.extent):
113          tk = token.spelling
114  
115          if tk in LOG_FUNCS:
116              parse_state.in_log = True
117              parse_result = ParseResult(macro=tk, args=[], loc=token.location)
118              continue
119  
120          if not parse_state.in_log:
121              continue
122  
123          assert parse_result is not None
124  
125          if tk == "(":
126              if not parse_state.seen_open_paren:
127                  parse_state.seen_open_paren = True
128                  continue
129              parse_state.paren_depth += 1
130              parse_state.current_arg += tk
131          elif tk == "," and parse_state.paren_depth == 0:
132              parse_result.args.append(parse_state.current_arg)
133              parse_state.current_arg = ""
134          elif tk == ")":
135              if parse_state.paren_depth == 0:
136                  parse_result.args.append(parse_state.current_arg)
137                  results.append(process_log(parse_result, root_dir))
138                  parse_state = ParseState()
139              else:
140                  parse_state.paren_depth -= 1
141                  parse_state.current_arg += tk
142          else:
143              parse_state.current_arg += tk
144  
145      return results
146  
147  
# Turn a ParseResult (macro, args, sourcelocation) into a LogMessage
def process_log(parse_result: ParseResult, root_dir: str) -> LogMessage:
    """Build a LogMessage from one parsed Log* call.

    LogDebug/LogTrace carry the category as their first argument; the other
    macros implicitly log to BCLog::ALL. The next argument is the format
    string, which is compiled to a regex via the scanf package.
    """
    macro = parse_result.macro
    args = parse_result.args
    loc = parse_result.loc

    category: Optional[str] = None

    idx = 0
    if macro in ("LogDebug", "LogTrace"):
        # First arg is category
        category = args[idx]
        idx += 1
    else:
        category = "BCLog::ALL"

    fmt_str = args[idx]
    if fmt_str.startswith('"') and fmt_str.endswith('"'):
        fmt_str = fmt_str[1:-1]
    else:
        # The format string is not a literal, probably not worth handling this
        print(f"Format string is not a literal, skipped: `{fmt_str}`")

    # on second thought, store the fmt strings in the text file,
    # the log parser can compile to regex's at load time?
    regex, regex_types = fmt_to_regex(fmt_str)
    regex = regex.pattern
    cleaned_types = []
    for t in regex_types:
        # scanf reports some converters as lambdas; fall back to 'str'.
        name = getattr(t, '__name__', str(t))
        if name == '<lambda>':
            cleaned_types.append('str')
        else:
            cleaned_types.append(name)

    passed_arguments: list[str] = args[idx+1:]

    # BUG FIX: loc.file is None for built-ins / command-line buffers.
    # LogMessage.file is Optional for exactly that case, but the old code
    # raised AttributeError on None instead of storing None.
    if loc.file is not None:
        file_name = str(Path(loc.file.name).relative_to(root_dir))
    else:
        file_name = None

    return LogMessage(
        fmt=fmt_str,
        regex=regex,
        regex_types=cleaned_types,
        args=passed_arguments,
        file=file_name,
        line=loc.line,
        column=loc.column,
        macro=macro,
        category=category,
    )
198  
199  
def parse_file(
    filename: str,
    args: list[str],
    root_dir: str,
    index: Optional[ci.Index] = None
) -> list[LogMessage]:
    """
    Parse `filename` into LogMessages, optionally reusing an existing clang
    Index if the caller has one. Returns [] when the file fails to parse.
    """

    if not index:
        index = ci.Index.create()

    try:
        tu = index.parse(
            str(filename),
            args=args,
            # Keep preprocessing detail so macro call sites survive in the TU.
            options=ci.TranslationUnit.PARSE_DETAILED_PROCESSING_RECORD,
        )
    except ci.TranslationUnitLoadError as e:
        # BUG FIX: report which file failed (the message had lost its
        # placeholder), and avoid re-using double quotes inside a
        # double-quoted f-string -- that is a SyntaxError before Python 3.12.
        print(f"Error parsing {filename}: {e}")
        print(f"Using args: {' '.join(args)}")
        return []

    assert isinstance(tu.cursor, ci.Cursor)

    return parse_tokens(tu, root_dir)
228  
229  
class LogCompiler:
    """Drives extraction over a CMake project.

    Loads build/compile_commands.json, parses each .cpp with libclang
    (optionally in parallel worker processes), and dumps the collected
    LogMessages as JSON.
    """

    def __init__(self, root_dir):
        self.root_dir = Path(root_dir)
        self.build_dir = self.root_dir / "build"

        compile_commands_path = self.build_dir / "compile_commands.json"
        if not compile_commands_path.is_file():
            # NOTE(review): ctypes.ArgumentError is an odd type for a usage
            # error, but callers may catch it, so the type is kept.
            raise ArgumentError(
                f"Expected to find {compile_commands_path.absolute()} but "
                f"didn't. Check that {self.build_dir.absolute()} exists, you "
                "are building with clang, and that "
                "CMAKE_EXPORT_COMPILE_COMMANDS=ON")

        try:
            # str(): older clang bindings reject Path objects here.
            self.cdb = CompilationDatabase.fromDirectory(str(self.build_dir))
        except CompilationDatabaseError as e:
            # Chain the original error so the root cause isn't lost.
            raise ArgumentError(
                f"Error: something went wrong loading {compile_commands_path}"
            ) from e

        self.log_messages: list[LogMessage] = []

    @staticmethod
    def clean_args(args):
        """Strip a compile_commands argv down to flags libclang can digest.

        Drops argv[0] (the compiler), stops at the '--' separator, skips
        '-c' and its following token, and appends '-fsyntax-only'.
        """
        arglist = list(args)[1:]
        clean = []
        skip_next = False

        for a in arglist:
            # stop parsing at -- {filename.cpp}
            if a == '--':
                break
            elif skip_next:
                skip_next = False
                continue
            # These break everything for some reason
            elif a == '-c':
                skip_next = True
                continue
            clean.append(a)

        # Maybe speeds things up?
        clean.append('-fsyntax-only')
        return clean

    def parse_file(self, filename):
        """Parse one source file and accumulate its LogMessages."""
        # BUG FIX: these messages had lost their {filename} placeholders
        # (the first was an f-string with no placeholder at all).
        print(f"Parsing {filename}...")
        filename = Path(filename)
        cmds = self.cdb.getCompileCommands(filename)
        if cmds is None or len(cmds) <= 0:
            raise ArgumentError(f"{filename} not found in compilation database!")

        # We only want the first one if multiple exist.
        args = LogCompiler.clean_args(cmds[0].arguments)

        index = ci.Index.create()
        self.log_messages.extend(
            parse_file(str(filename), args, str(self.root_dir), index))

    def parse_all(self):
        """Parse every unique, non-excluded .cpp in the database in parallel."""
        exclude_dirs = [
            # No debug.log messages live in these places!
            self.root_dir / "src" / "bench",
            self.root_dir / "src" / "test",
            self.root_dir / "src" / "ipc" / "test",
            self.root_dir / "src" / "qt" / "test",
            self.root_dir / "src" / "wallet" / "test",
            # autogenerated stuff
            self.build_dir,
        ]

        exclude_dirs = [p.resolve() for p in exclude_dirs]

        all_cmds = self.cdb.getAllCompileCommands()
        if all_cmds is None:
            raise AssertionError

        seen: set[Path] = set()
        tasks = []

        for cmd in all_cmds:
            src_path = Path(cmd.filename).resolve()

            # The database can list a file once per target; parse it once.
            if src_path in seen:
                continue

            seen.add(src_path)
            if any(src_path.is_relative_to(d) for d in exclude_dirs):
                continue
            if src_path.suffix == ".cpp":
                args = self.clean_args(cmd.arguments)
                tasks.append((src_path, args, str(self.root_dir)))

        # BUG FIX: these are worker *processes*, not threads.
        print(f"Parsing {len(tasks)} files using {os.cpu_count()} processes...")

        workers = os.cpu_count()
        with ProcessPoolExecutor(
            max_workers=workers,
            initializer=worker_init
        ) as executor:
            # Submit all tasks
            futures = {
                executor.submit(worker_entrypoint, f, a, r): f
                for f, a, r in tasks
            }

            # Watch for completion
            for i, future in enumerate(as_completed(futures)):
                try:
                    self.log_messages.extend(future.result())
                    # BUG FIX: i is 0-based, so progress topped out at N-1/N.
                    print(f"Progress: {i + 1}/{len(tasks)}", end='\r')
                except Exception as e:
                    print(f"\nTask failed: {e}")
                    traceback.print_exc()

    def dump_db(self, out_file):
        """Serialise the collected LogMessages to `out_file` as JSON."""
        serialisable = [asdict(m) for m in self.log_messages]

        # Explicit utf-8 keeps the dump platform-independent.
        with open(out_file, 'w', encoding='utf-8') as f:
            json.dump(serialisable, f, indent=2)
348  
349  
if __name__ == "__main__":
    argc = len(sys.argv)
    # Two modes: whole-tree (src_dir + out) or single file (src_dir + file + out).
    if argc not in (3, 4):
        print(
            "Usage!!! logextractor.py {src_dir} {out_path}\n"
            "Or!!! logextractor.py {src_dir} {file} {out_path}\n"
            )
        sys.exit(-1)

    compiler = LogCompiler(sys.argv[1])
    if argc == 4:
        compiler.parse_file(sys.argv[2])
    else:
        compiler.parse_all()
    compiler.dump_db(sys.argv[-1])