Conversation.py
1 import os 2 import RNS 3 import LXMF 4 import shutil 5 import nomadnet 6 from nomadnet.Directory import DirectoryEntry 7 8 class Conversation: 9 cached_conversations = {} 10 unread_conversations = {} 11 created_callback = None 12 13 aspect_filter = "lxmf.delivery" 14 @staticmethod 15 def received_announce(destination_hash, announced_identity, app_data): 16 app = nomadnet.NomadNetworkApp.get_shared_instance() 17 18 if not destination_hash in app.ignored_list: 19 destination_hash_text = RNS.hexrep(destination_hash, delimit=False) 20 # Check if the announced destination is in 21 # our list of conversations 22 if destination_hash_text in [e[0] for e in Conversation.conversation_list(app)]: 23 if app.directory.find(destination_hash): 24 if Conversation.created_callback != None: 25 Conversation.created_callback() 26 else: 27 if Conversation.created_callback != None: 28 Conversation.created_callback() 29 30 # This reformats the new v0.5.0 announce data back to the expected format 31 # for nomadnets storage and other handling functions. 32 dn = LXMF.display_name_from_app_data(app_data) 33 app_data = b"" 34 if dn != None: 35 app_data = dn.encode("utf-8") 36 37 # Add the announce to the directory announce 38 # stream logger 39 app.directory.lxmf_announce_received(destination_hash, app_data) 40 41 else: 42 RNS.log("Ignored announce from "+RNS.prettyhexrep(destination_hash), RNS.LOG_DEBUG) 43 44 @staticmethod 45 def query_for_peer(source_hash): 46 try: 47 RNS.Transport.request_path(bytes.fromhex(source_hash)) 48 except Exception as e: 49 RNS.log("Error while querying network for peer identity. The contained exception was: "+str(e), RNS.LOG_ERROR) 50 51 @staticmethod 52 def ingest(lxmessage, app, originator = False, delegate = None): 53 if originator: 54 source_hash = lxmessage.destination_hash 55 else: 56 source_hash = lxmessage.source_hash 57 58 source_hash_path = RNS.hexrep(source_hash, delimit=False) 59 60 conversation_path = app.conversationpath + "/" + source_hash_path 61 62 if not os.path.isdir(conversation_path): 63 os.makedirs(conversation_path) 64 if Conversation.created_callback != None: 65 Conversation.created_callback() 66 67 ingested_path = lxmessage.write_to_directory(conversation_path) 68 69 if RNS.hexrep(source_hash, delimit=False) in Conversation.cached_conversations: 70 conversation = Conversation.cached_conversations[RNS.hexrep(source_hash, delimit=False)] 71 conversation.scan_storage() 72 73 if not source_hash in Conversation.unread_conversations: 74 Conversation.unread_conversations[source_hash] = True 75 try: 76 dirname = RNS.hexrep(source_hash, delimit=False) 77 open(app.conversationpath + "/" + dirname + "/unread", 'a').close() 78 except Exception as e: 79 pass 80 81 if Conversation.created_callback != None: 82 Conversation.created_callback() 83 84 return ingested_path 85 86 @staticmethod 87 def conversation_list(app): 88 conversations = [] 89 for dirname in os.listdir(app.conversationpath): 90 if len(dirname) == RNS.Identity.TRUNCATED_HASHLENGTH//8*2 and os.path.isdir(app.conversationpath + "/" + dirname): 91 try: 92 source_hash_text = dirname 93 source_hash = bytes.fromhex(dirname) 94 app_data = RNS.Identity.recall_app_data(source_hash) 95 display_name = app.directory.display_name(source_hash) 96 97 unread = False 98 if source_hash in Conversation.unread_conversations: 99 unread = True 100 elif os.path.isfile(app.conversationpath + "/" + dirname + "/unread"): 101 Conversation.unread_conversations[source_hash] = True 102 unread = True 103 104 if display_name == None and app_data: 105 display_name = LXMF.display_name_from_app_data(app_data) 106 107 if display_name == None: 108 sort_name = "" 109 else: 110 sort_name = display_name 111 112 trust_level = app.directory.trust_level(source_hash, display_name) 113 114 entry = (source_hash_text, display_name, trust_level, sort_name, unread) 115 conversations.append(entry) 116 117 except Exception as e: 118 RNS.log("Error while loading conversation "+str(dirname)+", skipping it. The contained exception was: "+str(e), RNS.LOG_ERROR) 119 120 conversations.sort(key=lambda e: (-e[2], e[3], e[0]), reverse=False) 121 122 return conversations 123 124 @staticmethod 125 def cache_conversation(conversation): 126 Conversation.cached_conversations[conversation.source_hash] = conversation 127 128 @staticmethod 129 def delete_conversation(source_hash_path, app): 130 conversation_path = app.conversationpath + "/" + source_hash_path 131 132 try: 133 if os.path.isdir(conversation_path): 134 shutil.rmtree(conversation_path) 135 except Exception as e: 136 RNS.log("Could not remove conversation at "+str(conversation_path)+". The contained exception was: "+str(e), RNS.LOG_ERROR) 137 138 def __init__(self, source_hash, app, initiator=False): 139 self.app = app 140 self.source_hash = source_hash 141 self.send_destination = None 142 self.messages = [] 143 self.messages_path = app.conversationpath + "/" + source_hash 144 self.messages_load_time = None 145 self.source_known = False 146 self.source_trusted = False 147 self.source_blocked = False 148 self.unread = False 149 150 self.__changed_callback = None 151 152 if not RNS.Identity.recall(bytes.fromhex(self.source_hash)): 153 RNS.Transport.request_path(bytes.fromhex(source_hash)) 154 155 self.source_identity = RNS.Identity.recall(bytes.fromhex(self.source_hash)) 156 157 if self.source_identity: 158 self.source_known = True 159 self.send_destination = RNS.Destination(self.source_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, "lxmf", "delivery") 160 161 if initiator: 162 if not os.path.isdir(self.messages_path): 163 os.makedirs(self.messages_path) 164 if Conversation.created_callback != None: 165 Conversation.created_callback() 166 167 self.scan_storage() 168 169 self.trust_level = app.directory.trust_level(bytes.fromhex(self.source_hash)) 170 171 Conversation.cache_conversation(self) 172 173 def scan_storage(self): 174 old_len = len(self.messages) 175 self.messages = [] 176 for filename in os.listdir(self.messages_path): 177 if len(filename) == RNS.Identity.HASHLENGTH//8*2: 178 message_path = self.messages_path + "/" + filename 179 self.messages.append(ConversationMessage(message_path)) 180 181 new_len = len(self.messages) 182 183 if new_len > old_len: 184 self.unread = True 185 186 if self.__changed_callback != None: 187 self.__changed_callback(self) 188 189 def purge_failed(self): 190 purged_messages = [] 191 for conversation_message in self.messages: 192 if conversation_message.get_state() == LXMF.LXMessage.FAILED: 193 purged_messages.append(conversation_message) 194 conversation_message.purge() 195 196 for purged_message in purged_messages: 197 self.messages.remove(purged_message) 198 199 def clear_history(self): 200 purged_messages = [] 201 for conversation_message in self.messages: 202 purged_messages.append(conversation_message) 203 conversation_message.purge() 204 205 for purged_message in purged_messages: 206 self.messages.remove(purged_message) 207 208 def register_changed_callback(self, callback): 209 self.__changed_callback = callback 210 211 def send(self, content="", title=""): 212 if self.send_destination: 213 dest = self.send_destination 214 source = self.app.lxmf_destination 215 desired_method = LXMF.LXMessage.DIRECT 216 if self.app.directory.preferred_delivery(dest.hash) == DirectoryEntry.PROPAGATED: 217 if self.app.message_router.get_outbound_propagation_node() != None: 218 desired_method = LXMF.LXMessage.PROPAGATED 219 else: 220 if not self.app.message_router.delivery_link_available(dest.hash) and RNS.Identity.current_ratchet_id(dest.hash) != None: 221 RNS.log(f"Have ratchet for {RNS.prettyhexrep(dest.hash)}, requesting opportunistic delivery of message", RNS.LOG_DEBUG) 222 desired_method = LXMF.LXMessage.OPPORTUNISTIC 223 224 dest_is_trusted = False 225 if self.app.directory.trust_level(dest.hash) == DirectoryEntry.TRUSTED: 226 dest_is_trusted = True 227 228 lxm = LXMF.LXMessage(dest, source, content, title=title, desired_method=desired_method, include_ticket=dest_is_trusted) 229 lxm.register_delivery_callback(self.message_notification) 230 lxm.register_failed_callback(self.message_notification) 231 232 if self.app.message_router.get_outbound_propagation_node() != None: 233 lxm.try_propagation_on_fail = self.app.try_propagation_on_fail 234 235 self.app.message_router.handle_outbound(lxm) 236 237 message_path = Conversation.ingest(lxm, self.app, originator=True) 238 self.messages.append(ConversationMessage(message_path)) 239 240 return True 241 else: 242 RNS.log("Destination is not known, cannot create LXMF Message.", RNS.LOG_VERBOSE) 243 return False 244 245 def paper_output(self, content="", title="", mode="print_qr"): 246 if self.send_destination: 247 try: 248 dest = self.send_destination 249 source = self.app.lxmf_destination 250 desired_method = LXMF.LXMessage.PAPER 251 252 lxm = LXMF.LXMessage(dest, source, content, title=title, desired_method=desired_method) 253 254 if mode == "print_qr": 255 qr_code = lxm.as_qr() 256 qr_tmp_path = self.app.tmpfilespath+"/"+str(RNS.hexrep(lxm.hash, delimit=False)) 257 qr_code.save(qr_tmp_path) 258 259 print_result = self.app.print_file(qr_tmp_path) 260 os.unlink(qr_tmp_path) 261 262 if print_result: 263 message_path = Conversation.ingest(lxm, self.app, originator=True) 264 self.messages.append(ConversationMessage(message_path)) 265 266 return print_result 267 268 elif mode == "save_qr": 269 qr_code = lxm.as_qr() 270 qr_save_path = self.app.downloads_path+"/LXM_"+str(RNS.hexrep(lxm.hash, delimit=False)+".png") 271 qr_code.save(qr_save_path) 272 message_path = Conversation.ingest(lxm, self.app, originator=True) 273 self.messages.append(ConversationMessage(message_path)) 274 return qr_save_path 275 276 elif mode == "save_uri": 277 lxm_uri = lxm.as_uri()+"\n" 278 uri_save_path = self.app.downloads_path+"/LXM_"+str(RNS.hexrep(lxm.hash, delimit=False)+".txt") 279 with open(uri_save_path, "wb") as f: 280 f.write(lxm_uri.encode("utf-8")) 281 282 message_path = Conversation.ingest(lxm, self.app, originator=True) 283 self.messages.append(ConversationMessage(message_path)) 284 return uri_save_path 285 286 elif mode == "return_uri": 287 return lxm.as_uri() 288 289 except Exception as e: 290 RNS.log("An error occurred while generating paper message, the contained exception was: "+str(e), RNS.LOG_ERROR) 291 return False 292 293 else: 294 RNS.log("Destination is not known, cannot create LXMF Message.", RNS.LOG_VERBOSE) 295 return False 296 297 def message_notification(self, message): 298 if message.state == LXMF.LXMessage.FAILED and hasattr(message, "try_propagation_on_fail") and message.try_propagation_on_fail: 299 if hasattr(message, "stamp_generation_failed") and message.stamp_generation_failed == True: 300 RNS.log(f"Could not send {message} due to a stamp generation failure", RNS.LOG_ERROR) 301 else: 302 RNS.log("Direct delivery of "+str(message)+" failed. Retrying as propagated message.", RNS.LOG_VERBOSE) 303 message.try_propagation_on_fail = None 304 message.delivery_attempts = 0 305 if hasattr(message, "next_delivery_attempt"): 306 del message.next_delivery_attempt 307 message.packed = None 308 message.desired_method = LXMF.LXMessage.PROPAGATED 309 self.app.message_router.handle_outbound(message) 310 else: 311 message_path = Conversation.ingest(message, self.app, originator=True) 312 313 def __str__(self): 314 string = self.source_hash 315 316 # TODO: Remove this 317 # if self.source_identity: 318 # if self.source_identity.app_data: 319 # # TODO: Sanitise for viewing, or just clean this 320 # string += " | "+self.source_identity.app_data.decode("utf-8") 321 322 return string 323 324 325 326 class ConversationMessage: 327 def __init__(self, file_path): 328 self.file_path = file_path 329 self.loaded = False 330 self.timestamp = None 331 self.lxm = None 332 333 def load(self): 334 try: 335 self.lxm = LXMF.LXMessage.unpack_from_file(open(self.file_path, "rb")) 336 self.loaded = True 337 self.timestamp = self.lxm.timestamp 338 self.sort_timestamp = os.path.getmtime(self.file_path) 339 340 if self.lxm.state > LXMF.LXMessage.GENERATING and self.lxm.state < LXMF.LXMessage.SENT: 341 found = False 342 343 for pending in nomadnet.NomadNetworkApp.get_shared_instance().message_router.pending_outbound: 344 if pending.hash == self.lxm.hash: 345 found = True 346 347 for pending_id in nomadnet.NomadNetworkApp.get_shared_instance().message_router.pending_deferred_stamps: 348 if pending_id == self.lxm.hash: 349 found = True 350 351 if not found: 352 self.lxm.state = LXMF.LXMessage.FAILED 353 354 except Exception as e: 355 RNS.log("Error while loading LXMF message "+str(self.file_path)+" from disk. The contained exception was: "+str(e), RNS.LOG_ERROR) 356 357 def unload(self): 358 self.loaded = False 359 self.lxm = None 360 361 def purge(self): 362 self.unload() 363 if os.path.isfile(self.file_path): 364 os.unlink(self.file_path) 365 366 def get_timestamp(self): 367 if not self.loaded: 368 self.load() 369 370 return self.timestamp 371 372 def get_title(self): 373 if not self.loaded: 374 self.load() 375 376 return self.lxm.title_as_string() 377 378 def get_content(self): 379 if not self.loaded: 380 self.load() 381 382 return self.lxm.content_as_string() 383 384 def get_hash(self): 385 if not self.loaded: 386 self.load() 387 388 return self.lxm.hash 389 390 def get_state(self): 391 if not self.loaded: 392 self.load() 393 394 return self.lxm.state 395 396 def get_transport_encryption(self): 397 if not self.loaded: 398 self.load() 399 400 return self.lxm.transport_encryption 401 402 def get_transport_encrypted(self): 403 if not self.loaded: 404 self.load() 405 406 return self.lxm.transport_encrypted 407 408 def signature_validated(self): 409 if not self.loaded: 410 self.load() 411 412 return self.lxm.signature_validated 413 414 def get_signature_description(self): 415 if self.signature_validated(): 416 return "Signature Verified" 417 else: 418 if self.lxm.unverified_reason == LXMF.LXMessage.SOURCE_UNKNOWN: 419 return "Unknown Origin" 420 elif self.lxm.unverified_reason == LXMF.LXMessage.SIGNATURE_INVALID: 421 return "Invalid Signature" 422 else: 423 return "Unknown signature validation failure"