network.py
from collections import Counter
from typing import Any

from .command import command, CommandError
from .device import get_device
from .help import print_help
from ..context import DeviceContext
from ..exceptions import (
    MaximumNetworksReachedError,
    InvalidNameError,
    NameAlreadyInUseError,
    NetworkNotFoundError,
    AlreadyMemberOfNetworkError,
    InvitationAlreadyExistsError,
    NoPermissionsError,
    CannotLeaveOwnNetworkError,
    CannotKickOwnerError,
    DeviceNotFoundError,
)
from ..models import Network, NetworkMembership, Device, NetworkInvitation
from ..util import is_uuid


def get_network(context: DeviceContext, name_or_uuid: str) -> Network:
    """
    Resolve a network from a UUID or a name.

    If ``name_or_uuid`` looks like a UUID, a lookup by UUID is tried first; when
    that fails (or the value is not a UUID) the value is looked up as a name.

    :param context: the current device context (its client is used for the lookups)
    :param name_or_uuid: UUID or name of the network
    :return: the resolved network
    :raises CommandError: if neither lookup finds a network
    """

    if is_uuid(name_or_uuid):
        try:
            return Network.get_by_uuid(context.client, name_or_uuid)
        except NetworkNotFoundError:
            # A name may happen to look like a UUID, so fall through to the name lookup.
            pass

    try:
        return Network.get_network_by_name(context.client, name_or_uuid)
    except NetworkNotFoundError:
        raise CommandError("This network does not exist.")


def _unique_names(names: list[str]) -> list[str]:
    """Return the names that occur exactly once in ``names``, keeping their order."""

    counts = Counter(names)
    return [name for name in names if counts[name] == 1]


def _request_devices(context: DeviceContext, requests: list[NetworkInvitation]) -> list[Device]:
    """Resolve the devices referenced by ``requests``, skipping devices that cannot be found."""

    devices: list[Device] = []
    for request in requests:
        try:
            devices.append(Device.get_device(context.client, request.device_uuid))
        except DeviceNotFoundError:
            pass
    return devices


def _owner_suffix(context: DeviceContext, network: Network) -> str:
    """Return ``" (owner)"`` if the current host owns ``network``, else an empty string."""

    return " (owner)" if network.owner_uuid == context.host.uuid else ""


def _visibility(network: Network) -> str:
    """Return ``"private"`` for hidden networks and ``"public"`` otherwise."""

    return "private" if network.hidden else "public"


def handle_membership_request(context: DeviceContext, args: list[str], accept: bool) -> None:
    """
    Accept or deny an invitation or a membership request.

    With one argument (``<network>``) the invitation of the current host to that
    network is used. With two arguments (``<network> <device>``) the membership
    request of that device for the network is used.

    :param context: the current device context
    :param args: ``[<network>]`` or ``[<network>, <device>]``
    :param accept: ``True`` to accept, ``False`` to deny
    :raises CommandError: on wrong usage, if the network, invitation or request
        cannot be found, or if listing the membership requests is not permitted
    """

    if len(args) not in (1, 2):
        raise CommandError(f"usage: network {'accept' if accept else 'deny'} <network> [<device>]")

    network: Network = get_network(context, args[0])

    if len(args) == 1:
        for invitation in context.host.get_network_invitations():
            if invitation.network_uuid == network.uuid:
                break
        else:
            raise CommandError("Invitation not found.")
    else:
        # Fetch the requests once, inside the handler, so a permission error is
        # reported to the user instead of escaping from an unguarded first call.
        try:
            requests: list[NetworkInvitation] = network.get_membership_requests()
        except NoPermissionsError:
            raise CommandError("Permission denied.")

        device: Device = get_device(context, args[1], _request_devices(context, requests))
        for invitation in requests:
            if invitation.network_uuid == network.uuid and invitation.device_uuid == device.uuid:
                break
        else:
            raise CommandError("Join request not found.")

    if accept:
        invitation.accept()
    else:
        invitation.deny()


# Root of the "network" command: without arguments it prints the help of the
# command, any argument that reaches this handler is rejected as unknown.
@command("network", [DeviceContext])
def handle_network(context: DeviceContext, args: list[str]) -> None:
    """
    Manage your networks
    """

    if args:
        raise CommandError("Unknown subcommand.")
    print_help(context, handle_network)


# Prints every network returned by the host together with its visibility and
# an owner marker for networks owned by the current host.
@handle_network.subcommand("list")
def handle_network_list(context: DeviceContext, _: Any) -> None:
    """
    View the networks this device is a member of
    """

    networks: list[Network] = context.host.get_networks()

    if not networks:
        print("This device is not a member of any network.")
    else:
        print("Networks this device is a member of:")
        for network in networks:
            owner: str = _owner_suffix(context, network)
            print(f" - [{_visibility(network)}] {network.name}{owner} (UUID: {network.uuid})")


