/ nomadnet / Node.py
Node.py
  1  import os
  2  import sys
  3  
  4  import RNS
  5  import time
  6  import threading
  7  import subprocess
  8  import RNS.vendor.umsgpack as msgpack
  9  
 10  class Node:
 11      JOB_INTERVAL = 5
 12      START_ANNOUNCE_DELAY = 6
 13  
 14      def __init__(self, app):
 15          RNS.log("Nomad Network Node starting...", RNS.LOG_VERBOSE)
 16          self.app = app
 17          self.identity = self.app.identity
 18          self.destination = RNS.Destination(self.identity, RNS.Destination.IN, RNS.Destination.SINGLE, "nomadnetwork", "node")
 19          self.last_announce = time.time()
 20          self.last_file_refresh = time.time()
 21          self.last_page_refresh = time.time()
 22          self.announce_interval = self.app.node_announce_interval
 23          self.page_refresh_interval = self.app.page_refresh_interval
 24          self.file_refresh_interval = self.app.file_refresh_interval
 25          self.job_interval = Node.JOB_INTERVAL
 26          self.should_run_jobs = True
 27          self.app_data = None
 28          self.name = self.app.node_name
 29  
 30          self.register_pages()
 31          self.register_files()
 32  
 33          self.destination.set_link_established_callback(self.peer_connected)
 34  
 35          if self.name == None:
 36              self.name = self.app.peer_settings["display_name"]+"'s Node"
 37  
 38          RNS.log("Node \""+self.name+"\" ready for incoming connections on "+RNS.prettyhexrep(self.destination.hash), RNS.LOG_VERBOSE)
 39  
 40          if self.app.node_announce_at_start:
 41              def delayed_announce():
 42                  time.sleep(Node.START_ANNOUNCE_DELAY)
 43                  self.announce()
 44  
 45              da_thread = threading.Thread(target=delayed_announce)
 46              da_thread.setDaemon(True)
 47              da_thread.start()
 48  
 49          job_thread = threading.Thread(target=self.__jobs)
 50          job_thread.setDaemon(True)
 51          job_thread.start()
 52  
 53  
 54      def register_pages(self):
 55          # TODO: Deregister previously registered pages
 56          # that no longer exist.
 57          self.servedpages = []
 58          self.scan_pages(self.app.pagespath)
 59  
 60          if not self.app.pagespath+"index.mu" in self.servedpages:
 61              self.destination.register_request_handler(
 62                  "/page/index.mu",
 63                  response_generator = self.serve_default_index,
 64                  allow = RNS.Destination.ALLOW_ALL)
 65  
 66          for page in self.servedpages:
 67              request_path = "/page"+page.replace(self.app.pagespath, "")
 68              self.destination.register_request_handler(
 69                  request_path,
 70                  response_generator = self.serve_page,
 71                  allow = RNS.Destination.ALLOW_ALL)
 72  
 73      def register_files(self):
 74          # TODO: Deregister previously registered files
 75          # that no longer exist.
 76          self.servedfiles = []
 77          self.scan_files(self.app.filespath)
 78  
 79          for file in self.servedfiles:
 80              request_path = "/file"+file.replace(self.app.filespath, "")
 81              self.destination.register_request_handler(
 82                  request_path,
 83                  response_generator = self.serve_file,
 84                  allow = RNS.Destination.ALLOW_ALL,
 85                  auto_compress = 32_000_000)
 86  
 87      def scan_pages(self, base_path):
 88          files = [file for file in os.listdir(base_path) if os.path.isfile(os.path.join(base_path, file)) and file[:1] != "."]
 89          directories = [file for file in os.listdir(base_path) if os.path.isdir(os.path.join(base_path, file)) and file[:1] != "."]
 90  
 91          for file in files:
 92              if not file.endswith(".allowed"):
 93                  self.servedpages.append(base_path+"/"+file)
 94  
 95          for directory in directories:
 96              self.scan_pages(base_path+"/"+directory)
 97  
 98      def scan_files(self, base_path):
 99          files = [file for file in os.listdir(base_path) if os.path.isfile(os.path.join(base_path, file)) and file[:1] != "."]
