file_utils.py
# Copyright (c) 2024-2026 Tencent Zhuque Lab. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Requirement: Any integration or derivative work must explicitly attribute
# Tencent Zhuque Lab (https://github.com/Tencent/AI-Infra-Guard) in its
# documentation or user interface, as detailed in the NOTICE file.

"""
File utility module - text/binary detection, diff generation, directory tree
rendering, and file read/write wrappers.
"""
import os
import difflib
from pathlib import Path
from typing import Optional, List, Tuple, Generator

from utils.path_utils import should_ignore_path, IGNORE_DIRECTORIES


# Common binary file extensions.
# NOTE: '.svg' is listed here, so SVG files are reported as binary by
# is_binary_by_extension() even though is_image_file() does not list it.
BINARY_EXTENSIONS = {
    '.zip', '.tar', '.gz', '.tgz', '.bz2', '.xz', '.7z', '.rar',
    '.exe', '.dll', '.so', '.dylib', '.a', '.lib', '.o', '.obj',
    '.class', '.jar', '.war', '.ear',
    '.pdf', '.doc', '.docx', '.xls', '.xlsx', '.ppt', '.pptx',
    '.odt', '.ods', '.odp',
    '.png', '.jpg', '.jpeg', '.gif', '.bmp', '.ico', '.webp', '.svg',
    '.mp3', '.mp4', '.avi', '.mkv', '.mov', '.wmv', '.flv',
    '.wasm', '.pyc', '.pyo',
    '.bin', '.dat', '.db', '.sqlite', '.sqlite3',
    '.ttf', '.otf', '.woff', '.woff2', '.eot',
}

# Extensions recognised by is_image_file().
IMAGE_EXTENSIONS = {'.png', '.jpg', '.jpeg', '.gif', '.bmp', '.webp', '.ico'}

# Marker emitted after a diff line whose source line had no line terminator.
_NO_NEWLINE_MARKER = "\\ No newline at end of file\n"


def is_binary_by_extension(path: str) -> bool:
    """Return True if the (case-insensitive) extension of *path* is in BINARY_EXTENSIONS."""
    _, ext = os.path.splitext(path)
    return ext.lower() in BINARY_EXTENSIONS


def is_binary_by_content(path: str, check_bytes: int = 4096) -> bool:
    """
    Decide whether a file is binary by inspecting its leading bytes.

    Args:
        path: File path.
        check_bytes: Number of leading bytes to inspect.

    Returns:
        True if the sample contains a NUL byte or if more than 30% of the
        sampled bytes are control characters; False for an empty file.
        Any error while opening or reading the file also yields True.
    """
    try:
        with open(path, 'rb') as f:
            chunk = f.read(check_bytes)

        if not chunk:
            return False

        # A NUL byte is treated as conclusive evidence of binary content.
        if b'\x00' in chunk:
            return True

        # Count control bytes, excluding 9..13 (tab, LF, VT, FF, CR).
        non_printable = sum(1 for byte in chunk if byte < 9 or 13 < byte < 32)
        return non_printable / len(chunk) > 0.3

    except Exception:
        # Deliberate best-effort: a file that cannot be read is reported as binary.
        return True


def is_binary_file(path: str) -> bool:
    """Return True if *path* is binary by extension or, failing that, by content."""
    return is_binary_by_extension(path) or is_binary_by_content(path)


def is_image_file(path: str) -> bool:
    """Return True if the (case-insensitive) extension of *path* is in IMAGE_EXTENSIONS."""
    _, ext = os.path.splitext(path)
    return ext.lower() in IMAGE_EXTENSIONS


def is_pdf_file(path: str) -> bool:
    """Return True if *path* has a '.pdf' extension (case-insensitive)."""
    _, ext = os.path.splitext(path)
    return ext.lower() == '.pdf'


