/ PyCrypCli / PyCrypCli / commands / files.py
files.py
  1  from typing import Any, cast
  2  
  3  from .command import command, CommandError
  4  from ..context import DeviceContext
  5  from ..exceptions import (
  6      FileAlreadyExistsError,
  7      InvalidWalletFileError,
  8      UnknownSourceOrDestinationError,
  9      PermissionDeniedError,
 10      FileNotChangeableError,
 11  )
 12  from ..models import File, Wallet
 13  
 14  
 15  @command("ls", [DeviceContext], aliases=["l", "dir"])
 16  def handle_ls(context: DeviceContext, args: list[str]) -> None:
 17      """
 18      List all files
 19      """
 20  
 21      directory = context.pwd if not args else context.path_to_file(args[0])
 22      if directory is None:
 23          raise CommandError("No such file or directory.")
 24  
 25      if directory.is_directory:
 26          files: list[File] = context.get_files(directory.uuid)
 27          files.sort(key=lambda f: [1 - f.is_directory, f.name])
 28      else:
 29          files = [directory]
 30  
 31      for file in files:
 32          print(["[FILE] ", "[DIR]  "][file.is_directory] + file.name)
 33  
 34  
 35  @command("pwd", [DeviceContext])
 36  def handle_pwd(context: DeviceContext, _: Any) -> None:
 37      """
 38      Print the current working directory
 39      """
 40  
 41      print(context.file_to_path(context.pwd))
 42  
 43  
 44  @command("mkdir", [DeviceContext])
 45  def handle_mkdir(context: DeviceContext, args: list[str]) -> None:
 46      """
 47      Create a new directory
 48      """
 49  
 50      if not args:
 51          raise CommandError("usage: mkdir <dirname>")
 52  
 53      *path, dirname = args[0].split("/")
 54      parent: File | None = context.path_to_file("/".join(path))
 55      if parent is None:
 56          raise CommandError("No such file or directory.")
 57      if not parent.is_directory:
 58          raise CommandError("That is no directory.")
 59  
 60      try:
 61          context.host.create_file(dirname, "", True, parent.uuid)
 62      except FileAlreadyExistsError:
 63          raise CommandError("There already exists a file with this name.")
 64  
 65  
 66  @command("cd", [DeviceContext])
 67  def handle_cd(context: DeviceContext, args: list[str]) -> None:
 68      """
 69      Change the current working directory
 70      """
 71  
 72      if not args:
 73          context.pwd = context.get_root_dir()
 74      else:
 75          directory: File | None = context.path_to_file(args[0])
 76          if directory is None:
 77              raise CommandError("The specified directory does not exist")
 78          if not directory.is_directory:
 79              raise CommandError("That is no directory.")
 80  
 81          context.pwd = directory
 82  
 83  
 84  @command("..", [DeviceContext])
 85  def handle_dot_dot(context: DeviceContext, _: Any) -> None:
 86      """
 87      Go to parent directory
 88      """
 89  
 90      handle_cd(context, [".."])
 91  
 92  
 93  def create_file(context: DeviceContext, filepath: str, content: str) -> None:
 94      *path, filename = filepath.split("/")
 95      parent: File | None = context.path_to_file("/".join(path))
 96      if not parent:
 97          raise CommandError("Parent directory does not exist.")
 98  
 99      if not filename:
100          raise CommandError("Filename cannot be empty.")
101      if len(filename) > 64:
102          raise CommandError("Filename cannot be longer than 64 characters.")
103  
104      file: File | None = context.get_file(filename, parent.uuid)
105      if file is not None:
106          if file.is_directory:
107              raise CommandError("A directory with this name already exists.")
108          file.edit(content)
109      else:
110          context.host.create_file(filename, content, False, parent.uuid)
111  
112  
@command("touch", [DeviceContext])
def handle_touch(context: DeviceContext, args: list[str]) -> None:
    """
    Create a new file with given content
    """

    if not args:
        raise CommandError("usage: touch <filepath> [content]")

    # The first argument is the path; all remaining words are joined into the content.
    filepath = args[0]
    content = " ".join(args[1:])
    create_file(context, filepath, content)