# Prints every public network with an owner marker for networks owned by the
# current host.
@handle_network.subcommand("public")
def handle_network_public(context: DeviceContext, _: Any) -> None:
    """
    View public networks
    """

    networks: list[Network] = Network.get_public_networks(context.client)

    if not networks:
        print("There is no public network.")
    else:
        print("Public networks:")
        for network in networks:
            owner: str = _owner_suffix(context, network)
            print(f" - {network.name}{owner} (UUID: {network.uuid})")


# Expects "<name> public|private"; the second argument is turned into the
# boolean passed to create_network ("private" -> True).
@handle_network.subcommand("create")
def handle_network_create(context: DeviceContext, args: list[str]) -> None:
    """
    Create a new network
    """

    if len(args) != 2 or args[1] not in ("public", "private"):
        raise CommandError("usage: network create <name> public|private")

    try:
        context.host.create_network(args[0], args[1] == "private")
    except MaximumNetworksReachedError:
        raise CommandError("You already own two networks.")
    except InvalidNameError:
        raise CommandError("Invalid name.")
    except NameAlreadyInUseError:
        raise CommandError("This name is already in use.")


# Expects "<network>"; lists each member device with its power state.
@handle_network.subcommand("members")
def handle_network_members(context: DeviceContext, args: list[str]) -> None:
    """
    View the members of one of your networks
    """

    if len(args) != 1:
        raise CommandError("usage: network members <network>")

    network: Network = get_network(context, args[0])

    try:
        members: list[NetworkMembership] = network.get_members()
    except NetworkNotFoundError:
        # The network itself was resolved above, so this is reported as a permission problem.
        raise CommandError("Permission denied.")

    if not members:
        print("This network has no members.")
    else:
        print(f"Members of '{network.name}':")
        for member in members:
            device: Device = Device.get_device(context.client, member.device_uuid)
            print(f" - [{'on' if device.powered_on else 'off'}] {device.name} (UUID: {device.uuid})")


# Expects "<network>"; requests membership for the current host.
@handle_network.subcommand("request")
def handle_network_request(context: DeviceContext, args: list[str]) -> None:
    """
    Request membership of a network
    """

    if len(args) != 1:
        raise CommandError("usage: network request <network>")

    network: Network = get_network(context, args[0])

    try:
        network.request_membership(context.host)
    except AlreadyMemberOfNetworkError:
        raise CommandError("This device is already a member of the network.")
    except InvitationAlreadyExistsError:
        raise CommandError("You already requested to join this network.")


# Expects "<network>"; lists the devices with an open membership request.
@handle_network.subcommand("requests")
def handle_network_requests(context: DeviceContext, args: list[str]) -> None:
    """
    View open membership requests of one of your networks
    """

    if len(args) != 1:
        raise CommandError("usage: network requests <network>")

    network: Network = get_network(context, args[0])

    try:
        requests: list[NetworkInvitation] = network.get_membership_requests()
    except NoPermissionsError:
        raise CommandError("Permission denied.")

    if not requests:
        print("There are no pending requests for this network.")
    else:
        print("Pending requests:")
        # Requests whose device cannot be found are skipped, matching how
        # accept/deny resolve the requesting devices.
        for device in _request_devices(context, requests):
            print(f" - {device.name} (UUID: {device.uuid})")


@handle_network.subcommand("accept")
def handle_network_accept(context: DeviceContext, args: list[str]) -> None:
    """
    Accept a membership request or an invitation
    """

    handle_membership_request(context, args, True)


@handle_network.subcommand("deny")
def handle_network_deny(context: DeviceContext, args: list[str]) -> None:
    """
    Deny a membership request or an invitation
    """

    handle_membership_request(context, args, False)


# Expects "<network> <device>"; invites the device to the network.
@handle_network.subcommand("invite")
def handle_network_invite(context: DeviceContext, args: list[str]) -> None:
    """
    Invite a device to one of your networks
    """

    if len(args) != 2:
        raise CommandError("usage: network invite <network> <device>")

    network: Network = get_network(context, args[0])

    device: Device = get_device(context, args[1])
    try:
        network.invite_device(device)
    except NetworkNotFoundError:
        raise CommandError("Permission denied.")
    except AlreadyMemberOfNetworkError:
        raise CommandError("Device is already a member of this network.")
    except InvitationAlreadyExistsError:
        raise CommandError("An invitation for this device already exists.")


# Lists the networks the current host has been invited to.
@handle_network.subcommand("invitations")
def handle_network_invitations(context: DeviceContext, _: Any) -> None:
    """
    View invitations for this device to other networks
    """

    invitations: list[NetworkInvitation] = context.host.get_network_invitations()
    if not invitations:
        print("There are no pending network invitations for this device.")
    else:
        print("Pending network invitations:")
        for invitation in invitations:
            network: Network = Network.get_by_uuid(context.client, invitation.network_uuid)
            owner: str = _owner_suffix(context, network)
            print(f" - [{_visibility(network)}] {network.name}{owner} (UUID: {network.uuid})")