def generate_unified_diff(
    old_content: str,
    new_content: str,
    filepath: str = "",
    context_lines: int = 3
) -> str:
    """
    Generate a unified diff between two strings.

    Args:
        old_content: Original content.
        new_content: New content.
        filepath: File path used in the 'a/...' and 'b/...' headers;
            'file' is used when empty.
        context_lines: Number of context lines around each change.

    Returns:
        The diff as a single string (empty when the contents are equal).
        A diff line whose source line had no line terminator is followed by
        a '\\ No newline at end of file' marker line.
    """
    label = filepath if filepath else "file"
    diff = difflib.unified_diff(
        old_content.splitlines(keepends=True),
        new_content.splitlines(keepends=True),
        fromfile=f"a/{label}",
        tofile=f"b/{label}",
        n=context_lines
    )

    pieces = []
    for line in diff:
        pieces.append(line)
        # splitlines() returns the line unchanged only when it carries no
        # line terminator. Without this, the next diff line would be glued
        # onto the current one (e.g. "-a+b").
        if line.splitlines() == [line]:
            pieces.append('\n' + _NO_NEWLINE_MARKER)

    return ''.join(pieces)


def _is_diff_body_line(line: str) -> bool:
    """Return True for '+', '-' or ' ' prefixed lines that do not start with '---' or '+++'."""
    return line.startswith(('+', '-', ' ')) and not line.startswith(('---', '+++'))


def trim_diff_indentation(diff: str) -> str:
    """
    Strip the common leading indentation from diff body lines.

    Only lines prefixed with '+', '-' or ' ' (and not starting with '---' or
    '+++') are considered; every other line is passed through unchanged.

    Args:
        diff: Diff text.

    Returns:
        The diff with the smallest indentation found on any non-blank body
        line removed from every body line, or the input unchanged when there
        is nothing to trim.
    """
    lines = diff.split('\n')

    # Indentation width of every non-blank body line (prefix char excluded).
    indents = []
    for line in lines:
        if not _is_diff_body_line(line):
            continue
        content = line[1:]
        if content.strip():
            indents.append(len(content) - len(content.lstrip()))

    min_indent = min(indents, default=0)
    if min_indent == 0:
        return diff

    # Keep the prefix character and drop min_indent characters after it.
    return '\n'.join(
        line[0] + line[1 + min_indent:] if _is_diff_body_line(line) else line
        for line in lines
    )


def render_directory_tree(
    root_path: str,
    max_depth: int = -1,
    max_files: int = 100,
    ignore_patterns: Optional[set] = None
) -> Tuple[str, int, bool]:
    """
    Render a directory tree as text.

    Entries whose name is in IGNORE_DIRECTORIES or *ignore_patterns*, or whose
    name starts with '.', are skipped. Within each directory, sub-directories
    are listed before files, each group sorted by name.

    Args:
        root_path: Root directory path.
        max_depth: Maximum depth (-1 means unlimited).
        max_files: Maximum number of files to list.
        ignore_patterns: Extra entry names to skip.

    Returns:
        (tree string, number of files listed, whether output was truncated)
    """
    ignore = set(IGNORE_DIRECTORIES)
    if ignore_patterns:
        ignore.update(ignore_patterns)

    file_count = 0
    truncated = False

    # Normalise first so a trailing separator ("foo/") does not produce an
    # empty basename and a doubled slash in the root label.
    normalized_root = os.path.normpath(root_path)
    root_label = os.path.basename(normalized_root) or normalized_root
    if not root_label.endswith(('/', os.sep)):
        root_label += '/'
    lines = [root_label]

    def walk(path: str, prefix: str = "", depth: int = 0):
        nonlocal file_count, truncated

        if max_depth >= 0 and depth > max_depth:
            return

        if file_count >= max_files:
            truncated = True
            return

        try:
            entries = sorted(os.listdir(path))
        except PermissionError:
            # Unreadable directories are shown by name only.
            return

        # Split into directories and files, checking each entry once.
        dirs = []
        files = []
        for entry in entries:
            if entry in ignore or entry.startswith('.'):
                continue
            if os.path.isdir(os.path.join(path, entry)):
                dirs.append(entry)
            else:
                files.append(entry)

        ordered = [(name, True) for name in dirs] + [(name, False) for name in files]
        last_index = len(ordered) - 1

        for index, (name, is_dir) in enumerate(ordered):
            if file_count >= max_files:
                truncated = True
                break

            is_last = index == last_index
            connector = "└── " if is_last else "├── "

            if is_dir:
                lines.append(f"{prefix}{connector}{name}/")
                # The child prefix is as wide as the connector (4 columns)
                # so nested entries line up under their parent.
                extension = "    " if is_last else "│   "
                walk(os.path.join(path, name), prefix + extension, depth + 1)
            else:
                lines.append(f"{prefix}{connector}{name}")
                file_count += 1

    walk(root_path)

    return '\n'.join(lines), file_count, truncated


