/ PyCrypCli / PyCrypCli / commands / service.py
service.py
  1  import os
  2  import time
  3  from typing import Any, cast
  4  
  5  from .command import command, CommandError
  6  from .help import print_help
  7  from .morphcoin import get_wallet_from_file
  8  from ..context import DeviceContext, MainContext
  9  from ..exceptions import (
 10      ServiceNotFoundError,
 11      AttackNotRunningError,
 12      AlreadyOwnThisServiceError,
 13      WalletNotFoundError,
 14      CannotDeleteEnforcedServiceError,
 15      CannotToggleDirectlyError,
 16      CouldNotStartServiceError,
 17      ServiceNotRunningError,
 18  )
 19  from ..models import Device, Service, PortscanService, BruteforceService, PublicService
 20  from ..util import is_uuid
 21  
 22  
 23  def get_service(context: DeviceContext, name: str) -> Service:
 24      try:
 25          return context.host.get_service_by_name(name)
 26      except ServiceNotFoundError:
 27          raise CommandError(f"The service '{name}' could not be found on this device")
 28  
 29  
 30  def stop_bruteforce(context: DeviceContext, service: BruteforceService) -> None:
 31      try:
 32          access, _, target_device = service.stop()
 33      except AttackNotRunningError:
 34          raise CommandError("Bruteforce attack is not running.")
 35      if access:
 36          if context.ask("Access granted. Do you want to connect to the device? [yes|no] ", ["yes", "no"]) == "yes":
 37              handle_remote_connect(context, [target_device])
 38          else:
 39              print(f"To connect to the device type `remote connect {target_device}`")
 40      else:
 41          print("Access denied. The bruteforce attack was not successful")
 42  
 43  
 44  @command("service", [DeviceContext])
 45  def handle_service(context: DeviceContext, args: list[str]) -> None:
 46      """
 47      Create or use a service
 48      """
 49  
 50      if args:
 51          raise CommandError("Unknown subcommand.")
 52      print_help(context, handle_service)
 53  
 54  
 55  @handle_service.subcommand("create")
 56  def handle_service_create(context: DeviceContext, args: list[str]) -> None:
 57      """
 58      Create a new service
 59      """
 60  
 61      if len(args) not in (1, 2) or args[0] not in ("bruteforce", "portscan", "telnet", "ssh", "miner"):
 62          raise CommandError("usage: service create bruteforce|portscan|telnet|ssh|miner")
 63  
 64      extra: dict[str, Any] = {}
 65      if args[0] == "miner":
 66          if len(args) != 2:
 67              raise CommandError("usage: service create miner <wallet>")
 68  
 69          try:
 70              wallet_uuid: str = get_wallet_from_file(context, args[1]).uuid
 71          except CommandError:
 72              if is_uuid(args[1]):
 73                  wallet_uuid = args[1]
 74              else:
 75                  raise CommandError("Invalid wallet uuid")
 76  
 77          extra["wallet_uuid"] = wallet_uuid
 78  
 79      try:
 80          context.host.create_service(args[0], **extra)
 81          print("Service has been created")
 82      except AlreadyOwnThisServiceError:
 83          raise CommandError("You already created this service")
 84      except WalletNotFoundError:
 85          raise CommandError("Wallet does not exist.")
 86  
 87  
 88  @handle_service.subcommand("list")
 89  def handle_service_list(context: DeviceContext, _: Any) -> None:
 90      """
 91      List all services installed on this device
 92      """
 93  
 94      services: list[Service] = context.host.get_services()
 95      if not services:
 96          print("There are no services on this device.")
 97      else:
 98          print("Services:")
 99      for service in services:
100          line: str = f" - [{['stopped', 'running'][service.running]}] {service.name}"
101          if service.running_port is not None:
102              line += f" on port {service.running_port}"
103          print(line)
104  
105  
@handle_service.subcommand("delete")
def handle_service_delete(context: DeviceContext, args: list[str]) -> None:
    """
    Delete a service
    """

    deletable = ("bruteforce", "portscan", "telnet", "ssh", "miner")
    if not (len(args) == 1 and args[0] in deletable):
        raise CommandError("usage: service delete bruteforce|portscan|telnet|ssh|miner")

    # get_service() raises CommandError itself when the service is not on this device.
    target: Service = get_service(context, args[0])

    try:
        target.delete()
    except CannotDeleteEnforcedServiceError:
        raise CommandError("The service could not be deleted.")
