/ PyCrypCli / PyCrypCli / commands / network.py
network.py
  1  from typing import Any
  2  
  3  from .command import command, CommandError
  4  from .device import get_device
  5  from .help import print_help
  6  from ..context import DeviceContext
  7  from ..exceptions import (
  8      MaximumNetworksReachedError,
  9      InvalidNameError,
 10      NameAlreadyInUseError,
 11      NetworkNotFoundError,
 12      AlreadyMemberOfNetworkError,
 13      InvitationAlreadyExistsError,
 14      NoPermissionsError,
 15      CannotLeaveOwnNetworkError,
 16      CannotKickOwnerError,
 17      DeviceNotFoundError,
 18  )
 19  from ..models import Network, NetworkMembership, Device, NetworkInvitation
 20  from ..util import is_uuid
 21  
 22  
 23  def get_network(context: DeviceContext, name_or_uuid: str) -> Network:
 24      if is_uuid(name_or_uuid):
 25          try:
 26              return Network.get_by_uuid(context.client, name_or_uuid)
 27          except NetworkNotFoundError:
 28              pass
 29  
 30      try:
 31          return Network.get_network_by_name(context.client, name_or_uuid)
 32      except NetworkNotFoundError:
 33          raise CommandError("This network does not exist.")
 34  
 35  
 36  def handle_membership_request(context: DeviceContext, args: list[str], accept: bool) -> None:
 37      if len(args) not in (1, 2):
 38          raise CommandError(f"usage: network {['deny', 'accept'][accept]} <network> [<device>]")
 39  
 40      network: Network = get_network(context, args[0])
 41  
 42      if len(args) == 1:
 43          for invitation in context.host.get_network_invitations():
 44              if invitation.network_uuid == network.uuid:
 45                  break
 46          else:
 47              raise CommandError("Invitation not found.")
 48      else:
 49          devices: list[Device] = []
 50          for request in network.get_membership_requests():
 51              try:
 52                  devices.append(Device.get_device(context.client, request.device_uuid))
 53              except DeviceNotFoundError:
 54                  pass
 55  
 56          device: Device = get_device(context, args[1], devices)
 57          try:
 58              for invitation in network.get_membership_requests():
 59                  if invitation.network_uuid == network.uuid and invitation.device_uuid == device.uuid:
 60                      break
 61              else:
 62                  raise CommandError("Join request not found.")
 63          except NoPermissionsError:
 64              raise CommandError("Permission denied.")
 65  
 66      if accept:
 67          invitation.accept()
 68      else:
 69          invitation.deny()
 70  
 71  
 72  @command("network", [DeviceContext])
 73  def handle_network(context: DeviceContext, args: list[str]) -> None:
 74      """
 75      Manage your networks
 76      """
 77  
 78      if args:
 79          raise CommandError("Unknown subcommand.")
 80      print_help(context, handle_network)
 81  
 82  
 83  @handle_network.subcommand("list")
 84  def handle_network_list(context: DeviceContext, _: Any) -> None:
 85      """
 86      View the networks this device is a member of
 87      """
 88  
 89      networks: list[Network] = context.host.get_networks()
 90  
 91      if not networks:
 92          print("This device is not a member of any network.")
 93      else:
 94          print("Networks this device is a member of:")
 95      for network in networks:
 96          owner: str = " (owner)" * (network.owner_uuid == context.host.uuid)
 97          print(f" - [{['public', 'private'][network.hidden]}] {network.name}{owner} (UUID: {network.uuid})")
 98  
 99  
@handle_network.subcommand("public")
def handle_network_public(context: DeviceContext, _: Any) -> None:
    """
    View public networks
    """

    public_nets: list[Network] = Network.get_public_networks(context.client)

    # First line: caption of the listing, or a notice when there is nothing to list.
    if public_nets:
        print("Public networks:")
    else:
        print("There is no public network.")

    for net in public_nets:
        # Mark networks that are owned by the current device.
        owner_tag: str = " (owner)" if net.owner_uuid == context.host.uuid else ""
        print(f" - {net.name}{owner_tag} (UUID: {net.uuid})")