def read_file_with_lines(
    path: str,
    offset: int = 0,
    limit: int = 2000,
    max_bytes: int = 50 * 1024,
    max_line_length: int = 2000
) -> Tuple[List[str], int, bool, bool]:
    """
    Read a window of lines from a UTF-8 text file, prefixed with line numbers.

    Args:
        path: File path.
        offset: First line to return (0-based); negative values are treated as 0.
        limit: Maximum number of lines to return.
        max_bytes: Byte budget for the returned lines (UTF-8 size + 1 per line).
        max_line_length: Lines longer than this are cut and suffixed with "...".

    Returns:
        (numbered lines, total line count, whether more lines remain,
        whether output stopped because of the byte budget)

    Raises:
        OSError: If the file cannot be opened.
        UnicodeDecodeError: If the file is not valid UTF-8.
    """
    # A negative offset would otherwise index from the end of the file and
    # produce line numbers <= 0.
    start = max(offset, 0)
    end = start + max(limit, 0)

    result = []
    total_lines = 0
    bytes_count = 0
    truncated_by_bytes = False

    # Stream the file: every line is counted, but only the requested window
    # is kept in memory.
    with open(path, 'r', encoding='utf-8') as f:
        for index, raw_line in enumerate(f):
            total_lines = index + 1

            if truncated_by_bytes or not (start <= index < end):
                continue

            line = raw_line.rstrip('\n\r')

            # Truncate over-long lines.
            if len(line) > max_line_length:
                line = line[:max_line_length] + "..."

            line_size = len(line.encode('utf-8')) + 1

            if bytes_count + line_size > max_bytes:
                truncated_by_bytes = True
                continue

            # 1-based line number, zero-padded to five digits.
            line_num = str(index + 1).zfill(5)
            result.append(f"{line_num}| {line}")
            bytes_count += line_size

    has_more = (start + len(result)) < total_lines

    return result, total_lines, has_more, truncated_by_bytes


def safe_write_file(path: str, content: str, create_dirs: bool = True) -> None:
    """
    Write text to a file as UTF-8, optionally creating the parent directory.

    Args:
        path: File path.
        content: Text to write; any existing file content is replaced.
        create_dirs: Whether to create missing parent directories.

    Raises:
        OSError: If the directory cannot be created or the file cannot be written.
    """
    if create_dirs:
        parent = os.path.dirname(path)
        if parent and not os.path.exists(parent):
            os.makedirs(parent, exist_ok=True)

    with open(path, 'w', encoding='utf-8') as f:
        f.write(content)


def safe_read_file(path: str) -> Tuple[Optional[str], Optional[str]]:
    """
    Read a UTF-8 text file, reporting failures as a message instead of raising.

    Args:
        path: File path.

    Returns:
        (content, None) on success, or (None, error_message) on failure.
    """
    try:
        with open(path, 'r', encoding='utf-8') as f:
            return f.read(), None
    except FileNotFoundError:
        return None, f"File not found: {path}"
    except PermissionError:
        return None, f"Permission denied: {path}"
    except UnicodeDecodeError:
        return None, f"Failed to decode file (binary?): {path}"
    except Exception as e:
        # Catch-all so callers always get a (None, message) pair.
        return None, f"Error reading file: {e}"