# Expects "<network>"; removes the current host from the network.
@handle_network.subcommand("leave")
def handle_network_leave(context: DeviceContext, args: list[str]) -> None:
    """
    Leave a network
    """

    if len(args) != 1:
        raise CommandError("usage: network leave <network>")

    network: Network = get_network(context, args[0])

    try:
        network.leave(context.host)
    except CannotLeaveOwnNetworkError:
        raise CommandError("You cannot leave your own network.")


# Expects "<network> <device>"; the device is resolved among the current
# members of the network before it is kicked.
@handle_network.subcommand("kick")
def handle_network_kick(context: DeviceContext, args: list[str]) -> None:
    """
    Kick a device from one of your networks
    """

    if len(args) != 2:
        raise CommandError("usage: network kick <network> <device>")

    network: Network = get_network(context, args[0])

    # Listing the members can fail the same way as in "network members", so it
    # is guarded here as well instead of letting the exception escape.
    try:
        members: list[NetworkMembership] = network.get_members()
    except NetworkNotFoundError:
        raise CommandError("Permission denied.")

    devices: list[Device] = [Device.get_device(context.client, member.device_uuid) for member in members]

    device: Device = get_device(context, args[1], devices)
    try:
        network.kick(device)
    except (NetworkNotFoundError, NoPermissionsError):
        raise CommandError("Permission denied.")
    except CannotKickOwnerError:
        raise CommandError("You cannot kick the owner of the network.")


# Expects "<network>"; deletes the network.
@handle_network.subcommand("delete")
def handle_network_delete(context: DeviceContext, args: list[str]) -> None:
    """
    Delete one of your networks
    """

    if len(args) != 1:
        raise CommandError("usage: network delete <network>")

    network: Network = get_network(context, args[0])

    try:
        network.delete()
    except NetworkNotFoundError:
        raise CommandError("Permission denied.")


def device_network_names(context: DeviceContext) -> list[str]:
    """Return the names of the networks returned by the current host."""

    return [network.name for network in context.host.get_networks()]


def public_network_names(context: DeviceContext) -> list[str]:
    """Return the names of all public networks."""

    return [network.name for network in Network.get_public_networks(context.client)]


def invitation_network_names(context: DeviceContext) -> list[str]:
    """Return the names of the networks the current host has an invitation for."""

    return [
        Network.get_by_uuid(context.client, invitation.network_uuid).name
        for invitation in context.host.get_network_invitations()
    ]


# Completer for subcommands taking a single <network> argument: suggests the
# de-duplicated names of member, public and inviting networks.
@handle_network_members.completer()
@handle_network_request.completer()
@handle_network_requests.completer()
@handle_network_leave.completer()
@handle_network_delete.completer()
def network_completer(context: DeviceContext, args: list[str]) -> list[str]:
    if len(args) == 1:
        return list(
            {
                *device_network_names(context),
                *public_network_names(context),
                *invitation_network_names(context),
            }
        )
    return []


# Completer for "network create": the second argument is the visibility.
@handle_network_create.completer()
def network_create_completer(_: Any, args: list[str]) -> list[str]:
    if len(args) == 2:
        return ["public", "private"]
    return []


# Completer for "network accept" / "network deny": first a network name, then
# the name of a device with an open membership request for that network.
@handle_network_accept.completer()
@handle_network_deny.completer()
def network_accept_deny_completer(context: DeviceContext, args: list[str]) -> list[str]:
    if len(args) == 1:
        return list({*device_network_names(context), *invitation_network_names(context)})
    if len(args) == 2:
        # An unknown network or missing permissions simply yields no candidates;
        # a completer should not raise while the user is still typing.
        try:
            network: Network = get_network(context, args[0])
            requests: list[NetworkInvitation] = network.get_membership_requests()
        except (CommandError, NoPermissionsError):
            return []
        # Only unambiguous device names are offered.
        return _unique_names([device.name for device in _request_devices(context, requests)])
    return []


# Completer for "network invite": first a network name, then a device name.
@handle_network_invite.completer()
def network_invite_completer(context: DeviceContext, args: list[str]) -> list[str]:
    if len(args) == 1:
        return list(set(device_network_names(context)))
    if len(args) == 2:
        # Only unambiguous device names are offered.
        return _unique_names([device.name for device in Device.list_devices(context.client)])
    return []


# Completer for "network kick": first a network name, then the name of a
# member device of that network.
@handle_network_kick.completer()
def network_kick_completer(context: DeviceContext, args: list[str]) -> list[str]:
    if len(args) == 1:
        return list(set(device_network_names(context)))
    if len(args) == 2:
        # An unknown network or a refused member listing yields no candidates.
        try:
            network: Network = get_network(context, args[0])
            members: list[NetworkMembership] = network.get_members()
        except (CommandError, NetworkNotFoundError):
            return []
        # Only unambiguous device names are offered.
        return _unique_names([Device.get_device(context.client, member.device_uuid).name for member in members])
    return []