124  
125  
@command("cat", [DeviceContext])
def handle_cat(context: DeviceContext, args: list[str]) -> None:
    """
    Print the content of a file
    """

    if not args:
        raise CommandError("usage: cat <filepath>")

    path: str = args[0]
    target: File | None = context.path_to_file(path)

    # Only existing regular files have content to show.
    if target is None:
        raise CommandError("File does not exist.")
    if target.is_directory:
        raise CommandError(f"'{path}' is a directory.")

    text = target.content
    print(text)
143  
144  
@command("rm", [DeviceContext])
def handle_rm(context: DeviceContext, args: list[str]) -> None:
    """
    Remove a file
    """

    if not args:
        raise CommandError("usage: rm <filepath>")

    filepath: str = args[0]
    target: File | None = context.path_to_file(filepath)
    if target is None:
        raise CommandError("File does not exist.")

    if not target.is_directory:
        question: str = f"Are you sure you want to delete the file '{filepath}'?"
    else:
        # Walk upwards from the working directory until either the target or the directory whose
        # uuid is None is reached; hitting the target means the working directory lies inside it.
        ancestor = context.pwd
        while ancestor.uuid != target.uuid and ancestor.uuid is not None:
            ancestor = context.get_parent_dir(ancestor)
        if ancestor.uuid == target.uuid:
            raise CommandError("Refusing to delete this directory.")

        question = f"Are you sure you want to delete the directory '{filepath}' including all contained files?"

    answer = context.ask(question + " [yes|no] ", ["yes", "no"])
    if answer == "no":
        raise CommandError("File has not been deleted.")

    # If the content can be read as a wallet, offer to delete that wallet along with the file.
    content: str = target.content
    try:
        wallet: Wallet = context.extract_wallet(content)
        choice: str = context.ask(
            f"\033[38;2;255;51;51mThis file contains {wallet.amount} morphcoin. "
            f"Do you want to delete the corresponding wallet? [yes|no] \033[0m",
            ["yes", "no"],
        )
        if choice != "yes":
            print("The following key might now be the only way to access your wallet.")
            print(content)
        else:
            wallet.delete()
            print("The wallet has been deleted.")
    except (InvalidWalletFileError, UnknownSourceOrDestinationError, PermissionDeniedError):
        # Best effort: wallet handling failed with one of these errors, so only the file is removed.
        pass

    try:
        target.delete()
    except FileNotChangeableError:
        raise CommandError("Some files could not be deleted.")
195  
196  
def check_file_movable(
    context: DeviceContext, source: str, destination: str, move: bool
) -> tuple[File, str, str | None] | None:
    """
    Work out where ``source`` ends up when it is copied or moved to ``destination``.

    An existing entry at the destination that may be overwritten (a regular file, or an empty
    directory when the source is a directory) is deleted, but only after every check has passed
    and only if the operation is not a no-op.

    :param context: device context used to resolve paths
    :param source: path of the file or directory to copy or move
    :param destination: target path; if it names an existing directory, the source keeps its name
        and is placed inside that directory
    :param move: selects the wording ("move" or "copy") of the into-itself error message
    :return: ``(file, dest_name, dest_dir)`` with the resolved source, its new name and the uuid of
        its new parent directory, or ``None`` if source and destination are the same entry
    :raises CommandError: if the source or the destination parent does not exist, if the destination
        cannot be replaced, if a directory would end up inside itself, or if the new name is empty
        or longer than 64 characters
    """
    file: File | None = context.path_to_file(source)
    if file is None:
        raise CommandError("File does not exist.")

    dest_file: File | None = context.path_to_file(destination)
    # startswith() instead of destination[0] so that an empty destination cannot raise IndexError.
    absolute = destination.startswith("/")
    relative_destination = destination[1:] if absolute else destination
    dest_parent_path, _, dest_name = relative_destination.rpartition("/")
    dest_parent: File | None = context.path_to_file(("/" if absolute else "") + dest_parent_path)
    dest_dir: str | None
    # Existing entry that has to make room for the source. It is deleted at the very end so that a
    # no-op or a failed check never destroys data.
    replaced: File | None = None

    if dest_file is None:
        # Nothing exists at the destination yet: the source becomes a new entry in the parent.
        if dest_parent is None:
            raise CommandError("No such file or directory.")
        if not dest_parent.is_directory:
            raise CommandError("Not a directory.")
        dest_dir = dest_parent.uuid
    elif dest_file.is_directory:
        # The destination is a directory: the source keeps its own name and is placed inside it,
        # so a collision has to be looked up under the source's name.
        dest_name = file.name
        dest_dir = dest_file.uuid
        existing: File | None = context.get_file(dest_name, dest_file.uuid)
        # Finding the source itself is not a conflict; that is the no-op handled below.
        if existing is not None and existing.uuid != file.uuid:
            if file.is_directory:
                if not existing.is_directory:
                    raise CommandError("Directory cannot replace a file.")
                if context.get_files(existing.uuid):
                    raise CommandError("Directory is not empty.")
            elif existing.is_directory:
                raise CommandError("File cannot replace a directory.")
            replaced = existing
    elif file.is_directory:
        raise CommandError("Directory cannot replace a file.")
    else:
        # A regular file onto an existing regular file: the existing one gets overwritten.
        replaced = dest_file
        dest_dir = cast(File, dest_parent).uuid

    # Source and destination are the same entry: nothing to do, and nothing may be deleted.
    if dest_dir == file.parent_dir_uuid and dest_name == file.name:
        return None

    if dest_dir is not None:
        # Walk from the destination directory upwards; meeting the source on the way means the
        # source would end up inside itself.
        action = "move" if move else "copy"
        dir_to_check: File | None = File.get_file(context.client, file.device_uuid, dest_dir)
        while dir_to_check:
            if dir_to_check.uuid == file.uuid:
                raise CommandError(f"You cannot {action} a directory into itself.")
            dir_to_check = context.get_parent_dir(dir_to_check)

    if not dest_name:
        raise CommandError("Destination filename cannot be empty.")
    if len(dest_name) > 64:
        raise CommandError("Destination filename cannot be longer than 64 characters.")

    # All checks passed: now it is safe to clear the destination.
    if replaced is not None:
        replaced.delete()

    return file, dest_name, dest_dir
