/ nomadnet / Conversation.py
Conversation.py
  1  import os
  2  import RNS
  3  import LXMF
  4  import shutil
  5  import nomadnet
  6  from nomadnet.Directory import DirectoryEntry
  7  
  8  class Conversation:
  9      cached_conversations = {}
 10      unread_conversations = {}
 11      created_callback = None
 12  
 13      aspect_filter = "lxmf.delivery"
 14      @staticmethod
 15      def received_announce(destination_hash, announced_identity, app_data):
 16          app = nomadnet.NomadNetworkApp.get_shared_instance()
 17  
 18          if not destination_hash in app.ignored_list:
 19              destination_hash_text = RNS.hexrep(destination_hash, delimit=False)
 20              # Check if the announced destination is in
 21              # our list of conversations
 22              if destination_hash_text in [e[0] for e in Conversation.conversation_list(app)]:
 23                  if app.directory.find(destination_hash):
 24                      if Conversation.created_callback != None:
 25                          Conversation.created_callback()
 26                  else:
 27                      if Conversation.created_callback != None:
 28                          Conversation.created_callback()
 29  
 30              # This reformats the new v0.5.0 announce data back to the expected format
 31              # for nomadnets storage and other handling functions.
 32              dn = LXMF.display_name_from_app_data(app_data)
 33              app_data = b""
 34              if dn != None:
 35                  app_data = dn.encode("utf-8")
 36              
 37              # Add the announce to the directory announce
 38              # stream logger
 39              app.directory.lxmf_announce_received(destination_hash, app_data)
 40              
 41          else:
 42              RNS.log("Ignored announce from "+RNS.prettyhexrep(destination_hash), RNS.LOG_DEBUG)
 43  
 44      @staticmethod
 45      def query_for_peer(source_hash):
 46          try:
 47              RNS.Transport.request_path(bytes.fromhex(source_hash))
 48          except Exception as e:
 49              RNS.log("Error while querying network for peer identity. The contained exception was: "+str(e), RNS.LOG_ERROR)
 50  
 51      @staticmethod
 52      def ingest(lxmessage, app, originator = False, delegate = None):
 53          if originator:
 54              source_hash = lxmessage.destination_hash
 55          else:
 56              source_hash = lxmessage.source_hash
 57          
 58          source_hash_path = RNS.hexrep(source_hash, delimit=False)
 59  
 60          conversation_path = app.conversationpath + "/" + source_hash_path
 61  
 62          if not os.path.isdir(conversation_path):
 63              os.makedirs(conversation_path)
 64              if Conversation.created_callback != None:
 65                  Conversation.created_callback()
 66  
 67          ingested_path = lxmessage.write_to_directory(conversation_path)
 68  
 69          if RNS.hexrep(source_hash, delimit=False) in Conversation.cached_conversations:
 70              conversation = Conversation.cached_conversations[RNS.hexrep(source_hash, delimit=False)]
 71              conversation.scan_storage()
 72  
 73          if not source_hash in Conversation.unread_conversations:
 74              Conversation.unread_conversations[source_hash] = True
 75              try:
 76                  dirname = RNS.hexrep(source_hash, delimit=False)
 77                  open(app.conversationpath + "/" + dirname + "/unread", 'a').close()
 78              except Exception as e:
 79                  pass
 80  
 81              if Conversation.created_callback != None:
 82                  Conversation.created_callback()
 83  
 84          return ingested_path
 85  
 86      @staticmethod
 87      def conversation_list(app):
 88          conversations = []
 89          for dirname in os.listdir(app.conversationpath):
 90              if len(dirname) == RNS.Identity.TRUNCATED_HASHLENGTH//8*2 and os.path.isdir(app.conversationpath + "/" + dirname):
 91                  try:
 92                      source_hash_text = dirname
 93                      source_hash      = bytes.fromhex(dirname)
 94                      app_data         = RNS.Identity.recall_app_data(source_hash)
 95                      display_name     = app.directory.display_name(source_hash)
 96  
 97                      unread = False
 98                      if source_hash in Conversation.unread_conversations:
 99                          unread = True
