exporter_xlsx.py
1 import os 2 import time 3 import traceback 4 5 from wxManager import Me, MessageType 6 from wxManager.decrypt.decrypt_dat import batch_decode_image_multiprocessing 7 from wxManager.log import logger 8 from wxManager.model import Message 9 from exporter.exporter import ExporterBase, copy_files, decode_audios, get_new_filename 10 11 from PIL import JpegImagePlugin 12 from PIL import ImageFile 13 14 from PIL import Image as PILImage 15 16 from wxManager.parser.link_parser import wx_sport, wx_collection_data, wx_pay_data 17 18 JpegImagePlugin._getmp = lambda x: None 19 ImageFile.LOAD_TRUNCATED_IMAGES = True 20 21 22 def add_hyperlink(doc, row, column, hyperlink): 23 from openpyxl.styles import Font 24 import openpyxl 25 from openpyxl.drawing.image import Image 26 from openpyxl.utils import get_column_letter 27 Image.MAX_IMAGE_PIXELS = None 28 cell = doc.cell(row=row, column=column) 29 cell.hyperlink = hyperlink 30 # 添加样式来改变超链接文本的颜色和下划线 31 font = Font(color="0000FF", underline="single") # 蓝色和单下划线 32 cell.font = font 33 34 35 def find_image_with_known_extensions(img_path): 36 # 常见的图片后缀名 37 extensions = ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.webp'] 38 directory = os.path.dirname(img_path) 39 filename = os.path.basename(img_path) 40 41 for ext in extensions: 42 # 构造完整路径 43 full_path = os.path.join(directory, f"{filename}{ext}") 44 # 检查文件是否存在 45 if os.path.isfile(full_path): 46 return full_path 47 48 return None 49 50 51 class ExcelExporter(ExporterBase): 52 row = 2 53 54 def add_member_info(self, sheet): 55 if self.contact.is_chatroom(): 56 columns = ['wxid', '微信号', '类型', '群昵称', '昵称', '头像地址', 57 '头像原图', '标签', '性别', '个性签名', '国家(地区)', '省份', '城市'] 58 self.group_contacts = self.database.get_chatroom_members(self.contact.wxid) 59 # 写入CSV文件 60 sheet.append(columns) 61 for wxid, contact in self.group_contacts.items(): 62 sheet.append( 63 [ 64 contact.wxid, contact.alias, contact.flag, contact.remark, contact.nickname, 65 contact.small_head_img_url, contact.big_head_img_url, contact.label_name(), 66 contact.gender, contact.signature, *contact.region 67 ] 68 ) 69 else: 70 if self.contact.is_public(): 71 pass 72 else: 73 columns = ( 74 'wxid', '微信号', '类型', '群昵称', '昵称', '头像地址', '头像原图', '标签', '性别', '电话', 75 '个性签名', '国家(地区)', '省份', '城市') 76 # 写入CSV文件 77 sheet.append(columns) 78 contact = self.contact 79 sheet.append( 80 [ 81 contact.wxid, contact.alias, contact.flag, contact.remark, contact.nickname, 82 contact.small_head_img_url, contact.big_head_img_url, contact.label_name(), 83 contact.gender, contact.signature, *contact.region 84 ] 85 ) 86 87 def message_to_list(self, message: Message): 88 remark = message.display_name 89 nickname = message.display_name 90 if self.contact.is_chatroom(): 91 contact = self.group_contacts.get(message.sender_id) 92 if contact: 93 remark = contact.remark 94 nickname = contact.nickname 95 else: 96 contact = Me() if message.is_sender else self.contact 97 remark = contact.remark 98 nickname = contact.nickname 99 res = [str(message.server_id), message.type_name(), message.display_name, message.str_time, message.to_text(), 100 remark, nickname, 'more'] 101 return res 102 103 def to_excel(self): 104 from openpyxl.styles import Font 105 import openpyxl 106 from openpyxl.drawing.image import Image 107 from openpyxl.utils import get_column_letter 108 Image.MAX_IMAGE_PIXELS = None 109 print(f"【开始导出 XLSX {self.contact.remark}】") 110 os.makedirs(self.origin_path, exist_ok=True) 111 filename = os.path.join(self.origin_path, f"{self.contact.remark}.xlsx") 112 filename = get_new_filename(filename) 113 columns = ['消息ID', '类型', '发送人', '时间', '内容', '备注', '昵称', '更多信息'] 114 messages = self.database.get_messages(self.contact.wxid, time_range=self.time_range) 115 new_workbook = openpyxl.Workbook() 116 new_sheet = new_workbook.create_sheet("聊天记录", 0) 117 member_sheet = new_workbook.create_sheet("成员信息", 1) 118 self.add_member_info(member_sheet) 119 new_sheet.append(columns) 120 num = 1 121 total_num = len(messages) 122 image_tasks = [] 123 video_tasks = [] 124 file_tasks = [] 125 audio_tasks = [] 126 image_dir = os.path.join(self.origin_path, 'image') 127 video_dir = os.path.join(self.origin_path, 'video') 128 audio_dir = os.path.join(self.origin_path, 'voice') 129 file_dir = os.path.join(self.origin_path, 'file') 130 image_index = {} 131 132 def parser_merged(merged_message): 133 for msg in merged_message.messages: 134 type_ = msg.type 135 if type_ == MessageType.Image: 136 msg.set_file_name() 137 image_tasks.append( 138 ( 139 os.path.join(Me().wx_dir, msg.path), 140 os.path.join(image_dir, msg.str_time[:7]), 141 msg.file_name 142 ) 143 ) 144 image_tasks.append( 145 ( 146 os.path.join(Me().wx_dir, msg.thumb_path), 147 os.path.join(image_dir, msg.str_time[:7]), 148 msg.file_name + '_t' 149 ) 150 ) 151 msg.path = f"./image/{msg.str_time[:7]}/{msg.file_name}" 152 msg.thumb_path = f"./image/{msg.str_time[:7]}/{msg.file_name + '_t'}" 153 elif type_ == MessageType.File: 154 origin_file_path = os.path.join(Me().wx_dir, msg.path) 155 file_tasks.append( 156 ( 157 origin_file_path, 158 os.path.join(file_dir, msg.str_time[:7]), 159 '' 160 ) 161 ) 162 msg.path = f'./file/{msg.str_time[:7]}/{os.path.basename(origin_file_path)}' 163 elif type_ == MessageType.Video: 164 msg.set_file_name() 165 video_tasks.append( 166 ( 167 os.path.join(Me().wx_dir, msg.path), 168 os.path.join(video_dir, msg.str_time[:7]), 169 msg.file_name 170 ) 171 ) 172 ext = os.path.basename(msg.path).split('.')[-1] 173 msg.path = f'./video/{msg.str_time[:7]}/{msg.file_name}.{ext}' 174 elif type_ == MessageType.MergedMessages: 175 parser_merged(msg) 176 177 for index, message in enumerate(messages): 178 if not self._is_running: 179 break 180 if index % 1000 == 0: 181 self.update_progress_callback(index / total_num) 182 if not self.is_selected(message): 183 continue 184 try: 185 new_sheet.append(self.message_to_list(message)) 186 self.row += 1 187 except: 188 logger.error(traceback.format_exc()) 189 continue 190 type_ = message.type 191 if type_ == MessageType.Image: 192 message.set_file_name() 193 image_index[message.server_id] = self.row 194 image_tasks.append( 195 ( 196 os.path.join(Me().wx_dir, message.path), 197 os.path.join(image_dir, message.str_time[:7]), 198 message.file_name 199 ) 200 ) 201 image_tasks.append( 202 ( 203 os.path.join(Me().wx_dir, message.thumb_path), 204 os.path.join(image_dir, message.str_time[:7]), 205 message.file_name + '_t' 206 ) 207 ) 208 message.path = f"./image/{message.str_time[:7]}/{message.file_name}" 209 message.thumb_path = f"./image/{message.str_time[:7]}/{message.file_name + '_t'}" 210 elif type_ == MessageType.File: 211 origin_file_path = os.path.join(Me().wx_dir, message.path) 212 file_tasks.append( 213 ( 214 origin_file_path, 215 os.path.join(file_dir, message.str_time[:7]), 216 '' 217 ) 218 ) 219 if os.path.isfile(origin_file_path): 220 message.path = f'./file/{message.str_time[:7]}/{os.path.basename(origin_file_path)}' 221 add_hyperlink(new_sheet, self.row, 5, message.path) 222 elif type_ == MessageType.Video: 223 message.set_file_name() 224 video_tasks.append( 225 ( 226 os.path.join(Me().wx_dir, message.path), 227 os.path.join(video_dir, message.str_time[:7]), 228 message.file_name 229 ) 230 ) 231 ext = os.path.basename(message.path).split('.')[-1] 232 message.path = f'./video/{message.str_time[:7]}/{message.file_name}.{ext}' 233 add_hyperlink(new_sheet, self.row, 5, message.path) 234 elif type_ == MessageType.Audio: 235 message.set_file_name() 236 audio_tasks.append( 237 ( 238 self.database.get_media_buffer(message.server_id), 239 os.path.join(audio_dir, message.str_time[:7]), 240 message.file_name 241 ) 242 ) 243 message.path = f'./voice/{message.str_time[:7]}/{message.file_name + ".mp3"}' 244 add_hyperlink(new_sheet, self.row, 5, message.path) 245 elif type_ == MessageType.MergedMessages: 246 parser_merged(message) 247 # 使用多进程,导出所有图片 248 batch_decode_image_multiprocessing(Me().xor_key, image_tasks) 249 250 # 使用多线程,复制文件、视频到导出文件夹 251 copy_files(video_tasks + file_tasks) 252 253 decode_audios(audio_tasks) 254 if MessageType.Image in self.message_types: 255 for index, message in enumerate(messages): 256 if message.type == MessageType.Image: 257 if not self.is_selected(message): 258 continue 259 row = image_index[message.server_id] 260 img_path = find_image_with_known_extensions(os.path.join(self.origin_path, message.path)) 261 if not img_path: 262 img_path = find_image_with_known_extensions(os.path.join(self.origin_path, message.thumb_path)) 263 if not img_path: 264 continue 265 try: 266 # 打开图片以获取其尺寸 267 with PILImage.open(img_path) as img: 268 width, height = img.size 269 max_height = 500 270 # 计算缩放比例 271 scale = min(1.0, max_height / height) 272 273 # 缩放后的图片尺寸 274 scaled_width = int(width * scale) 275 scaled_height = int(height * scale) 276 277 # 插入图片 278 img = Image(img_path) 279 img.width = scaled_width 280 img.height = scaled_height 281 282 # 计算单元格的坐标 283 cell = f"{get_column_letter(5)}{row}" 284 285 # 将图片添加到工作表 286 new_sheet.add_image(img, cell) 287 288 # 设置行高 289 new_sheet.row_dimensions[row].height = scaled_height * 0.75 # 0.75 是像素到 Excel 单位的转换因子 290 except: 291 logger.error(traceback.format_exc()) 292 pass 293 # 获取列的字母表示(A、B、C...) 294 col_letter = get_column_letter(1) 295 # 设置整列的单元格格式为文本 296 for cell in new_sheet[col_letter]: 297 cell.number_format = "@" # "@" 表示文本格式 298 try: 299 new_workbook.save(filename) 300 except PermissionError: 301 filename = '.'.join(filename.split('.')[:-1]) + str(int(time.time())) + '.xlsx' 302 new_workbook.save(filename) 303 self.update_progress_callback(1) 304 self.finish_callback(self.exporter_id) 305 print(f"【完成导出 XLSX {self.contact.remark}】") 306 307 def public_to_excel(self): 308 from openpyxl.styles import Font 309 import openpyxl 310 from openpyxl.drawing.image import Image 311 from openpyxl.utils import get_column_letter 312 Image.MAX_IMAGE_PIXELS = None 313 314 print(f"【开始导出 XLSX {self.contact.remark}】") 315 os.makedirs(self.origin_path, exist_ok=True) 316 filename = os.path.join(self.origin_path, f"{self.contact.remark}.xlsx") 317 filename = get_new_filename(filename) 318 columns = ['日期', '时间', '标题', '描述', '链接', '更多信息'] 319 messages = self.database.get_messages(self.contact.wxid, time_range=self.time_range) 320 new_workbook = openpyxl.Workbook() 321 new_sheet = new_workbook.create_sheet("聊天记录", 0) 322 new_sheet.append(columns) 323 total_num = len(messages) 324 for index, message in enumerate(messages): 325 if not self._is_running: 326 break 327 if index % 1000 == 0: 328 self.update_progress_callback(index / total_num) 329 if not message.type in {MessageType.LinkMessage}: 330 continue 331 try: 332 new_sheet.append([*message.str_time.split(' '), message.title, message.description, message.href]) 333 except: 334 logger.error(traceback.format_exc()) 335 continue 336 # 获取列的字母表示(A、B、C...) 337 col_letter = get_column_letter(1) 338 # 设置整列的单元格格式为文本 339 for cell in new_sheet[col_letter]: 340 cell.number_format = "@" # "@" 表示文本格式 341 try: 342 new_workbook.save(filename) 343 except PermissionError: 344 filename = '.'.join(filename.split('.')[:-1]) + str(int(time.time())) + '.xlsx' 345 new_workbook.save(filename) 346 self.update_progress_callback(1) 347 self.finish_callback(self.exporter_id) 348 print(f"【完成导出 XLSX {self.contact.remark}】") 349 350 def wx_pay(self): 351 from openpyxl.styles import Font 352 import openpyxl 353 from openpyxl.drawing.image import Image 354 from openpyxl.utils import get_column_letter 355 Image.MAX_IMAGE_PIXELS = None 356 print(f"【开始导出 XLSX {self.contact.remark}】") 357 os.makedirs(self.origin_path, exist_ok=True) 358 filename = os.path.join(self.origin_path, f"{self.contact.remark}.xlsx") 359 filename = get_new_filename(filename) 360 columns = ['类型', '收款单位', '日期', '时间', '金额', '付款方式', '收单机构', '更多信息'] 361 messages = self.database.get_messages(self.contact.wxid, time_range=self.time_range) 362 new_workbook = openpyxl.Workbook() 363 new_sheet = new_workbook.create_sheet("聊天记录", 0) 364 new_sheet.append(columns) 365 total_num = len(messages) 366 for index, message in enumerate(messages): 367 if not self._is_running: 368 break 369 if index % 1000 == 0: 370 self.update_progress_callback(index / total_num) 371 if not message.type in {MessageType.LinkMessage}: 372 continue 373 try: 374 card_data = wx_pay_data(message.xml_content) 375 date, str_time = message.str_time.split(' ') 376 if card_data.get('title') in {'记账日报', '「先享后付」服务使用通知', '转入零钱通,五一享收益', 377 '转入零钱通,端午享收益', '智能手表支付服务已启用', '优惠券领取提醒', 378 '清明假期收益规则', '「先享后付」服务完成通知', '礼包领取提醒', 379 '五一假期收益规则提醒', '端午节假期收益规则', '中秋节假期收益规则', 380 '元旦假期收益规则', '春节假期收益规则', '五一假期收益规则', 381 '中秋及国庆假期收益规则', '春节赚收益攻略', '「先享后付」服务取消通知', 382 '揭开骗局,远离诈骗'}: 383 continue 384 new_sheet.append( 385 [ 386 card_data.get('title'), card_data.get('display_name'), date, str_time, 387 card_data.get('money'), card_data.get('payment_type'), card_data.get('acquiring_institution'), 388 card_data.get('more') 389 ] 390 ) 391 except: 392 logger.error(traceback.format_exc()) 393 continue 394 # 获取列的字母表示(A、B、C...) 395 col_letter = get_column_letter(1) 396 # 设置整列的单元格格式为文本 397 for cell in new_sheet[col_letter]: 398 cell.number_format = "@" # "@" 表示文本格式 399 try: 400 new_workbook.save(filename) 401 except PermissionError: 402 filename = '.'.join(filename.split('.')[:-1]) + str(int(time.time())) + '.xlsx' 403 new_workbook.save(filename) 404 self.update_progress_callback(1) 405 self.finish_callback(self.exporter_id) 406 print(f"【完成导出 XLSX {self.contact.remark}】") 407 408 def wx_collect(self): 409 from openpyxl.styles import Font 410 import openpyxl 411 from openpyxl.drawing.image import Image 412 from openpyxl.utils import get_column_letter 413 Image.MAX_IMAGE_PIXELS = None 414 415 print(f"【开始导出 XLSX {self.contact.remark}】") 416 os.makedirs(self.origin_path, exist_ok=True) 417 filename = os.path.join(self.origin_path, f"{self.contact.remark}.xlsx") 418 filename = get_new_filename(filename) 419 columns = ['类型', '日期', '时间', '金额', '详细信息', '汇总', '备注', '更多信息'] 420 messages = self.database.get_messages(self.contact.wxid, time_range=self.time_range) 421 new_workbook = openpyxl.Workbook() 422 new_sheet = new_workbook.create_sheet("聊天记录", 0) 423 new_sheet.append(columns) 424 total_num = len(messages) 425 for index, message in enumerate(messages): 426 if not self._is_running: 427 break 428 if index % 1000 == 0: 429 self.update_progress_callback(index / total_num) 430 if not message.type in {MessageType.LinkMessage}: 431 continue 432 try: 433 card_data = wx_collection_data(message.xml_content) 434 date, str_time = message.str_time.split(' ') 435 new_sheet.append( 436 [ 437 card_data.get('title'), date, str_time, card_data.get('money'), card_data.get('display_name'), 438 card_data.get('summary'), card_data.get('more') 439 ] 440 ) 441 except: 442 logger.error(traceback.format_exc()) 443 continue 444 # 获取列的字母表示(A、B、C...) 445 col_letter = get_column_letter(1) 446 # 设置整列的单元格格式为文本 447 for cell in new_sheet[col_letter]: 448 cell.number_format = "@" # "@" 表示文本格式 449 try: 450 new_workbook.save(filename) 451 except PermissionError: 452 filename = '.'.join(filename.split('.')[:-1]) + str(int(time.time())) + '.xlsx' 453 new_workbook.save(filename) 454 self.update_progress_callback(1) 455 self.finish_callback(self.exporter_id) 456 print(f"【完成导出 XLSX {self.contact.remark}】") 457 458 def wx_sport(self): 459 from openpyxl.styles import Font 460 import openpyxl 461 from openpyxl.drawing.image import Image 462 from openpyxl.utils import get_column_letter 463 Image.MAX_IMAGE_PIXELS = None 464 465 466 print(f"【开始导出 XLSX {self.contact.remark}】") 467 os.makedirs(self.origin_path, exist_ok=True) 468 filename = os.path.join(self.origin_path, f"{self.contact.remark}.xlsx") 469 filename = get_new_filename(filename) 470 columns = ['日期', '排名', '步数', '当日冠军', '当日冠军步数', '更多信息'] 471 messages = self.database.get_messages(self.contact.wxid, time_range=self.time_range) 472 new_workbook = openpyxl.Workbook() 473 new_sheet = new_workbook.create_sheet("聊天记录", 0) 474 new_sheet.append(columns) 475 total_num = len(messages) 476 for index, message in enumerate(messages): 477 if not self._is_running: 478 break 479 if index and index % 1000 == 0: 480 self.update_progress_callback(index / total_num) 481 if not message.type in {MessageType.LinkMessage}: 482 continue 483 try: 484 card_data = wx_sport(message.xml_content) 485 champion_name = '' 486 if not card_data.get('rank_list'): 487 champion = {} 488 else: 489 champion = card_data.get('rank_list')[0] 490 contact = self.database.get_contact_by_username(champion.get('username')) 491 champion_name = contact.remark 492 new_sheet.append( 493 [ 494 message.str_time.split(' ')[0], card_data.get('rank'), card_data.get('score'), 495 champion_name, champion.get('score') 496 ] 497 ) 498 except: 499 logger.error(traceback.format_exc()) 500 continue 501 # 获取列的字母表示(A、B、C...) 502 col_letter = get_column_letter(1) 503 # 设置整列的单元格格式为文本 504 for cell in new_sheet[col_letter]: 505 cell.number_format = "@" # "@" 表示文本格式 506 try: 507 new_workbook.save(filename) 508 except PermissionError: 509 filename = '.'.join(filename.split('.')[:-1]) + str(int(time.time())) + '.xlsx' 510 new_workbook.save(filename) 511 self.update_progress_callback(1) 512 self.finish_callback(self.exporter_id) 513 print(f"【完成导出 XLSX {self.contact.remark}】") 514 515 def run(self): 516 if self.contact.is_public(): 517 if self.contact.wxid == 'gh_3dfda90e39d6': 518 self.wx_pay() 519 elif self.contact.wxid == 'gh_f0a92aa7146c': 520 self.wx_collect() 521 elif self.contact.wxid == 'gh_43f2581f6fd6': 522 self.wx_sport() 523 else: 524 self.public_to_excel() 525 else: 526 self.to_excel()