121  
122  
@handle_service.subcommand("start")
def handle_service_start(context: DeviceContext, args: list[str]) -> None:
    """
    Start a service
    """

    if not (len(args) == 1 and args[0] in ("telnet", "ssh")):
        raise CommandError("usage: service start telnet|ssh")

    target: Service = get_service(context, args[0])

    # toggle() is only called on a service that is currently stopped.
    if target.running:
        raise CommandError("This service is already running.")

    try:
        target.toggle()
    except (CannotToggleDirectlyError, CouldNotStartServiceError):
        raise CommandError("The service could not be started.")
140  
141  
@handle_service.subcommand("stop")
def handle_service_stop(context: DeviceContext, args: list[str]) -> None:
    """
    Stop a service
    """

    if not (len(args) == 1 and args[0] in ("telnet", "ssh")):
        raise CommandError("usage: service stop telnet|ssh")

    target: Service = get_service(context, args[0])

    # toggle() is only called on a service that is currently running.
    if not target.running:
        raise CommandError("This service is not running.")

    try:
        target.toggle()
    except CannotToggleDirectlyError:
        raise CommandError("The service could not be stopped.")
159  
160  
@handle_service.subcommand("portscan")
def handle_portscan(context: DeviceContext, args: list[str]) -> None:
    """
    Perform a portscan
    """

    if len(args) != 1:
        raise CommandError("usage: service portscan <device>")

    (target,) = args
    if not is_uuid(target):
        raise CommandError("Invalid target")

    try:
        scanner: PortscanService = PortscanService.get_portscan_service(context.client, context.host.uuid)
    except ServiceNotFoundError:
        raise CommandError("You have to create a portscan service before you can use it.")

    found: list[PublicService] = scanner.scan(target)

    # Remember the result on the context; `service bruteforce ssh|telnet` reads it back.
    context.last_portscan = (target, found)

    if not found:
        print("That device doesn't have any running services")
    for public_service in found:
        print(f" - {public_service.name} on port {public_service.running_port} (UUID: {public_service.uuid})")
185  
186  
def _resolve_bruteforce_target(context: DeviceContext, args: list[str]) -> tuple[str, str, str]:
    """
    Work out what to attack from the command arguments.

    Two forms are accepted: `ssh|telnet [duration|chance]`, which resolves the
    service uuid from the last portscan stored on the context, and
    `<device-uuid> <service-uuid> [duration|chance]`.

    :return: (target device uuid, target service uuid, raw duration argument);
             the duration argument defaults to "100%" when omitted
    :raises CommandError: on bad usage, invalid uuids, a missing portscan or
                          when the named service was not found in the last portscan
    """

    duration_arg = "100%"

    if len(args) in (1, 2) and args[0] in ("ssh", "telnet"):
        if context.last_portscan is None:
            raise CommandError("You have to portscan your target first to find open ports.")

        target_device, services = context.last_portscan
        for service in services:
            if service.name == args[0]:
                target_service: str = service.uuid
                break
        else:
            raise CommandError(f"Service '{args[0]}' is not running on target device.")

        if len(args) == 2:
            duration_arg = args[1]
    elif len(args) in (2, 3):
        target_device, target_service = args[0], args[1]
        if not is_uuid(target_device):
            raise CommandError("Invalid target device")
        if not is_uuid(target_service):
            raise CommandError("Invalid target service")

        if len(args) == 3:
            duration_arg = args[2]
    else:
        raise CommandError(
            "usage: service bruteforce <target-device> <target-service> [duration|success_chance]\n"
            "       service bruteforce ssh|telnet [duration|success_chance]"
        )

    return target_device, target_service, duration_arg


def _parse_bruteforce_duration(duration_arg: str) -> "tuple[int, float | None]":
    """
    Parse the duration argument, which is either seconds or a success chance.

    :param duration_arg: e.g. "30" (seconds) or "80%" (desired success chance)
    :return: (duration, None) for a plain duration, or (0, chance) with chance
             in [0, 1] for a percentage; the caller derives the duration later
    :raises CommandError: if the argument is neither a valid percentage nor a
                          non-negative integer
    """

    if duration_arg.endswith("%"):
        error = "Success chance has to be a positive number between 0 and 100"
        try:
            chance = float(duration_arg[:-1]) / 100
        except ValueError:
            raise CommandError(error)
        # Written as a chained comparison so that NaN ("nan%") is rejected too;
        # `chance < 0 or chance > 1` is False for NaN and would let it through.
        if not 0 <= chance <= 1:
            raise CommandError(error)
        return 0, chance

    # isdecimal() only accepts characters int() can parse; isnumeric() would also
    # accept e.g. superscripts or fractions, on which int() raises ValueError.
    if not duration_arg.isdecimal():
        raise CommandError("Duration has to be a positive integer")
    return int(duration_arg), None


