# maigret_mods/command_builder.py
"""
Command builder for Blackbird OSINT tool
"""
import os
import tempfile

from .filter_parser import parse_filters_from_file, parse_filters_from_string, validate_filters


def _is_name_match(expr):
    """Return True if *expr* contains a ``name~`` or ``name=`` condition."""
    return 'name~' in expr or 'name=' in expr


def combine_filters(filter_list):
    """Combine multiple filters with smart AND/OR operator selection

    Args:
        filter_list (list): Filter strings. An item starting with ``file:``
            is treated as a path to a filter file; any other item is parsed
            as filter text.

    Returns:
        str: The combined filter expression, or "" when there is nothing
            to combine.
    """
    if not filter_list:
        return ""

    # Expand every item into individual filter expressions
    processed_filters = []
    for filter_item in filter_list:
        if not filter_item.startswith("file:"):
            # Parse direct filter text for comments
            processed_filters.extend(parse_filters_from_string(filter_item))
            continue

        file_path = filter_item[5:]
        if not os.path.exists(file_path):
            # Missing file: keep the reference as-is
            processed_filters.append(filter_item)
            continue

        try:
            # Use the parser to handle comments
            processed_filters.extend(parse_filters_from_file(file_path))
        except Exception as e:
            print(f"❌ Error parsing filter file {file_path}: {e}")
            # If can't parse, use the file reference as-is
            processed_filters.append(filter_item)

    if len(processed_filters) == 1:
        return processed_filters[0]

    # Group filters by type for smart combination
    name_contains_or_equals = []
    name_not_equals = []
    cat_not_equals = []
    cat_equals = []
    other_filters = []

    for filter_expr in processed_filters:
        if _is_name_match(filter_expr):
            name_contains_or_equals.append(filter_expr)
        elif 'name!=' in filter_expr:
            name_not_equals.append(filter_expr)
        elif 'cat!=' in filter_expr:
            cat_not_equals.append(filter_expr)
        elif 'cat=' in filter_expr:
            cat_equals.append(filter_expr)
        else:
            other_filters.append(filter_expr)

    # Positive matches (name~ / name= / cat=) are alternatives, so they are
    # joined with OR; exclusions (name!= / cat!=) must all hold, so AND.
    # The group order below is the order the groups appear in the output.
    groups = (
        (" or ", name_contains_or_equals),
        (" and ", name_not_equals),
        (" and ", cat_not_equals),
        (" or ", cat_equals),
    )
    combined_parts = [operator.join(members) for operator, members in groups if members]

    # Remaining filters are appended individually
    combined_parts.extend(other_filters)

    # Combine all parts with AND (a single part is returned unchanged)
    return " and ".join(combined_parts)


def process_filters_with_comments(filter_input, show_validation=False):
    """
    Process filter input with comment support

    Args:
        filter_input (str): Filter input string
        show_validation (bool): Whether to show validation results

    Returns:
        tuple: (valid_filters, invalid_filters, combined_filter)
    """
    parsed_filters = parse_filters_from_string(filter_input)
    valid_filters, invalid_filters = validate_filters(parsed_filters)

    if show_validation:
        if valid_filters:
            print(f"✅ Valid filters found: {len(valid_filters)}")
            for i, filt in enumerate(valid_filters, 1):
                print(f" {i}. {filt}")

        if invalid_filters:
            print(f"⚠️ Invalid filters found: {len(invalid_filters)}")
            for i, filt in enumerate(invalid_filters, 1):
                print(f" {i}. {filt}")

    combined_filter = combine_filters(valid_filters) if valid_filters else ""

    return valid_filters, invalid_filters, combined_filter


def _operator_after(current_line, next_line):
    """Pick the lowercase operator to insert between two filter lines."""
    if _is_name_match(current_line):
        # Consecutive name matches are alternatives
        return "or" if _is_name_match(next_line) else "and"
    if 'name!=' in current_line or 'cat!=' in current_line:
        return "and"
    if 'cat=' in current_line:
        return "or"
    return "and"


def auto_detect_and_append_operators(filter_text):
    """Automatically append the right operators to filter text

    Args:
        filter_text (str): Filter expressions, one per line.

    Returns:
        str: The non-empty lines joined by spaces with an ``and``/``or``
            operator between each pair. Input with at most one non-empty
            line is returned unchanged.
    """
    lines = [line.strip() for line in filter_text.split('\n') if line.strip()]

    if len(lines) <= 1:
        return filter_text

    result_lines = []
    last_index = len(lines) - 1
    for i, line in enumerate(lines):
        result_lines.append(line)
        # Only add operator if not the last line
        if i < last_index:
            result_lines.append(_operator_after(line, lines[i + 1]))

    return " ".join(result_lines)


def _add_username_args(command, username_input):
    """Append username arguments to *command*; return False on a fatal error."""
    if not username_input:
        return True

    if not username_input.startswith("file:"):
        # Handle multiple usernames separated by commas. All of them go
        # after a single "-u" flag (same shape as the "-e" handling):
        # argparse's default store action keeps only the last occurrence
        # of a repeated option.
        tokens = []
        for username in username_input.split(','):
            tokens.extend(username.split())
        if tokens:
            command.append("-u")
            command.extend(tokens)
        return True

    file_path = username_input[5:]
    if not os.path.exists(file_path):
        print(f"❌ Username file not found: {file_path}")
        return False

    temp_file_path = None
    try:
        # Use the parser to clean the file
        from .filter_parser import parse_usernames_from_file
        usernames = parse_usernames_from_file(file_path)
        if usernames:
            # Create a temporary file with cleaned usernames. It is created
            # with delete=False and is not removed here, because the
            # returned command still refers to it.
            with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt', encoding='utf-8') as tmp:
                tmp.write('\n'.join(usernames))
                temp_file_path = tmp.name
    except Exception as e:
        print(f"❌ Error parsing username file {file_path}: {e}")
        # Fallback to original file
        command.extend(["-uf", file_path])
        return True

    if not usernames:
        print(f"⚠️ No valid usernames found in file: {file_path}")
        return False

    command.extend(["-uf", temp_file_path])
    print(f"✅ Loaded {len(usernames)} cleaned usernames from {file_path}")
    return True