100                      elif os.path.isfile(app.conversationpath + "/" + dirname + "/unread"):
101                          Conversation.unread_conversations[source_hash] = True
102                          unread = True
103  
104                      if display_name == None and app_data:
105                          display_name = LXMF.display_name_from_app_data(app_data)
106  
107                      if display_name == None:
108                          sort_name = ""
109                      else:
110                          sort_name = display_name
111                      
112                      trust_level      = app.directory.trust_level(source_hash, display_name)
113                      
114                      entry = (source_hash_text, display_name, trust_level, sort_name, unread)
115                      conversations.append(entry)
116  
117                  except Exception as e:
118                      RNS.log("Error while loading conversation "+str(dirname)+", skipping it. The contained exception was: "+str(e), RNS.LOG_ERROR)
119  
120          conversations.sort(key=lambda e: (-e[2], e[3], e[0]), reverse=False)
121  
122          return conversations
123  
124      @staticmethod
125      def cache_conversation(conversation):
126          Conversation.cached_conversations[conversation.source_hash] = conversation
127  
128      @staticmethod
129      def delete_conversation(source_hash_path, app):
130          conversation_path = app.conversationpath + "/" + source_hash_path
131  
132          try:
133              if os.path.isdir(conversation_path):
134                  shutil.rmtree(conversation_path)
135          except Exception as e:
136              RNS.log("Could not remove conversation at "+str(conversation_path)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
137  
138      def __init__(self, source_hash, app, initiator=False):
139          self.app                = app
140          self.source_hash        = source_hash
141          self.send_destination   = None
142          self.messages           = []
143          self.messages_path      = app.conversationpath + "/" + source_hash
144          self.messages_load_time = None
145          self.source_known       = False
146          self.source_trusted     = False
147          self.source_blocked     = False
148          self.unread             = False
149  
150          self.__changed_callback = None
151  
152          if not RNS.Identity.recall(bytes.fromhex(self.source_hash)):
153              RNS.Transport.request_path(bytes.fromhex(source_hash))
154  
155          self.source_identity = RNS.Identity.recall(bytes.fromhex(self.source_hash))
156  
157          if self.source_identity:
158              self.source_known = True
159              self.send_destination = RNS.Destination(self.source_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, "lxmf", "delivery")
160  
161          if initiator:
162              if not os.path.isdir(self.messages_path):
163                  os.makedirs(self.messages_path)
164                  if Conversation.created_callback != None:
165                      Conversation.created_callback()
166  
167          self.scan_storage()
168  
169          self.trust_level = app.directory.trust_level(bytes.fromhex(self.source_hash))
170  
171          Conversation.cache_conversation(self)
172  
173      def scan_storage(self):
174          old_len = len(self.messages)
175          self.messages = []
176          for filename in os.listdir(self.messages_path):
177              if len(filename) == RNS.Identity.HASHLENGTH//8*2:
178                  message_path = self.messages_path + "/" + filename
179                  self.messages.append(ConversationMessage(message_path))
180  
181          new_len = len(self.messages)
182  
183          if new_len > old_len:
184              self.unread = True
185  
186          if self.__changed_callback != None:
187              self.__changed_callback(self)
188  
189      def purge_failed(self):
190          purged_messages = []
191          for conversation_message in self.messages:
192              if conversation_message.get_state() == LXMF.LXMessage.FAILED:
193                  purged_messages.append(conversation_message)
194                  conversation_message.purge()
195  
196          for purged_message in purged_messages:
197              self.messages.remove(purged_message)
198  
199      def clear_history(self):
200          purged_messages = []
201          for conversation_message in self.messages:
202              purged_messages.append(conversation_message)
203              conversation_message.purge()
204  
205          for purged_message in purged_messages:
206              self.messages.remove(purged_message)
207  
208      def register_changed_callback(self, callback):
209          self.__changed_callback = callback
210  
211      def send(self, content="", title=""):
212          if self.send_destination:
213              dest = self.send_destination
214              source = self.app.lxmf_destination
215              desired_method = LXMF.LXMessage.DIRECT
216              if self.app.directory.preferred_delivery(dest.hash) == DirectoryEntry.PROPAGATED:
217                  if self.app.message_router.get_outbound_propagation_node() != None:
218                      desired_method = LXMF.LXMessage.PROPAGATED
219              else:
220                  if not self.app.message_router.delivery_link_available(dest.hash) and RNS.Identity.current_ratchet_id(dest.hash) != None:
221                      RNS.log(f"Have ratchet for {RNS.prettyhexrep(dest.hash)}, requesting opportunistic delivery of message", RNS.LOG_DEBUG)
222                      desired_method = LXMF.LXMessage.OPPORTUNISTIC
223  
224              dest_is_trusted = False
225              if self.app.directory.trust_level(dest.hash) == DirectoryEntry.TRUSTED:
226                  dest_is_trusted = True
227  
228              lxm = LXMF.LXMessage(dest, source, content, title=title, desired_method=desired_method, include_ticket=dest_is_trusted)
229              lxm.register_delivery_callback(self.message_notification)
230              lxm.register_failed_callback(self.message_notification)
231  
232              if self.app.message_router.get_outbound_propagation_node() != None:
233                  lxm.try_propagation_on_fail = self.app.try_propagation_on_fail
234  
235              self.app.message_router.handle_outbound(lxm)
236  
237              message_path = Conversation.ingest(lxm, self.app, originator=True)
238              self.messages.append(ConversationMessage(message_path))
239  
240              return True
241          else:
242              RNS.log("Destination is not known, cannot create LXMF Message.", RNS.LOG_VERBOSE)
243              return False
244  
245      def paper_output(self, content="", title="", mode="print_qr"):
246          if self.send_destination:
247              try:
248                  dest = self.send_destination
249                  source = self.app.lxmf_destination
250                  desired_method = LXMF.LXMessage.PAPER
251  
252                  lxm = LXMF.LXMessage(dest, source, content, title=title, desired_method=desired_method)
253  
254                  if mode == "print_qr":
255                      qr_code = lxm.as_qr()
256                      qr_tmp_path = self.app.tmpfilespath+"/"+str(RNS.hexrep(lxm.hash, delimit=False))
257                      qr_code.save(qr_tmp_path)
258  
259                      print_result = self.app.print_file(qr_tmp_path)
260                      os.unlink(qr_tmp_path)
261                      
262                      if print_result:
263                          message_path = Conversation.ingest(lxm, self.app, originator=True)
264                          self.messages.append(ConversationMessage(message_path))
265  
266                      return print_result
267  
268                  elif mode == "save_qr":
269                      qr_code = lxm.as_qr()
270                      qr_save_path = self.app.downloads_path+"/LXM_"+str(RNS.hexrep(lxm.hash, delimit=False)+".png")
271                      qr_code.save(qr_save_path)
272                      message_path = Conversation.ingest(lxm, self.app, originator=True)
273                      self.messages.append(ConversationMessage(message_path))
274                      return qr_save_path
275  
276                  elif mode == "save_uri":
277                      lxm_uri = lxm.as_uri()+"\n"
278                      uri_save_path = self.app.downloads_path+"/LXM_"+str(RNS.hexrep(lxm.hash, delimit=False)+".txt")
279                      with open(uri_save_path, "wb") as f:
280                          f.write(lxm_uri.encode("utf-8"))
281  
282                      message_path = Conversation.ingest(lxm, self.app, originator=True)
283                      self.messages.append(ConversationMessage(message_path))
284                      return uri_save_path
285  
286                  elif mode == "return_uri":
287                      return lxm.as_uri()
288  
289              except Exception as e:
290                  RNS.log("An error occurred while generating paper message, the contained exception was: "+str(e), RNS.LOG_ERROR)
291                  return False
292  
293          else:
294              RNS.log("Destination is not known, cannot create LXMF Message.", RNS.LOG_VERBOSE)
295              return False
296  
297      def message_notification(self, message):
298          if message.state == LXMF.LXMessage.FAILED and hasattr(message, "try_propagation_on_fail") and message.try_propagation_on_fail:
299              if hasattr(message, "stamp_generation_failed") and message.stamp_generation_failed == True:
300                  RNS.log(f"Could not send {message} due to a stamp generation failure", RNS.LOG_ERROR)
301              else:
302                  RNS.log("Direct delivery of "+str(message)+" failed. Retrying as propagated message.", RNS.LOG_VERBOSE)
303                  message.try_propagation_on_fail = None
304                  message.delivery_attempts = 0
305                  if hasattr(message, "next_delivery_attempt"):
306                      del message.next_delivery_attempt
307                  message.packed = None
308                  message.desired_method = LXMF.LXMessage.PROPAGATED
309                  self.app.message_router.handle_outbound(message)
310          else:
311              message_path = Conversation.ingest(message, self.app, originator=True)
312  
313      def __str__(self):
314          string = self.source_hash
315  
316          # TODO: Remove this
317          # if self.source_identity:
318          #     if self.source_identity.app_data:
319          #         # TODO: Sanitise for viewing, or just clean this
320          #         string += " | "+self.source_identity.app_data.decode("utf-8")
321  
322          return string
323  
324  
325  
class ConversationMessage:
    """A lazily loaded LXMF message stored as a file in a conversation directory.

    The message is only read from disk when one of the getters first needs
    it. If loading fails, the error is logged and ``lxm`` stays ``None``; the
    getters that read from ``lxm`` then raise ``AttributeError``.
    """

    def __init__(self, file_path):
        """Remember ``file_path`` without touching the file."""
        self.file_path      = file_path
        self.loaded         = False
        self.timestamp      = None
        # Modification time of the message file, set by load().
        self.sort_timestamp = None
        self.lxm            = None

    def load(self):
        """Read the message from disk.

        A message whose stored state lies between GENERATING and SENT, but
        which the message router no longer tracks as pending, is marked as
        FAILED in memory. Errors are logged, not raised.
        """
        try:
            # Close the file handle as soon as the message has been unpacked.
            with open(self.file_path, "rb") as message_file:
                self.lxm = LXMF.LXMessage.unpack_from_file(message_file)

            self.loaded = True
            self.timestamp = self.lxm.timestamp
            self.sort_timestamp = os.path.getmtime(self.file_path)

            if LXMF.LXMessage.GENERATING < self.lxm.state < LXMF.LXMessage.SENT:
                router = nomadnet.NomadNetworkApp.get_shared_instance().message_router
                message_hash = self.lxm.hash

                still_pending = (
                    any(pending.hash == message_hash for pending in router.pending_outbound)
                    or any(pending_id == message_hash for pending_id in router.pending_deferred_stamps)
                )

                if not still_pending:
                    self.lxm.state = LXMF.LXMessage.FAILED

        except Exception as e:
            RNS.log("Error while loading LXMF message "+str(self.file_path)+" from disk. The contained exception was: "+str(e), RNS.LOG_ERROR)

    def unload(self):
        """Drop the in-memory message; it is reloaded on next access."""
        self.loaded = False
        self.lxm    = None

    def purge(self):
        """Unload the message and delete its file, if the file exists."""
        self.unload()
        if os.path.isfile(self.file_path):
            os.unlink(self.file_path)

    def _ensure_loaded(self):
        """Load the message from disk unless it is already loaded."""
        if not self.loaded:
            self.load()

    def get_timestamp(self):
        """Return the message timestamp, or None if it could not be loaded."""
        self._ensure_loaded()
        return self.timestamp

    def get_title(self):
        """Return the message title as a string."""
        self._ensure_loaded()
        return self.lxm.title_as_string()

    def get_content(self):
        """Return the message content as a string."""
        self._ensure_loaded()
        return self.lxm.content_as_string()

    def get_hash(self):
        """Return the message hash."""
        self._ensure_loaded()
        return self.lxm.hash

    def get_state(self):
        """Return the message state (possibly adjusted to FAILED by load())."""
        self._ensure_loaded()
        return self.lxm.state

    def get_transport_encryption(self):
        """Return the message's ``transport_encryption`` attribute."""
        self._ensure_loaded()
        return self.lxm.transport_encryption

    def get_transport_encrypted(self):
        """Return the message's ``transport_encrypted`` attribute."""
        self._ensure_loaded()
        return self.lxm.transport_encrypted

    def signature_validated(self):
        """Return the message's ``signature_validated`` attribute."""
        self._ensure_loaded()
        return self.lxm.signature_validated

    def get_signature_description(self):
        """Return a human-readable description of the signature status."""
        if self.signature_validated():
            return "Signature Verified"

        unverified_reason = self.lxm.unverified_reason
        if unverified_reason == LXMF.LXMessage.SOURCE_UNKNOWN:
            return "Unknown Origin"
        elif unverified_reason == LXMF.LXMessage.SIGNATURE_INVALID:
            return "Invalid Signature"
        else:
            return "Unknown signature validation failure"