100          directories = [file for file in os.listdir(base_path) if os.path.isdir(os.path.join(base_path, file)) and file[:1] != "."]
101  
102          for file in files:
103              self.servedfiles.append(base_path+"/"+file)
104  
105          for directory in directories:
106              self.scan_files(base_path+"/"+directory)
107  
108      def serve_page(self, path, data, request_id, link_id, remote_identity, requested_at):
109          RNS.log("Page request "+RNS.prettyhexrep(request_id)+" for: "+str(path), RNS.LOG_VERBOSE)
110          try:
111              self.app.peer_settings["served_page_requests"] += 1
112              self.app.save_peer_settings()
113              
114          except Exception as e:
115              RNS.log("Could not increase served page request count", RNS.LOG_ERROR)
116  
117          file_path = path.replace("/page", self.app.pagespath, 1)
118  
119          allowed_path = file_path+".allowed"
120          request_allowed = False
121  
122          if os.path.isfile(allowed_path):
123              allowed_list = []
124  
125              try:
126                  if os.access(allowed_path, os.X_OK):
127                      allowed_result = subprocess.run([allowed_path], stdout=subprocess.PIPE)
128                      allowed_input = allowed_result.stdout
129  
130                  else:
131                      fh = open(allowed_path, "rb")
132                      allowed_input = fh.read()
133                      fh.close()
134  
135                  allowed_hash_strs = allowed_input.splitlines()
136  
137                  for hash_str in allowed_hash_strs:
138                      if len(hash_str) == RNS.Identity.TRUNCATED_HASHLENGTH//8*2:
139                          try:
140                              allowed_hash = bytes.fromhex(hash_str.decode("utf-8"))
141                              allowed_list.append(allowed_hash)
142  
143                          except Exception as e:
144                              RNS.log("Could not decode RNS Identity hash from: "+str(hash_str), RNS.LOG_DEBUG)
145                              RNS.log("The contained exception was: "+str(e), RNS.LOG_DEBUG)
146  
147              except Exception as e:
148                  RNS.log("Error while fetching list of allowed identities for request: "+str(e), RNS.LOG_ERROR)
149  
150              if hasattr(remote_identity, "hash") and remote_identity.hash in allowed_list:
151                  request_allowed = True
152              else:
153                  request_allowed = False
154                  RNS.log("Denying request, remote identity was not in list of allowed identities", RNS.LOG_VERBOSE)
155  
156          else:
157              request_allowed = True
158  
159          try:
160              if request_allowed:
161                  RNS.log("Serving page: "+file_path, RNS.LOG_VERBOSE)
162                  if not RNS.vendor.platformutils.is_windows() and os.access(file_path, os.X_OK):
163                      env_map = {}
164                      if "PATH" in os.environ:
165                          env_map["PATH"] = os.environ["PATH"]
166                      if link_id != None:
167                          env_map["link_id"] = RNS.hexrep(link_id, delimit=False)
168                      if remote_identity != None:
169                          env_map["remote_identity"] = RNS.hexrep(remote_identity.hash, delimit=False)
170  
171                      if data != None and isinstance(data, dict):
172                          for e in data:
173                              if isinstance(e, str) and (e.startswith("field_") or e.startswith("var_")):
174                                  env_map[e] = data[e]
175  
176                      generated = subprocess.run([file_path], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, env=env_map)
177                      return generated.stdout
178                  else:
179                      fh = open(file_path, "rb")
180                      response_data = fh.read()
181                      fh.close()
182                      return response_data
183              else:
184                  RNS.log("Request denied", RNS.LOG_VERBOSE)
185                  return DEFAULT_NOTALLOWED.encode("utf-8")
186  
187          except Exception as e:
188              RNS.log("Error occurred while handling request "+RNS.prettyhexrep(request_id)+" for: "+str(path), RNS.LOG_ERROR)
189              RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR)
190              return None
191  
192      # TODO: Improve file handling, this will be slow for large files
193      def serve_file(self, path, data, request_id, remote_identity, requested_at):
194          RNS.log("File request "+RNS.prettyhexrep(request_id)+" for: "+str(path), RNS.LOG_VERBOSE)
195          try:
196              self.app.peer_settings["served_file_requests"] += 1
197              self.app.save_peer_settings()
198              
199          except Exception as e:
200              RNS.log("Could not increase served file request count", RNS.LOG_ERROR)
201  
202          file_path = path.replace("/file", self.app.filespath, 1)
203          file_name = path.replace("/file/", "", 1)
204          try:
205              RNS.log("Serving file: "+file_path, RNS.LOG_VERBOSE)
206              return [open(file_path, "rb"), {"name": file_name.encode("utf-8")}]
207  
208          except Exception as e:
209              RNS.log("Error occurred while handling request "+RNS.prettyhexrep(request_id)+" for: "+str(path), RNS.LOG_ERROR)
210              RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR)
211              return None
212  
213      def serve_default_index(self, path, data, request_id, remote_identity, requested_at):
214          RNS.log("Serving default index for request "+RNS.prettyhexrep(request_id)+" for: "+str(path), RNS.LOG_VERBOSE)
215          return DEFAULT_INDEX.encode("utf-8")
216  
217      def announce(self):
218          self.app_data = self.name.encode("utf-8")
219          self.last_announce = time.time()
220          self.app.peer_settings["node_last_announce"] = self.last_announce
221          self.destination.announce(app_data=self.app_data)
222          self.app.message_router.announce_propagation_node()
223  
224      def __jobs(self):
225          while self.should_run_jobs:
226              now = time.time()
227              
228              if now > self.last_announce + self.announce_interval*60:
229                  self.announce()
230                  
231              if self.page_refresh_interval > 0:
232                  if now > self.last_page_refresh + self.page_refresh_interval*60:
233                      self.register_pages()
234                      self.last_page_refresh = time.time()
235  
236              if self.file_refresh_interval > 0:
237                  if now > self.last_file_refresh + self.file_refresh_interval*60:
238                      self.register_files()
239                      self.last_file_refresh = time.time()
240  
241              time.sleep(self.job_interval)
242  
243      def peer_connected(self, link):
244          RNS.log("Peer connected to "+str(self.destination), RNS.LOG_VERBOSE)
245          try:
246              self.app.peer_settings["node_connects"] += 1
247              self.app.save_peer_settings()
248  
249          except Exception as e:
250              RNS.log("Could not increase node connection count", RNS.LOG_ERROR)
251  
252          link.set_link_closed_callback(self.peer_disconnected)
253  
254      def peer_disconnected(self, link):
255          RNS.log("Peer disconnected from "+str(self.destination), RNS.LOG_VERBOSE)
256          pass
257  
258  DEFAULT_INDEX = '''>Default Home Page
259  
260  This node is serving pages, but the home page file (index.mu) was not found in the page storage directory. This is an auto-generated placeholder.
261  
262  If you are the node operator, you can define your own home page by creating a file named `*index.mu`* in the page storage directory.
263  '''
264  
265  DEFAULT_NOTALLOWED = '''>Request Not Allowed
266  
267  You are not authorised to carry out the request.
268  '''