service.py
import os
import time
from typing import Any, cast

from .command import command, CommandError
from .help import print_help
from .morphcoin import get_wallet_from_file
from ..context import DeviceContext, MainContext
from ..exceptions import (
    ServiceNotFoundError,
    AttackNotRunningError,
    AlreadyOwnThisServiceError,
    WalletNotFoundError,
    CannotDeleteEnforcedServiceError,
    CannotToggleDirectlyError,
    CouldNotStartServiceError,
    ServiceNotRunningError,
)
from ..models import Device, Service, PortscanService, BruteforceService, PublicService
from ..util import is_uuid


def get_service(context: DeviceContext, name: str) -> Service:
    """
    Look up a service by name on the device of the given context.

    :param context: context whose host device is searched
    :param name: name of the service
    :return: the service returned by the host
    :raises CommandError: if the host reports ServiceNotFoundError
    """

    try:
        return context.host.get_service_by_name(name)
    except ServiceNotFoundError:
        raise CommandError(f"The service '{name}' could not be found on this device")


def stop_bruteforce(context: DeviceContext, service: BruteforceService) -> None:
    """
    Stop a bruteforce attack and report its outcome to the user.

    If the attack granted access, the user is asked whether to connect to the
    target device right away.

    :param context: context used to prompt the user and to open the connection
    :param service: the bruteforce service whose attack is stopped
    :raises CommandError: if no attack is running
    """

    try:
        access, _, target_device = service.stop()
    except AttackNotRunningError:
        raise CommandError("Bruteforce attack is not running.")

    if not access:
        print("Access denied. The bruteforce attack was not successful")
        return

    if context.ask("Access granted. Do you want to connect to the device? [yes|no] ", ["yes", "no"]) == "yes":
        handle_remote_connect(context, [target_device])
    else:
        print(f"To connect to the device type `remote connect {target_device}`")


# NOTE: the docstrings of the command handlers below are kept verbatim; they are
# presumably shown to the user by print_help, so additional documentation lives
# in comments instead.


# Root of the `service` command: any argument that reaches it is not a known
# subcommand; without arguments the help of the command is printed.
@command("service", [DeviceContext])
def handle_service(context: DeviceContext, args: list[str]) -> None:
    """
    Create or use a service
    """

    if args:
        raise CommandError("Unknown subcommand.")
    print_help(context, handle_service)


# `service create <name> [wallet]`: a miner additionally needs a wallet, given
# either as a wallet file or directly as a wallet UUID.
@handle_service.subcommand("create")
def handle_service_create(context: DeviceContext, args: list[str]) -> None:
    """
    Create a new service
    """

    if len(args) not in (1, 2) or args[0] not in ("bruteforce", "portscan", "telnet", "ssh", "miner"):
        raise CommandError("usage: service create bruteforce|portscan|telnet|ssh|miner")

    extra: dict[str, Any] = {}
    if args[0] == "miner":
        if len(args) != 2:
            raise CommandError("usage: service create miner <wallet>")

        try:
            wallet_uuid: str = get_wallet_from_file(context, args[1]).uuid
        except CommandError:
            # The argument is not a usable wallet file; accept it as a raw wallet UUID.
            if is_uuid(args[1]):
                wallet_uuid = args[1]
            else:
                raise CommandError("Invalid wallet uuid")

        extra["wallet_uuid"] = wallet_uuid

    try:
        context.host.create_service(args[0], **extra)
        print("Service has been created")
    except AlreadyOwnThisServiceError:
        raise CommandError("You already created this service")
    except WalletNotFoundError:
        raise CommandError("Wallet does not exist.")


# `service list`: print every service of the host with its running state and,
# if it has one, its port.
@handle_service.subcommand("list")
def handle_service_list(context: DeviceContext, _: Any) -> None:
    """
    List all services installed on this device
    """

    services: list[Service] = context.host.get_services()
    if not services:
        print("There are no services on this device.")
        return

    print("Services:")
    for service in services:
        state: str = "running" if service.running else "stopped"
        line: str = f" - [{state}] {service.name}"
        if service.running_port is not None:
            line += f" on port {service.running_port}"
        print(line)