115  
116  
@handle_network.subcommand("create")
def handle_network_create(context: DeviceContext, args: list[str]) -> None:
    """
    Create a new network
    """

    # Expected arguments: <name> followed by the visibility keyword.
    usage_ok = len(args) == 2 and args[1] in ("public", "private")
    if not usage_ok:
        raise CommandError("usage: network create <name> public|private")

    name, visibility = args
    try:
        # The second argument of create_network is True for a private network.
        context.host.create_network(name, visibility == "private")
    except MaximumNetworksReachedError:
        raise CommandError("You already own two networks.")
    except InvalidNameError:
        raise CommandError("Invalid name.")
    except NameAlreadyInUseError:
        raise CommandError("This name is already in use.")
134  
135  
@handle_network.subcommand("members")
def handle_network_members(context: DeviceContext, args: list[str]) -> None:
    """
    View the members of one of your networks
    """

    if len(args) != 1:
        raise CommandError("usage: network members <network>")

    (network_arg,) = args
    network: Network = get_network(context, network_arg)

    try:
        memberships: list[NetworkMembership] = network.get_members()
    except NetworkNotFoundError:
        # Reported to the user as a permission problem.
        raise CommandError("Permission denied.")

    # First line: caption of the listing, or a notice when there is nothing to list.
    if memberships:
        print(f"Members of '{network.name}':")
    else:
        print("This network has no members.")

    for membership in memberships:
        member_device: Device = Device.get_device(context.client, membership.device_uuid)
        power: str = "on" if member_device.powered_on else "off"
        print(f" - [{power}] {member_device.name} (UUID: {member_device.uuid})")
159  
160  
@handle_network.subcommand("request")
def handle_network_request(context: DeviceContext, args: list[str]) -> None:
    """
    Request membership of a network
    """

    if len(args) != 1:
        raise CommandError("usage: network request <network>")

    (network_arg,) = args
    wanted: Network = get_network(context, network_arg)

    # The request is made on behalf of the current device.
    try:
        wanted.request_membership(context.host)
    except AlreadyMemberOfNetworkError:
        raise CommandError("This device is already a member of the network.")
    except InvitationAlreadyExistsError:
        raise CommandError("You already requested to join this network.")
178  
179  
@handle_network.subcommand("requests")
def handle_network_requests(context: DeviceContext, args: list[str]) -> None:
    """
    View open membership requests of one of your networks
    """

    # args: exactly one element, the name or UUID of the network.
    # Raises CommandError on wrong usage, unknown network or missing permission.
    if len(args) != 1:
        raise CommandError("usage: network requests <network>")

    network: Network = get_network(context, args[0])

    try:
        requests: list[NetworkInvitation] = network.get_membership_requests()
    except NoPermissionsError:
        raise CommandError("Permission denied.")

    # Resolve the requesting devices up front. A request whose device can no
    # longer be looked up is left out, the same way the accept/deny handling
    # builds its candidate list, instead of aborting the whole listing.
    devices: list[Device] = []
    for request in requests:
        try:
            devices.append(Device.get_device(context.client, request.device_uuid))
        except DeviceNotFoundError:
            continue

    if not devices:
        print("There are no pending requests for this network.")
        return

    print("Pending requests:")
    for device in devices:
        print(f" - {device.name} (UUID: {device.uuid})")
203  
204  
@handle_network.subcommand("accept")
def handle_network_accept(context: DeviceContext, args: list[str]) -> None:
    """
    Accept a membership request or an invitation
    """

    # Shared implementation with the "deny" subcommand; only the flag differs.
    handle_membership_request(context, args, accept=True)
212  
213  
@handle_network.subcommand("deny")
def handle_network_deny(context: DeviceContext, args: list[str]) -> None:
    """
    Deny a membership request or an invitation
    """

    # Shared implementation with the "accept" subcommand; only the flag differs.
    handle_membership_request(context, args, accept=False)