def _add_email_args(command, email_input):
    """Append email arguments to *command*; return False on a fatal error."""
    if not email_input:
        return True

    if email_input.startswith("file:"):
        file_path = email_input[5:]
        if not os.path.exists(file_path):
            print(f"❌ Email file not found: {file_path}")
            return False
        command.extend(["-ef", file_path])
        return True

    # Handle multiple emails separated by commas
    emails = [e.strip() for e in email_input.split(',') if e.strip()]
    if emails:
        command.extend(["-e"] + emails)
    return True


def _build_filter_string(interactive_filters):
    """Expand interactive filters (including ``file:`` items) into one filter string."""
    if not interactive_filters:
        return ""

    all_filter_parts = []
    for filter_item in interactive_filters:
        if not filter_item.startswith("file:"):
            # Direct filter expression
            all_filter_parts.append(filter_item)
            continue

        file_path = filter_item[5:]
        if not os.path.exists(file_path):
            print(f"❌ Filter file not found: {file_path}")
            continue

        try:
            with open(file_path, 'r') as f:
                content = f.read().strip()
            all_filter_parts.extend(
                line.strip() for line in content.split('\n') if line.strip()
            )
        except Exception as e:
            # If can't read, skip this filter
            print(f"❌ Error reading filter file {file_path}: {e}")

    if not all_filter_parts:
        return ""
    if len(all_filter_parts) == 1:
        # NOTE(review): a single part is passed through verbatim and does
        # not go through combine_filters' parsing, unlike the multi-part
        # case - confirm this is intended.
        return all_filter_parts[0]
    return combine_filters(all_filter_parts)


def build_blackbird_command(
    username_input="",
    email_input="",
    username_file_input="",
    email_file_input="",
    permute_checkbox=False,
    permuteall_checkbox=False,
    AI_checkbox=False,
    no_nsfw_checkbox=False,
    no_update_checkbox=False,
    csv_checkbox=False,
    pdf_checkbox=False,
    json_checkbox=False,
    verbose_checkbox=False,
    dump_checkbox=False,
    proxy_input="",
    timeout_spinbox=30,
    max_concurrent_requests=30,
    filter_input="",
    interactive_filters=None,  # Now always used
    use_interactive_filters=True,  # Always true now, but keep for compatibility
    instagram_session_id=""
):
    """
    Build Blackbird command based on input parameters

    Args:
        username_input (str): Username(s) to search, comma separated, or
            ``file:<path>`` to read usernames from a file
        email_input (str): Email(s) to search, comma separated, or
            ``file:<path>`` to pass an email file
        username_file_input (str): Path to username file (currently unused;
            use the ``file:`` prefix on username_input)
        email_file_input (str): Path to email file (currently unused;
            use the ``file:`` prefix on email_input)
        permute_checkbox (bool): Enable username permutation
        permuteall_checkbox (bool): Enable all permutations
        AI_checkbox (bool): Enable AI analysis
        no_nsfw_checkbox (bool): Exclude NSFW sites
        no_update_checkbox (bool): Disable updates
        csv_checkbox (bool): Generate CSV output
        pdf_checkbox (bool): Generate PDF output
        json_checkbox (bool): Generate JSON output
        verbose_checkbox (bool): Enable verbose output
        dump_checkbox (bool): Dump HTML output
        proxy_input (str): Proxy URL
        timeout_spinbox (int): Timeout in seconds
        max_concurrent_requests (int): Max concurrent requests
        filter_input (str): Original filter input (deprecated, unused)
        interactive_filters (list): List of interactive filter strings
        use_interactive_filters (bool): Kept for compatibility; interactive
            filters are used whenever they are provided
        instagram_session_id (str): Instagram session ID (accepted for
            compatibility; no command-line argument is emitted for it)

    Returns:
        list: Command as list of arguments or None if error
    """
    command = ["python", "blackbird.py"]

    # Add username or username file
    if not _add_username_args(command, username_input):
        return None

    # Add email or email file
    if not _add_email_args(command, email_input):
        return None

    # Handle filters - always use interactive filters now (if provided)
    filter_string = _build_filter_string(interactive_filters)
    if filter_string:
        command.extend(["--filter", filter_string])

    # Add options, in a fixed order
    flag_options = (
        (permute_checkbox, "--permute"),
        (permuteall_checkbox, "--permuteall"),
        (AI_checkbox, "--ai"),
        (no_nsfw_checkbox, "--no-nsfw"),
        (no_update_checkbox, "--no-update"),
        (csv_checkbox, "--csv"),
        (pdf_checkbox, "--pdf"),
        (json_checkbox, "--json"),
        (verbose_checkbox, "-v"),
        (dump_checkbox, "--dump"),
    )
    command.extend(flag for enabled, flag in flag_options if enabled)

    # Add proxy if specified
    if proxy_input:
        command.extend(["--proxy", proxy_input])

    # Add timeout
    command.extend(["--timeout", str(timeout_spinbox)])

    # Max concurrent requests
    command.extend(["--max-concurrent-requests", str(max_concurrent_requests)])

    # instagram_session_id is intentionally not translated into an argument.

    return command