# `service delete <name>`
@handle_service.subcommand("delete")
def handle_service_delete(context: DeviceContext, args: list[str]) -> None:
    """
    Delete a service
    """

    if len(args) != 1 or args[0] not in ("bruteforce", "portscan", "telnet", "ssh", "miner"):
        raise CommandError("usage: service delete bruteforce|portscan|telnet|ssh|miner")

    service: Service = get_service(context, args[0])

    try:
        service.delete()
    except CannotDeleteEnforcedServiceError:
        raise CommandError("The service could not be deleted.")


# `service start telnet|ssh`: refuses to toggle a service that is already running.
@handle_service.subcommand("start")
def handle_service_start(context: DeviceContext, args: list[str]) -> None:
    """
    Start a service
    """

    if len(args) != 1 or args[0] not in ("telnet", "ssh"):
        raise CommandError("usage: service start telnet|ssh")

    service: Service = get_service(context, args[0])
    if service.running:
        raise CommandError("This service is already running.")

    try:
        service.toggle()
    except (CannotToggleDirectlyError, CouldNotStartServiceError):
        raise CommandError("The service could not be started.")


# `service stop telnet|ssh`: refuses to toggle a service that is not running.
@handle_service.subcommand("stop")
def handle_service_stop(context: DeviceContext, args: list[str]) -> None:
    """
    Stop a service
    """

    if len(args) != 1 or args[0] not in ("telnet", "ssh"):
        raise CommandError("usage: service stop telnet|ssh")

    service: Service = get_service(context, args[0])
    if not service.running:
        raise CommandError("This service is not running.")

    try:
        service.toggle()
    except CannotToggleDirectlyError:
        raise CommandError("The service could not be stopped.")


# `service portscan <device-uuid>`: scans the target with the host's portscan
# service and remembers the result in context.last_portscan, which
# `service bruteforce ssh|telnet` reads later.
@handle_service.subcommand("portscan")
def handle_portscan(context: DeviceContext, args: list[str]) -> None:
    """
    Perform a portscan
    """

    if len(args) != 1:
        raise CommandError("usage: service portscan <device>")

    target: str = args[0]
    if not is_uuid(target):
        raise CommandError("Invalid target")

    try:
        service: PortscanService = PortscanService.get_portscan_service(context.client, context.host.uuid)
    except ServiceNotFoundError:
        raise CommandError("You have to create a portscan service before you can use it.")

    services: list[PublicService] = service.scan(target)
    context.last_portscan = target, services
    if not services:
        print("That device doesn't have any running services")
    for s in services:
        print(f" - {s.name} on port {s.running_port} (UUID: {s.uuid})")