221  
222  
@handle_network.subcommand("invite")
def handle_network_invite(context: DeviceContext, args: list[str]) -> None:
    """
    Invite a device to one of your networks
    """

    if len(args) != 2:
        raise CommandError("usage: network invite <network> <device>")

    network_arg, device_arg = args
    target_network: Network = get_network(context, network_arg)
    invitee: Device = get_device(context, device_arg)

    try:
        target_network.invite_device(invitee)
    except NetworkNotFoundError:
        # Reported to the user as a permission problem.
        raise CommandError("Permission denied.")
    except AlreadyMemberOfNetworkError:
        raise CommandError("Device is already a member of this network.")
    except InvitationAlreadyExistsError:
        raise CommandError("An invitation for this device already exists.")
243  
244  
@handle_network.subcommand("invitations")
def handle_network_invitations(context: DeviceContext, _: Any) -> None:
    """
    View invitations for this device to other networks
    """

    pending: list[NetworkInvitation] = context.host.get_network_invitations()

    # First line: caption of the listing, or a notice when there is nothing to list.
    if pending:
        print("Pending network invitations:")
    else:
        print("There are no pending network invitations for this device.")

    for invite in pending:
        # An invitation only carries the network UUID; fetch the network for display.
        net: Network = Network.get_by_uuid(context.client, invite.network_uuid)
        owner_tag: str = " (owner)" if net.owner_uuid == context.host.uuid else ""
        visibility: str = "private" if net.hidden else "public"
        print(f" - [{visibility}] {net.name}{owner_tag} (UUID: {net.uuid})")
260  
261  
@handle_network.subcommand("leave")
def handle_network_leave(context: DeviceContext, args: list[str]) -> None:
    """
    Leave a network
    """

    if len(args) != 1:
        raise CommandError("usage: network leave <network>")

    (network_arg,) = args
    joined: Network = get_network(context, network_arg)

    # The current device is the one that leaves.
    try:
        joined.leave(context.host)
    except CannotLeaveOwnNetworkError:
        raise CommandError("You cannot leave your own network.")
277  
278  
@handle_network.subcommand("kick")
def handle_network_kick(context: DeviceContext, args: list[str]) -> None:
    """
    Kick a device from one of your networks
    """

    # args: <network> (name or UUID) followed by <device>.
    # Raises CommandError on wrong usage, unknown network or device, missing
    # permission, or when the device to kick is the owner of the network.
    if len(args) != 2:
        raise CommandError("usage: network kick <network> <device>")

    network: Network = get_network(context, args[0])

    # Listing the members can already fail for lack of access (the "members"
    # subcommand handles the same error), so report it as a command error
    # here as well instead of letting it escape.
    try:
        members: list[NetworkMembership] = network.get_members()
    except NetworkNotFoundError:
        raise CommandError("Permission denied.")

    # Only devices that are members of the network can be selected.
    devices: list[Device] = [Device.get_device(context.client, member.device_uuid) for member in members]

    device: Device = get_device(context, args[1], devices)
    try:
        network.kick(device)
    except (NetworkNotFoundError, NoPermissionsError):
        raise CommandError("Permission denied.")
    except CannotKickOwnerError:
        raise CommandError("You cannot kick the owner of the network.")
303  
304  
@handle_network.subcommand("delete")
def handle_network_delete(context: DeviceContext, args: list[str]) -> None:
    """
    Delete one of your networks
    """

    if len(args) != 1:
        raise CommandError("usage: network delete <network>")

    (network_arg,) = args
    doomed: Network = get_network(context, network_arg)

    try:
        doomed.delete()
    except NetworkNotFoundError:
        # Reported to the user as a permission problem.
        raise CommandError("Permission denied.")
320  
321  
def device_network_names(context: DeviceContext) -> list[str]:
    """Return the names of the networks returned by ``context.host.get_networks()``."""

    names: list[str] = []
    for joined in context.host.get_networks():
        names.append(joined.name)
    return names
324  
325  
def public_network_names(context: DeviceContext) -> list[str]:
    """Return the names of all networks returned by ``Network.get_public_networks``."""

    names: list[str] = []
    for public_net in Network.get_public_networks(context.client):
        names.append(public_net.name)
    return names
328  
329  
def invitation_network_names(context: DeviceContext) -> list[str]:
    """Return the names of the networks the current device has invitations for.

    Each invitation only carries the UUID of its network, so the network is
    fetched by UUID to obtain its name.
    """

    names: list[str] = []
    for invite in context.host.get_network_invitations():
        inviting_network = Network.get_by_uuid(context.client, invite.network_uuid)
        names.append(inviting_network.name)
    return names
