/ logextractor.py
logextractor.py
1 import json 2 import os 3 import sys 4 import traceback 5 6 from concurrent.futures import ProcessPoolExecutor, as_completed 7 from ctypes import ArgumentError 8 from dataclasses import asdict, dataclass 9 from pathlib import Path 10 from typing import Optional 11 12 from scanf import scanf_compile 13 14 import clang.cindex as ci 15 from clang.cindex import CompilationDatabase, CompilationDatabaseError, TranslationUnit 16 17 18 def print_ast(node: ci.Cursor, indent=0): 19 prefix = " " * indent 20 print( 21 f"{prefix}{node.kind} : '{node.spelling}' " 22 f"(Ref: '{node.referenced.spelling if node.referenced else 'None'}')") 23 24 # Recurse 25 for child in node.get_children(): 26 print_ast(child, indent + 2) 27 28 29 def fmt_to_regex(fmt_str): 30 """ 31 Converts a printf/scanf string to a Python Regex. 32 Returns the regex string and a list of inferred types, just a wrapper 33 for the python scanf package's scanf_compile. 34 """ 35 36 return scanf_compile(fmt_str, collapseWhitespace=False) 37 38 39 # Global that will be initialized once in each process 40 _PROCESS_LOCAL_INDEX = None 41 42 43 def worker_init(): 44 """ 45 Called once per process to init the global per-process index. 46 """ 47 global _PROCESS_LOCAL_INDEX 48 _PROCESS_LOCAL_INDEX = ci.Index.create() 49 50 51 def worker_entrypoint(filename: str, args: list[str], root_dir: str): 52 """ 53 Wrapper that initializes / retrieves the global _PROCESS_LOCAL_INDEX and 54 invokes parse_file. 55 """ 56 global _PROCESS_LOCAL_INDEX 57 if _PROCESS_LOCAL_INDEX is None: 58 _PROCESS_LOCAL_INDEX = ci.Index.create() 59 60 return parse_file(filename, args, root_dir, _PROCESS_LOCAL_INDEX) 61 62 63 # Todo: handle other weird cases! 64 LOG_FUNCS = { 65 'LogDebug', 'LogTrace', 66 'LogPrintf', 67 'LogInfo', 'LogError', 'LogWarning', 68 'LogPrintFormatInternal' 69 } 70 71 72 @dataclass(frozen=True, slots=True) 73 class LogMessage: 74 fmt: str # the format string (without surrounding quotes) 75 regex: str # regex string for the fmt, generated by scanf package 76 regex_types: list[str] # inferred types, because of tfm, don't trust these too much 77 args: list[str] # Arguments passed to the print 78 file: Optional[str] # source file name (None for built‑ins) 79 line: int # line number of the macro call 80 column: int # column number of the macro call 81 macro: str # which Log* macro was used 82 category: Optional[str] # optional category argument 83 84 85 @dataclass 86 class ParseResult: 87 macro: str 88 args: list[str] 89 loc: ci.SourceLocation 90 91 92 # Parse the tokens of a tu into ParseResults, each of which is passed to 93 # process_log to create a LogMessage object from. 94 def parse_tokens(tu: TranslationUnit, root_dir: str) -> list[LogMessage]: 95 results: list[LogMessage] = [] 96 # skip files outside of our dir 97 if not tu.spelling.startswith(root_dir): 98 return [] 99 100 @dataclass 101 class ParseState: 102 in_log: bool = False 103 seen_open_paren: bool = False 104 current_arg: str = "" 105 paren_depth: int = 0 106 107 parse_state = ParseState() 108 parse_result = None 109 110 if tu.cursor or tu.cursor.extent is None: 111 return [] 112 for token in tu.get_tokens(extent=tu.cursor.extent): 113 tk = token.spelling 114 115 if tk in LOG_FUNCS: 116 parse_state.in_log = True 117 parse_result = ParseResult(macro=tk, args=[], loc=token.location) 118 continue 119 120 if not parse_state.in_log: 121 continue 122 123 assert parse_result is not None 124 125 if tk == "(": 126 if not parse_state.seen_open_paren: 127 parse_state.seen_open_paren = True 128 continue 129 parse_state.paren_depth += 1 130 parse_state.current_arg += tk 131 elif tk == "," and parse_state.paren_depth == 0: 132 parse_result.args.append(parse_state.current_arg) 133 parse_state.current_arg = "" 134 elif tk == ")": 135 if parse_state.paren_depth == 0: 136 parse_result.args.append(parse_state.current_arg) 137 results.append(process_log(parse_result, root_dir)) 138 parse_state = ParseState() 139 else: 140 parse_state.paren_depth -= 1 141 parse_state.current_arg += tk 142 else: 143 parse_state.current_arg += tk 144 145 return results 146 147 148 # Turn a ParseResult (macro, args, sourcelocation) into a LogMessage 149 def process_log(parse_result: ParseResult, root_dir: str) -> LogMessage: 150 macro = parse_result.macro 151 args = parse_result.args 152 loc = parse_result.loc 153 154 category: Optional[str] = None 155 156 idx = 0 157 if macro in ("LogDebug", "LogTrace"): 158 # First arg is category 159 category = args[idx] 160 idx += 1 161 else: 162 category = "BCLog::ALL" 163 164 fmt_str = args[idx] 165 if fmt_str.startswith('"') and fmt_str.endswith('"'): 166 fmt_str = fmt_str[1:-1] 167 else: 168 # The format string is not a literal, probably not worth handling this 169 print(f"Format string is not a literal, skipped: `{fmt_str}`") 170 171 # on second thought, store the fmt strings in the text file, 172 # the log parser can compile to regex's at load time? 173 regex, regex_types = fmt_to_regex(fmt_str) 174 regex = regex.pattern 175 cleaned_types = [] 176 for t in regex_types: 177 name = getattr(t, '__name__', str(t)) 178 if name == '<lambda>': 179 cleaned_types.append('str') 180 else: 181 cleaned_types.append(name) 182 183 passed_arguments: list[str] = args[idx+1:] 184 185 file_name = Path(loc.file.name).relative_to(root_dir) 186 187 return LogMessage( 188 fmt=fmt_str, 189 regex=regex, 190 regex_types=cleaned_types, 191 args=passed_arguments, 192 file=str(file_name), 193 line=loc.line, 194 column=loc.column, 195 macro=macro, 196 category=category, 197 ) 198 199 200 def parse_file( 201 filename: str, 202 args: list[str], 203 root_dir: str, 204 index: Optional[ci.Index] = None 205 ) -> list[LogMessage]: 206 """ 207 Parse file into LogMessages, optionally reuses an existing index if the 208 caller has one. 209 """ 210 211 if not index: 212 index = ci.Index.create() 213 214 try: 215 tu = index.parse( 216 str(filename), 217 args=args, 218 options=ci.TranslationUnit.PARSE_DETAILED_PROCESSING_RECORD, 219 ) 220 except ci.TranslationUnitLoadError as e: 221 print(f"Error parsing {filename}: {e}") 222 print(f"Using args: {" ".join(args)}") 223 return [] 224 225 assert isinstance(tu.cursor, ci.Cursor) 226 227 return parse_tokens(tu, root_dir) 228 229 230 class LogCompiler: 231 def __init__(self, root_dir): 232 self.root_dir = Path(root_dir) 233 self.build_dir = self.root_dir / "build" 234 235 compile_commands_path = self.build_dir / "compile_commands.json" 236 if not compile_commands_path.is_file(): 237 raise ArgumentError( 238 f"Expected to find {compile_commands_path.absolute()} but " 239 f"didn't. Check that {self.build_dir.absolute()} exists, you " 240 "are building with clang, and that " 241 "CMAKE_EXPORT_COMPILE_COMMANDS=ON") 242 243 try: 244 self.cdb = CompilationDatabase.fromDirectory(self.build_dir) 245 except CompilationDatabaseError: 246 raise ArgumentError( 247 f"Error: something went wrong loading {compile_commands_path}") 248 249 self.log_messages: list[LogMessage] = [] 250 251 @staticmethod 252 def clean_args(args): 253 arglist = list(args)[1:] 254 clean = [] 255 skip_next = False 256 257 for a in arglist: 258 # stop parsing at -- {filename.cpp} 259 if a == '--': 260 break 261 elif skip_next: 262 skip_next = False 263 continue 264 # These break everything for some reason 265 elif a == '-c': 266 skip_next = True 267 continue 268 clean.append(a) 269 270 # Maybe speeds things up? 271 clean.append('-fsyntax-only') 272 return clean 273 274 def parse_file(self, filename): 275 print(f"Parsing {filename}...") 276 filename = Path(filename) 277 cmds = self.cdb.getCompileCommands(filename) 278 if cmds is None or len(cmds) <= 0: 279 raise ArgumentError(f"{filename} not found in compilation database!") 280 281 # We only want the first one if multiple exist. 282 args = LogCompiler.clean_args(cmds[0].arguments) 283 284 index = ci.Index.create() 285 self.log_messages.extend(parse_file(str(filename), args, str(self.root_dir), index)) 286 287 def parse_all(self): 288 exclude_dirs = [ 289 # No debug.log messages live in these places! 290 self.root_dir / "src" / "bench", 291 self.root_dir / "src" / "test", 292 self.root_dir / "src" / "ipc" / "test", 293 self.root_dir / "src" / "qt" / "test", 294 self.root_dir / "src" / "wallet" / "test", 295 # autogenerated stuff 296 self.build_dir, 297 ] 298 299 exclude_dirs = [p.resolve() for p in exclude_dirs] 300 301 all_cmds = self.cdb.getAllCompileCommands() 302 if all_cmds is None: 303 raise AssertionError 304 305 seen: set[Path] = set() 306 tasks = [] 307 308 for cmd in all_cmds: 309 src_path = Path(cmd.filename).resolve() 310 311 if src_path in seen: 312 continue 313 314 seen.add(src_path) 315 if any(src_path.is_relative_to(d) for d in exclude_dirs): 316 continue 317 if src_path.suffix == ".cpp": 318 args = self.clean_args(cmd.arguments) 319 tasks.append((src_path, args, str(self.root_dir))) 320 321 print(f"Parsing {len(tasks)} files using {os.cpu_count()} threads...") 322 323 workers = os.cpu_count() 324 with ProcessPoolExecutor( 325 max_workers=workers, 326 initializer=worker_init 327 ) as executor: 328 # Submit all tasks 329 futures = { 330 executor.submit(worker_entrypoint, f, a, r): f 331 for f, a, r in tasks 332 } 333 334 # Watch for completion 335 for i, future in enumerate(as_completed(futures)): 336 try: 337 self.log_messages.extend(future.result()) 338 print(f"Progress: {i}/{len(tasks)}", end='\r') 339 except Exception as e: 340 print(f"\nTask failed: {e}") 341 traceback.print_exc() 342 343 def dump_db(self, out_file): 344 serialisable = [asdict(m) for m in self.log_messages] 345 346 with open(out_file, 'w') as f: 347 json.dump(serialisable, f, indent=2) 348 349 350 if __name__ == "__main__": 351 if len(sys.argv) not in [3, 4]: 352 print( 353 "Usage!!! logextractor.py {src_dir} {out_path}\n" 354 "Or!!! logextractor.py {src_dir} {file} {out_path}\n" 355 ) 356 sys.exit(-1) 357 358 compiler = LogCompiler(sys.argv[1]) 359 if len(sys.argv) == 3: 360 compiler.parse_all() 361 elif len(sys.argv) == 4: 362 compiler.parse_file(sys.argv[2]) 363 compiler.dump_db(sys.argv[-1])