# `service bruteforce ...`: two call forms are accepted:
#   service bruteforce ssh|telnet [duration|success_chance]
#       -> target taken from the last portscan
#   service bruteforce <target-device> <target-service> [duration|success_chance]
#       -> target given explicitly as UUIDs
# The optional last argument is either a duration in seconds or a success
# chance suffixed with `%` (default "100%").
@handle_service.subcommand("bruteforce")
def handle_bruteforce(context: DeviceContext, args: list[str]) -> None:
    """
    Start a bruteforce attack
    """

    duration_arg: str = "100%"
    duration: int = 0
    chance: float | None = None
    if len(args) in (1, 2) and args[0] in ("ssh", "telnet"):
        if context.last_portscan is None:
            raise CommandError("You have to portscan your target first to find open ports.")

        target_device, services = context.last_portscan
        for service in services:
            if service.name == args[0]:
                target_service: str = service.uuid
                break
        else:
            raise CommandError(f"Service '{args[0]}' is not running on target device.")
        if len(args) == 2:
            duration_arg = args[1]
    elif len(args) in (2, 3):
        target_device = args[0]
        target_service = args[1]
        if not is_uuid(target_device):
            raise CommandError("Invalid target device")
        if not is_uuid(target_service):
            raise CommandError("Invalid target service")

        if len(args) == 3:
            duration_arg = args[2]
    else:
        raise CommandError(
            "usage: service bruteforce <target-device> <target-service> [duration|success_chance]\n"
            " service bruteforce ssh|telnet [duration|success_chance]"
        )

    if duration_arg.endswith("%"):
        error = "Success chance has to be a positive number between 0 and 100"
        try:
            chance = float(duration_arg[:-1]) / 100
        except ValueError:
            raise CommandError(error)
        # Written as a chained comparison so that NaN ("nan%") is rejected too;
        # `chance < 0 or chance > 1` is False for NaN and would let it through.
        if not 0 <= chance <= 1:
            raise CommandError(error)
    else:
        # isdecimal() rather than isnumeric(): the latter also accepts characters
        # such as "²" or "½", which int() rejects with a ValueError.
        if not duration_arg.isdecimal():
            raise CommandError("Duration has to be a positive integer")
        duration = int(duration_arg)

    try:
        bruteforce_service: BruteforceService = BruteforceService.get_bruteforce_service(
            context.client, context.host.uuid
        )
    except ServiceNotFoundError:
        raise CommandError("You have to create a bruteforce service before you can use it.")

    if bruteforce_service.running:
        # An attack is already in progress: offer to stop it, never start a second one.
        print("You are already attacking a device.")
        print(f"Target device: {bruteforce_service.target_device_uuid}")
        if context.ask("Do you want to stop this attack? [yes|no] ", ["yes", "no"]) == "yes":
            stop_bruteforce(context, bruteforce_service)
        return

    try:
        bruteforce_service.attack(target_device, target_service)
        if chance is not None:
            # Refresh the service before reading its speed, then derive the
            # duration from the requested success chance.
            bruteforce_service.update()
            duration = round((chance + 0.1) * 20 / bruteforce_service.speed)
    except ServiceNotFoundError:
        raise CommandError("The target service does not exist.")
    except ServiceNotRunningError:
        raise CommandError("The target service is not running and cannot be exploited.")

    print("You started a bruteforce attack")

    # os.get_terminal_size() raises OSError when stdout is not attached to a
    # terminal (e.g. piped output); fall back to a conventional 80 columns.
    try:
        columns: int = os.get_terminal_size().columns
    except OSError:
        columns = 80
    # 31 columns are reserved for the text printed around the bar; never let the
    # bar width become negative on very narrow terminals.
    width = max(columns - 31, 0)

    steps = 17  # progress bar refreshes per second
    d = duration * steps  # total number of refreshes
    i = 0
    last_check: float = 0
    try:
        context.update_presence(
            state=f"Logged in: {context.username}@{context.root_context.host}",
            details="Hacking Remote Device",
            end=int(time.time()) + duration,
            large_image="cryptic",
            large_text="Cryptic",
        )
        for i in range(d):
            # Poll the service at most once per second to notice an aborted attack.
            if time.time() - last_check > 1:
                last_check = time.time()
                bruteforce_service.update()
                if not bruteforce_service.running:
                    print("\rBruteforce attack has been aborted.")
                    # Reset the presence here as well; otherwise it would keep
                    # showing the attack after this early return.
                    context.main_loop_presence()
                    return

            progress: int = int(i / d * width)
            j = i // steps  # elapsed seconds
            progress_bar = "[" + "=" * progress + ">" + " " * (width - progress) + "]"
            text = f"\rBruteforcing {j // 60:02d}:{j % 60:02d} {progress_bar} ({i / d * 100:.1f}%) "
            print(end=text, flush=True)
            time.sleep(1 / steps)
        # Convert the last loop index back to whole seconds for the final line.
        i = (i + 1) // steps
        print(f"\rBruteforcing {i // 60:02d}:{i % 60:02d} [" + "=" * width + ">] (100%) ")
    except KeyboardInterrupt:
        # Ctrl+C ends the waiting early; the attack is still stopped and evaluated below.
        print()
    context.main_loop_presence()
    stop_bruteforce(context, bruteforce_service)