@handle_service.subcommand("bruteforce")
def handle_bruteforce(context: DeviceContext, args: list[str]) -> None:
    """
    Start a bruteforce attack
    """

    target_device, target_service, duration_arg = _resolve_bruteforce_target(context, args)
    duration, chance = _parse_bruteforce_duration(duration_arg)

    try:
        bruteforce_service: BruteforceService = BruteforceService.get_bruteforce_service(
            context.client, context.host.uuid
        )
    except ServiceNotFoundError:
        raise CommandError("You have to create a bruteforce service before you can use it.")

    # Only one attack at a time: offer to stop the running one instead of starting another.
    if bruteforce_service.running:
        print("You are already attacking a device.")
        print(f"Target device: {bruteforce_service.target_device_uuid}")
        if context.ask("Do you want to stop this attack? [yes|no] ", ["yes", "no"]) == "yes":
            stop_bruteforce(context, bruteforce_service)
        return

    try:
        bruteforce_service.attack(target_device, target_service)
        if chance is not None:
            # Refresh the service so that `speed` is current, then derive the
            # duration from the requested success chance.
            bruteforce_service.update()
            duration = round((chance + 0.1) * 20 / bruteforce_service.speed)
    except ServiceNotFoundError:
        raise CommandError("The target service does not exist.")
    except ServiceNotRunningError:
        raise CommandError("The target service is not running and cannot be exploited.")

    print("You started a bruteforce attack")

    # os.get_terminal_size() raises OSError when stdout is not a terminal
    # (e.g. piped output); fall back to a conventional 80 columns.
    try:
        columns = os.get_terminal_size().columns
    except OSError:
        columns = 80
    # 31 columns are reserved for the text around the bar; never go negative.
    width = max(columns - 31, 0)

    steps = 17  # progress bar redraws per second
    d = duration * steps
    i = 0
    last_check: float = 0
    try:
        context.update_presence(
            state=f"Logged in: {context.username}@{context.root_context.host}",
            details="Hacking Remote Device",
            end=int(time.time()) + duration,
            large_image="cryptic",
            large_text="Cryptic",
        )
        for i in range(d):
            # Poll the service at most about once per second to notice an
            # attack that stopped running in the meantime.
            if time.time() - last_check > 1:
                last_check = time.time()
                bruteforce_service.update()
                if not bruteforce_service.running:
                    print("\rBruteforce attack has been aborted.")
                    return

            progress: int = int(i / d * width)
            j = i // steps  # elapsed whole seconds
            progress_bar = "[" + "=" * progress + ">" + " " * (width - progress) + "]"
            text = f"\rBruteforcing {j // 60:02d}:{j % 60:02d} {progress_bar} ({i / d * 100:.1f}%) "
            print(end=text, flush=True)
            time.sleep(1 / steps)
        i = (i + 1) // steps
        print(f"\rBruteforcing {i // 60:02d}:{i % 60:02d} [" + "=" * width + ">] (100%) ")
    except KeyboardInterrupt:
        # Ctrl+C ends the waiting early; the attack is still stopped and evaluated below.
        print()
    finally:
        # Runs on every exit path, including the early return on abort, so the
        # presence set above (presumably reset by main_loop_presence) never lingers.
        context.main_loop_presence()

    stop_bruteforce(context, bruteforce_service)
296  
297  
@handle_service_create.completer()
def service_create_completer(context: DeviceContext, args: list[str]) -> list[str]:
    """
    Suggest completions for `service create`.

    The first argument completes to a service name; for `miner` the second
    argument completes to a file path.
    """

    argc = len(args)
    if argc == 1:
        return ["bruteforce", "portscan", "ssh", "telnet", "miner"]

    wants_wallet_path = argc == 2 and args[0] == "miner"
    return context.file_path_completer(args[1]) if wants_wallet_path else []
