# Node.py

import os
import sys

import RNS
import time
import threading
import subprocess
import RNS.vendor.umsgpack as msgpack

class Node:
    """
    Serves pages and files over Reticulum for a Nomad Network node.

    Registers request handlers on a "nomadnetwork.node" destination for
    every page and file found under the app's page/file storage paths,
    periodically re-announces the destination, and re-scans the storage
    directories on configurable intervals.
    """

    # Seconds between iterations of the background job loop.
    JOB_INTERVAL = 5
    # Seconds to wait before the optional announce-at-start fires.
    START_ANNOUNCE_DELAY = 6

    def __init__(self, app):
        RNS.log("Nomad Network Node starting...", RNS.LOG_VERBOSE)
        self.app = app
        self.identity = self.app.identity
        self.destination = RNS.Destination(self.identity, RNS.Destination.IN, RNS.Destination.SINGLE, "nomadnetwork", "node")
        self.last_announce = time.time()
        self.last_file_refresh = time.time()
        self.last_page_refresh = time.time()
        # Intervals are configured in minutes; converted to seconds in __jobs.
        self.announce_interval = self.app.node_announce_interval
        self.page_refresh_interval = self.app.page_refresh_interval
        self.file_refresh_interval = self.app.file_refresh_interval
        self.job_interval = Node.JOB_INTERVAL
        self.should_run_jobs = True
        self.app_data = None
        self.name = self.app.node_name

        self.register_pages()
        self.register_files()

        self.destination.set_link_established_callback(self.peer_connected)

        # Fall back to a name derived from the operator's display name.
        if self.name is None:
            self.name = self.app.peer_settings["display_name"]+"'s Node"

        RNS.log("Node \""+self.name+"\" ready for incoming connections on "+RNS.prettyhexrep(self.destination.hash), RNS.LOG_VERBOSE)

        if self.app.node_announce_at_start:
            def delayed_announce():
                time.sleep(Node.START_ANNOUNCE_DELAY)
                self.announce()

            # daemon=True in the constructor replaces the deprecated
            # Thread.setDaemon() call (deprecated since Python 3.10).
            da_thread = threading.Thread(target=delayed_announce, daemon=True)
            da_thread.start()

        job_thread = threading.Thread(target=self.__jobs, daemon=True)
        job_thread.start()


    def register_pages(self):
        """
        Scan the page storage path and register a request handler for
        every served page. Registers a default index handler when no
        index.mu file exists in the page storage directory.
        """
        # TODO: Deregister previously registered pages
        # that no longer exist.
        self.servedpages = []
        self.scan_pages(self.app.pagespath)

        if not self.app.pagespath+"index.mu" in self.servedpages:
            self.destination.register_request_handler(
                "/page/index.mu",
                response_generator = self.serve_default_index,
                allow = RNS.Destination.ALLOW_ALL)

        for page in self.servedpages:
            # Map the on-disk path to a "/page/..." request path.
            request_path = "/page"+page.replace(self.app.pagespath, "")
            self.destination.register_request_handler(
                request_path,
                response_generator = self.serve_page,
                allow = RNS.Destination.ALLOW_ALL)

    def register_files(self):
        """
        Scan the file storage path and register a request handler for
        every served file.
        """
        # TODO: Deregister previously registered files
        # that no longer exist.
        self.servedfiles = []
        self.scan_files(self.app.filespath)

        for file in self.servedfiles:
            # Map the on-disk path to a "/file/..." request path.
            request_path = "/file"+file.replace(self.app.filespath, "")
            self.destination.register_request_handler(
                request_path,
                response_generator = self.serve_file,
                allow = RNS.Destination.ALLOW_ALL,
                auto_compress = 32_000_000)

    def scan_pages(self, base_path):
        """
        Recursively collect page paths under base_path into
        self.servedpages, skipping hidden entries and .allowed files.
        """
        files = [file for file in os.listdir(base_path) if os.path.isfile(os.path.join(base_path, file)) and file[:1] != "."]
        directories = [file for file in os.listdir(base_path) if os.path.isdir(os.path.join(base_path, file)) and file[:1] != "."]

        for file in files:
            # .allowed files are access-control lists, not pages.
            if not file.endswith(".allowed"):
                self.servedpages.append(base_path+"/"+file)

        for directory in directories:
            self.scan_pages(base_path+"/"+directory)

    def scan_files(self, base_path):
        """
        Recursively collect file paths under base_path into
        self.servedfiles, skipping hidden entries.
        """
        files = [file for file in os.listdir(base_path) if os.path.isfile(os.path.join(base_path, file)) and file[:1] != "."]
        directories = [file for file in os.listdir(base_path) if os.path.isdir(os.path.join(base_path, file)) and file[:1] != "."]

        for file in files:
            self.servedfiles.append(base_path+"/"+file)

        for directory in directories:
            self.scan_files(base_path+"/"+directory)

    def serve_page(self, path, data, request_id, link_id, remote_identity, requested_at):
        """
        Request handler for pages. Enforces per-page .allowed lists, and
        either executes the page file (if executable, passing request
        context via environment variables) or returns its raw contents.
        Returns the response bytes, or None on error.
        """
        RNS.log("Page request "+RNS.prettyhexrep(request_id)+" for: "+str(path), RNS.LOG_VERBOSE)
        try:
            self.app.peer_settings["served_page_requests"] += 1
            self.app.save_peer_settings()

        except Exception as e:
            # Best-effort statistics; never fail the request over this.
            RNS.log("Could not increase served page request count", RNS.LOG_ERROR)

        file_path = path.replace("/page", self.app.pagespath, 1)

        allowed_path = file_path+".allowed"
        request_allowed = False

        if os.path.isfile(allowed_path):
            allowed_list = []

            try:
                if os.access(allowed_path, os.X_OK):
                    # An executable .allowed file generates the list dynamically.
                    allowed_result = subprocess.run([allowed_path], stdout=subprocess.PIPE)
                    allowed_input = allowed_result.stdout

                else:
                    with open(allowed_path, "rb") as fh:
                        allowed_input = fh.read()

                allowed_hash_strs = allowed_input.splitlines()

                for hash_str in allowed_hash_strs:
                    # Only accept lines that are exactly one hex-encoded
                    # truncated identity hash long.
                    if len(hash_str) == RNS.Identity.TRUNCATED_HASHLENGTH//8*2:
                        try:
                            allowed_hash = bytes.fromhex(hash_str.decode("utf-8"))
                            allowed_list.append(allowed_hash)

                        except Exception as e:
                            RNS.log("Could not decode RNS Identity hash from: "+str(hash_str), RNS.LOG_DEBUG)
                            RNS.log("The contained exception was: "+str(e), RNS.LOG_DEBUG)

            except Exception as e:
                RNS.log("Error while fetching list of allowed identities for request: "+str(e), RNS.LOG_ERROR)

            if hasattr(remote_identity, "hash") and remote_identity.hash in allowed_list:
                request_allowed = True
            else:
                request_allowed = False
                RNS.log("Denying request, remote identity was not in list of allowed identities", RNS.LOG_VERBOSE)

        else:
            # No .allowed file means the page is public.
            request_allowed = True

        try:
            if request_allowed:
                RNS.log("Serving page: "+file_path, RNS.LOG_VERBOSE)
                if not RNS.vendor.platformutils.is_windows() and os.access(file_path, os.X_OK):
                    # Executable page: run it with a minimal environment
                    # carrying the request context.
                    env_map = {}
                    if "PATH" in os.environ:
                        env_map["PATH"] = os.environ["PATH"]
                    if link_id is not None:
                        env_map["link_id"] = RNS.hexrep(link_id, delimit=False)
                    if remote_identity is not None:
                        env_map["remote_identity"] = RNS.hexrep(remote_identity.hash, delimit=False)

                    # isinstance already rejects None, so no separate check needed.
                    if isinstance(data, dict):
                        for e in data:
                            if isinstance(e, str) and (e.startswith("field_") or e.startswith("var_")):
                                env_map[e] = data[e]

                    generated = subprocess.run([file_path], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, env=env_map)
                    return generated.stdout
                else:
                    with open(file_path, "rb") as fh:
                        response_data = fh.read()
                    return response_data
            else:
                RNS.log("Request denied", RNS.LOG_VERBOSE)
                return DEFAULT_NOTALLOWED.encode("utf-8")

        except Exception as e:
            RNS.log("Error occurred while handling request "+RNS.prettyhexrep(request_id)+" for: "+str(path), RNS.LOG_ERROR)
            RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR)
            return None

    # TODO: Improve file handling, this will be slow for large files
    def serve_file(self, path, data, request_id, remote_identity, requested_at):
        """
        Request handler for files. Returns [open file handle, metadata]
        for RNS to stream (the handle is intentionally left open for the
        transport layer to consume), or None on error.
        """
        RNS.log("File request "+RNS.prettyhexrep(request_id)+" for: "+str(path), RNS.LOG_VERBOSE)
        try:
            self.app.peer_settings["served_file_requests"] += 1
            self.app.save_peer_settings()

        except Exception as e:
            # Best-effort statistics; never fail the request over this.
            RNS.log("Could not increase served file request count", RNS.LOG_ERROR)

        file_path = path.replace("/file", self.app.filespath, 1)
        file_name = path.replace("/file/", "", 1)
        try:
            RNS.log("Serving file: "+file_path, RNS.LOG_VERBOSE)
            return [open(file_path, "rb"), {"name": file_name.encode("utf-8")}]

        except Exception as e:
            RNS.log("Error occurred while handling request "+RNS.prettyhexrep(request_id)+" for: "+str(path), RNS.LOG_ERROR)
            RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR)
            return None

    def serve_default_index(self, path, data, request_id, remote_identity, requested_at):
        """
        Request handler used when no index.mu page exists; returns an
        auto-generated placeholder home page.
        """
        RNS.log("Serving default index for request "+RNS.prettyhexrep(request_id)+" for: "+str(path), RNS.LOG_VERBOSE)
        return DEFAULT_INDEX.encode("utf-8")

    def announce(self):
        """
        Announce this node's destination (with its name as app data) and
        trigger a propagation node announce.
        """
        self.app_data = self.name.encode("utf-8")
        self.last_announce = time.time()
        self.app.peer_settings["node_last_announce"] = self.last_announce
        self.destination.announce(app_data=self.app_data)
        self.app.message_router.announce_propagation_node()

    def __jobs(self):
        """
        Background job loop: periodically announce, and re-scan pages and
        files on their configured intervals (intervals are in minutes; a
        refresh interval of 0 disables that refresh). Runs until
        self.should_run_jobs becomes falsy.
        """
        while self.should_run_jobs:
            now = time.time()

            if now > self.last_announce + self.announce_interval*60:
                self.announce()

            if self.page_refresh_interval > 0:
                if now > self.last_page_refresh + self.page_refresh_interval*60:
                    self.register_pages()
                    self.last_page_refresh = time.time()

            if self.file_refresh_interval > 0:
                if now > self.last_file_refresh + self.file_refresh_interval*60:
                    self.register_files()
                    self.last_file_refresh = time.time()

            time.sleep(self.job_interval)

    def peer_connected(self, link):
        """Link-established callback: count the connection and hook disconnect."""
        RNS.log("Peer connected to "+str(self.destination), RNS.LOG_VERBOSE)
        try:
            self.app.peer_settings["node_connects"] += 1
            self.app.save_peer_settings()

        except Exception as e:
            # Best-effort statistics; never fail the connection over this.
            RNS.log("Could not increase node connection count", RNS.LOG_ERROR)

        link.set_link_closed_callback(self.peer_disconnected)

    def peer_disconnected(self, link):
        """Link-closed callback; currently only logs the disconnect."""
        RNS.log("Peer disconnected from "+str(self.destination), RNS.LOG_VERBOSE)

DEFAULT_INDEX = '''>Default Home Page

This node is serving pages, but the home page file (index.mu) was not found in the page storage directory. This is an auto-generated placeholder.

If you are the node operator, you can define your own home page by creating a file named `*index.mu`* in the page storage directory.
'''

DEFAULT_NOTALLOWED = '''>Request Not Allowed

You are not authorised to carry out the request.
'''