# Completion for `service create`: service names first, then a file path for
# the wallet argument of `miner`.
@handle_service_create.completer()
def service_create_completer(context: DeviceContext, args: list[str]) -> list[str]:
    if len(args) == 1:
        return ["bruteforce", "portscan", "ssh", "telnet", "miner"]
    if len(args) == 2 and args[0] == "miner":
        return context.file_path_completer(args[1])
    return []


# Completion for `service delete`.
# NOTE(review): "ssh" is not offered here although handle_service_delete accepts
# it - presumably because ssh is an enforced service; confirm this is intended.
@handle_service_delete.completer()
def service_delete_completer(_: Any, args: list[str]) -> list[str]:
    if len(args) == 1:
        return ["bruteforce", "portscan", "telnet", "miner"]
    return []


# Shared completion for `service start`, `service stop` and `service bruteforce`.
@handle_service_start.completer()
@handle_service_stop.completer()
@handle_bruteforce.completer()
def service_completer(_: Any, args: list[str]) -> list[str]:
    if len(args) == 1:
        return ["ssh", "telnet"]
    return []


# `spot`: show a device obtained from Device.spot and immediately portscan it.
@command("spot", [DeviceContext])
def handle_spot(context: DeviceContext, _: Any) -> None:
    """
    Find a random device in the network
    """

    device: Device = Device.spot(context.client)
    hacked_suffix: str = " [hacked]" if device.part_owner() else ""
    print(f"Name: '{device.name}'" + hacked_suffix)
    print(f"UUID: {device.uuid}")
    handle_portscan(context, [device.uuid])


# Root of the `remote` command: any argument that reaches it is not a known
# subcommand; without arguments the help of the command is printed.
@command("remote", [MainContext, DeviceContext])
def handle_remote(context: MainContext, args: list[str]) -> None:
    """
    Manage and connect to the devices you hacked before
    """

    if args:
        raise CommandError("Unknown subcommand.")
    print_help(context, handle_remote)


# `remote list`: print every hacked device with its power state.
@handle_remote.subcommand("list")
def handle_remote_list(context: MainContext, _: Any) -> None:
    """
    List remote devices
    """

    devices: list[Device] = context.get_hacked_devices()

    if not devices:
        print("You don't have access to any remote device.")
        return

    print("Remote devices:")
    for device in devices:
        power: str = "on" if device.powered_on else "off"
        print(f" - [{power}] {device.name} (UUID: {device.uuid})")


# `remote connect <name|uuid>`: a UUID is resolved directly, a name is looked up
# among the hacked devices and has to be unambiguous.
@handle_remote.subcommand("connect")
def handle_remote_connect(context: MainContext, args: list[str]) -> None:
    """
    Connect to a remote device
    """

    if len(args) != 1:
        # Raised (instead of printed) like the usage errors of all other handlers.
        raise CommandError("usage: remote connect <name|uuid>")

    name: str = args[0]
    if is_uuid(name):
        device: Device = Device.get_device(context.client, name)
        if device is None:
            raise CommandError("This device does not exist or you have no permission to access it.")
    else:
        found_devices: list[Device] = [d for d in context.get_hacked_devices() if d.name == name]

        if not found_devices:
            raise CommandError(f"There is no device with the name '{name}'.")
        if len(found_devices) > 1:
            raise CommandError(f"There is more than one device with the name '{name}'. You need to specify its UUID.")

        device = found_devices[0]

    print(f"Connecting to {device.name} (UUID: {device.uuid})")
    if not device.part_owner():
        raise CommandError("This device does not exist or you have no permission to access it.")
    context.open(DeviceContext(context.root_context, cast(str, context.session_token), device))


# Completion for `remote connect`: only names that identify exactly one hacked
# device are offered, since ambiguous names are rejected by the command.
@handle_remote_connect.completer()
def remote_completer(context: MainContext, args: list[str]) -> list[str]:
    if len(args) == 1:
        device_names: list[str] = [device.name for device in context.get_hacked_devices()]
        return [name for name in device_names if device_names.count(name) == 1]
    return []