267  
268  
@command("cp", [DeviceContext])
def handle_cp(context: DeviceContext, args: list[str]) -> None:
    """
    Create a copy of a file
    """

    if len(args) != 2:
        raise CommandError("usage: cp <source> <destination>")

    resolved: tuple[File, str, str | None] | None = check_file_movable(context, args[0], args[1], move=False)
    if resolved is None:
        return

    # Breadth-first copy: every entry is created under its already created parent, and the
    # children of a directory are queued with the uuid of the directory's copy.
    pending: list[tuple[File, str, str | None]] = [resolved]
    while pending:
        original, copy_name, copy_parent = pending.pop(0)
        copied: File = context.host.create_file(copy_name, original.content, original.is_directory, copy_parent)
        if not original.is_directory:
            continue
        pending.extend((child, child.name, cast(str, copied.uuid)) for child in context.get_files(original.uuid))
290  
291  
@command("mv", [DeviceContext])
def handle_mv(context: DeviceContext, args: list[str]) -> None:
    """
    Rename a file
    """

    if len(args) != 2:
        raise CommandError("usage: mv <source> <destination>")

    resolved: tuple[File, str, str | None] | None = check_file_movable(context, args[0], args[1], move=True)
    # None means source and destination are the same entry, so there is nothing to move.
    if resolved is not None:
        source_file, new_name, new_parent = resolved
        source_file.move(new_name, new_parent)
307  
308  
@handle_ls.completer()
@handle_cat.completer()
@handle_touch.completer()
@handle_rm.completer()
def simple_file_completer(context: DeviceContext, args: list[str]) -> list[str]:
    """
    Complete a file path, but only while the first argument is being typed
    """
    if len(args) != 1:
        return []
    return context.file_path_completer(args[0])
317  
318  
@handle_cd.completer()
@handle_mkdir.completer()
def simple_directory_completer(context: DeviceContext, args: list[str]) -> list[str]:
    """
    Complete a path with directories only, but only while the first argument is being typed
    """
    if len(args) != 1:
        return []
    return context.file_path_completer(args[0], dirs_only=True)
325  
326  
@handle_mv.completer()
@handle_cp.completer()
def copy_completer(context: DeviceContext, args: list[str]) -> list[str]:
    """
    Complete a file path for the source or the destination, whichever is currently being typed
    """
    if len(args) not in (1, 2):
        return []
    # The argument being typed is always the last one.
    return context.file_path_completer(args[-1])