305  
306  
@handle_service_delete.completer()
def service_delete_completer(_: Any, args: list[str]) -> list[str]:
    """
    Suggest service names for the first argument of `service delete`.
    """

    # NOTE(review): "ssh" passes the argument check of `service delete` but is
    # not suggested here -- presumably intentional; confirm before adding it.
    return ["bruteforce", "portscan", "telnet", "miner"] if len(args) == 1 else []
312  
313  
@handle_service_start.completer()
@handle_service_stop.completer()
@handle_bruteforce.completer()
def service_completer(_: Any, args: list[str]) -> list[str]:
    """
    Suggest `ssh` and `telnet` for the first argument.

    Shared by the `start`, `stop` and `bruteforce` subcommands.
    """

    return ["ssh", "telnet"] if len(args) == 1 else []
321  
322  
@command("spot", [DeviceContext])
def handle_spot(context: DeviceContext, _: Any) -> None:
    """
    Find a random device in the network
    """

    spotted: Device = Device.spot(context.client)

    # Mark devices for which part_owner() is truthy.
    # assumes part_owner() returns a bool -- TODO confirm against the Device model
    hacked_tag = " [hacked]" if spotted.part_owner() else ""
    print(f"Name: '{spotted.name}'{hacked_tag}")
    print(f"UUID: {spotted.uuid}")

    # Immediately scan the spotted device, exactly like `service portscan <uuid>`.
    handle_portscan(context, [spotted.uuid])
333  
334  
@command("remote", [MainContext, DeviceContext])
def handle_remote(context: MainContext, args: list[str]) -> None:
    """
    Manage and connect to the devices you hacked before
    """

    # Without arguments show the help for this command; any argument that
    # reaches this handler is reported as an unknown subcommand.
    if not args:
        print_help(context, handle_remote)
        return

    raise CommandError("Unknown subcommand.")
344  
345  
@handle_remote.subcommand("list")
def handle_remote_list(context: MainContext, _: Any) -> None:
    """
    List remote devices
    """

    hacked: list[Device] = context.get_hacked_devices()

    if hacked:
        print("Remote devices:")
    else:
        print("You don't have access to any remote device.")

    for remote in hacked:
        # assumes `powered_on` is a bool (or 0/1) flag -- TODO confirm against the Device model
        power = "on" if remote.powered_on else "off"
        print(f" - [{power}] {remote.name} (UUID: {remote.uuid})")
360  
361  
@handle_remote.subcommand("connect")
def handle_remote_connect(context: MainContext, args: list[str]) -> None:
    """
    Connect to a remote device
    """

    # Report bad usage through CommandError like every other handler in this
    # module, instead of printing the usage line and returning normally.
    if len(args) != 1:
        raise CommandError("usage: remote connect <name|uuid>")

    name: str = args[0]
    if is_uuid(name):
        # A uuid is looked up directly.
        device: Device = Device.get_device(context.client, name)
        if device is None:
            raise CommandError("This device does not exist or you have no permission to access it.")
    else:
        # A name is only searched among the devices the user has already hacked
        # and must be unambiguous.
        found_devices: list[Device] = [
            candidate for candidate in context.get_hacked_devices() if candidate.name == name
        ]

        if not found_devices:
            raise CommandError(f"There is no device with the name '{name}'.")
        if len(found_devices) > 1:
            raise CommandError(f"There is more than one device with the name '{name}'. You need to specify its UUID.")

        device = found_devices[0]

    print(f"Connecting to {device.name} (UUID: {device.uuid})")
    if not device.part_owner():
        raise CommandError("This device does not exist or you have no permission to access it.")

    # Open a new device context for the remote device on top of the current one.
    context.open(DeviceContext(context.root_context, cast(str, context.session_token), device))
395  
396  
@handle_remote_connect.completer()
def remote_completer(context: MainContext, args: list[str]) -> list[str]:
    """
    Suggest names of hacked devices for the first argument of `remote connect`.

    Names shared by several devices are left out, since `remote connect`
    rejects ambiguous names and asks for the UUID instead.
    """

    if len(args) != 1:
        return []

    # Count how often each name occurs; dicts keep first-seen order, so the
    # suggestions come out in the same order as the device list.
    occurrences: dict[str, int] = {}
    for device in context.get_hacked_devices():
        occurrences[device.name] = occurrences.get(device.name, 0) + 1

    return [device_name for device_name, seen in occurrences.items() if seen == 1]