335  
336  
@handle_network_members.completer()
@handle_network_request.completer()
@handle_network_requests.completer()
@handle_network_leave.completer()
@handle_network_delete.completer()
def network_completer(context: DeviceContext, args: list[str]) -> list[str]:
    """Suggest network names for subcommands that take a single ``<network>`` argument.

    Suggestions are only offered for the first argument. They are the
    de-duplicated union of the device's networks, the public networks and
    the networks the device has invitations for.
    """

    if len(args) != 1:
        return []

    candidates: set[str] = set()
    candidates.update(device_network_names(context))
    candidates.update(public_network_names(context))
    candidates.update(invitation_network_names(context))
    return list(candidates)
346  
347  
@handle_network_create.completer()
def network_create_completer(_: Any, args: list[str]) -> list[str]:
    """Suggest the visibility keywords for the second argument of ``network create``."""

    # Only the second argument has a fixed set of values; the name is free-form.
    return ["public", "private"] if len(args) == 2 else []
353  
354  
@handle_network_accept.completer()
@handle_network_deny.completer()
def network_accept_deny_completer(context: DeviceContext, args: list[str]) -> list[str]:
    """Suggest arguments for ``network accept`` and ``network deny``.

    First argument: names of the device's networks and of the networks it
    has invitations for. Second argument: names of the devices with a
    membership request for the network given as first argument; names that
    occur more than once are left out because they are ambiguous.

    Returns an empty list when the network cannot be resolved or when its
    membership requests may not be listed, so completion never raises.
    """

    if len(args) == 1:
        return [*{*device_network_names(context), *invitation_network_names(context)}]
    if len(args) == 2:
        try:
            network: Network = get_network(context, args[0])
            requests: list[NetworkInvitation] = network.get_membership_requests()
        except (CommandError, NoPermissionsError):
            # Unknown network or no permission to see its requests:
            # there is simply nothing to suggest.
            return []
        device_names: list[str] = []
        for request in requests:
            try:
                device_names.append(Device.get_device(context.client, request.device_uuid).name)
            except DeviceNotFoundError:
                # The requesting device cannot be resolved; skip it.
                pass
        return [name for name in device_names if device_names.count(name) == 1]
    return []
373  
374  
@handle_network_invite.completer()
def network_invite_completer(context: DeviceContext, args: list[str]) -> list[str]:
    """Suggest arguments for ``network invite``.

    First argument: names of the device's networks (de-duplicated).
    Second argument: names of the devices returned by ``Device.list_devices``,
    leaving out names that occur more than once.
    """

    position = len(args)
    if position == 1:
        return list(set(device_network_names(context)))
    if position == 2:
        all_names: list[str] = []
        for known_device in Device.list_devices(context.client):
            all_names.append(known_device.name)
        # Keep only names that identify exactly one device.
        return [candidate for candidate in all_names if all_names.count(candidate) == 1]
    return []
383  
384  
@handle_network_kick.completer()
def network_kick_completer(context: DeviceContext, args: list[str]) -> list[str]:
    """Suggest arguments for ``network kick``.

    First argument: names of the device's networks (de-duplicated).
    Second argument: names of the member devices of the network given as
    first argument; names that occur more than once are left out because
    they are ambiguous.

    Returns an empty list when the network cannot be resolved or when its
    members may not be listed, so completion never raises for those cases.
    """

    if len(args) == 1:
        return [*{*device_network_names(context)}]
    if len(args) == 2:
        try:
            network: Network = get_network(context, args[0])
            members: list[NetworkMembership] = network.get_members()
        except (CommandError, NetworkNotFoundError):
            # Unknown network, or the member list is not accessible (the
            # "members" subcommand handles the same error): nothing to suggest.
            return []
        device_names: list[str] = [
            Device.get_device(context.client, member.device_uuid).name for member in members
        ]
        return [name for name in device_names if device_names.count(name) == 1]
    return []
397          return [name for name in device_names if device_names.count(name) == 1]
398      return []