/ src / sirocco / core / namelistfile.py
namelistfile.py
 1  import re
 2  from dataclasses import dataclass, field
 3  from pathlib import Path
 4  from typing import Any, Self
 5  
 6  import f90nml
 7  
 8  from sirocco.parsing import yaml_data_models as models
 9  
10  
@dataclass(kw_only=True)
class NamelistFile(models.ConfigNamelistFileSpec):
    """A wrapper class around f90nml.namelist.Namelist

    - adds a name and a path
    - adds the update_from_specs method

    The file found at `path` is parsed with `f90nml.read` when the object is
    created; the parsed content is exposed through the `namelist` property.
    """

    # File name component of `path`; filled in by __post_init__, not a constructor argument
    name: str = field(init=False)

    def __post_init__(self) -> None:
        """Validate `path`, derive `name` from it and parse the namelist file."""
        self.path = self._validate_namelist_path(self.path)
        self.name = self.path.name
        self._namelist = f90nml.read(self.path)

    @classmethod
    def from_config(cls: type[Self], config: models.ConfigNamelistFile, config_rootdir: Path) -> Self:
        """Build a NamelistFile from a config entry.

        `config.path` is resolved against `config_rootdir`, the file is read
        and `config.specs` is then applied on top of its content.
        """
        path = cls._validate_namelist_path(config.path, config_rootdir)
        namelist_file = cls(path=path)
        namelist_file.update_from_specs(config.specs)
        return namelist_file

    @property
    def namelist(self) -> f90nml.Namelist:
        """The parsed namelist, including any update applied so far."""
        return self._namelist

    def update_from_specs(self, specs: dict[str, Any]) -> None:
        """Updates the internal namelist from the specs.

        Each key of `specs` is a section name, optionally followed by a
        bracketed index (see `section_index`), mapped to the parameters to
        set in that section. Missing sections are created. An indexed key may
        address an existing occurrence of the section or the position right
        after the last one, in which case a new occurrence is appended.

        Raises:
            ValueError: if the user index is 0 (indices start at 1).
            IndexError: if the index would leave a gap after the existing
                occurrences of the section.
        """
        for section, params in specs.items():
            section_name, k = self.section_index(section)
            # Create section if non-existent
            if section_name not in self.namelist:
                # NOTE: f90nml will automatically create the corresponding nested f90nml.Namelist
                #       objects, no need to explicitly use the f90nml.Namelist class constructor
                self.namelist[section_name] = {} if k is None else [{}]
            # Update namelist with user input
            if k is None:
                self.namelist[section_name].update(params)
                continue
            # NOTE: the user index follows the FORTRAN convention (starts at 1);
            #       section_index already converted it to a 0-based Python index,
            #       so a user index of 0 shows up here as -1 and must not be
            #       allowed to silently address the last section.
            if k < 0:
                msg = f"Invalid index in namelist section {section!r}: indices start at 1."
                raise ValueError(msg)
            self._indexed_section(section_name, k).update(params)

    def _indexed_section(self, section_name: str, k: int) -> f90nml.Namelist:
        """Return occurrence `k` (0-based) of a section, appending it when `k` is one past the last."""
        current = self.namelist[section_name]
        # A section that occurs only once is held as a single Namelist rather than
        # a list; len() on it would count parameters, not occurrences.
        is_single = isinstance(current, f90nml.Namelist)
        n_sections = 1 if is_single else len(current)
        if k > n_sections:
            msg = (
                f"Cannot address occurrence {k + 1} of namelist section {section_name}: "
                f"only {n_sections} present, a new one can only be added right after the last."
            )
            raise IndexError(msg)
        if k == n_sections:
            # Create additional section if required
            if type(current) is list:
                # Plain list, e.g. the one created by update_from_specs for a new section
                current.append(f90nml.Namelist())
            else:
                # NOTE(review): repeated sections read from a file are expected to be
                #       exposed by f90nml as a list-like view on the parent namelist,
                #       which has to be grown through Namelist.add_cogroup -- confirm
                #       against the f90nml version in use.
                self.namelist.add_cogroup(section_name, f90nml.Namelist())
            current = self.namelist[section_name]
            is_single = False
        return current if is_single else current[k]

    def dump(self, path: Path) -> None:
        """Write the namelist to `path`, replacing an existing file.

        Raises:
            OSError: if `path` is an existing directory.
        """
        if path.is_dir():
            msg = f"Cannot write namelist {self.name} to {path}: a directory already exists at this path."
            raise OSError(msg)
        # Remove a previous file first; presumably needed because Namelist.write
        # does not overwrite an existing file by default -- TODO confirm
        if path.is_file():
            path.unlink()
        self.namelist.write(path)

    @staticmethod
    def _validate_namelist_path(config_namelist_path: Path, config_rootdir: Path | None = None) -> Path:
        """Resolve the namelist path and check that it points to an existing file.

        Without `config_rootdir` the path must be absolute and is used as is,
        otherwise it is joined onto `config_rootdir`.

        Raises:
            ValueError: if the path is relative and no `config_rootdir` is given.
            FileNotFoundError: if the resulting path does not exist.
            OSError: if the resulting path exists but is not a file.
        """
        if config_rootdir is None and not config_namelist_path.is_absolute():
            msg = f"Cannot specify relative path {config_namelist_path} for namelist while the rootdir is None"
            raise ValueError(msg)

        namelist_path = config_namelist_path if config_rootdir is None else (config_rootdir / config_namelist_path)
        if not namelist_path.exists():
            msg = f"Namelist in path {namelist_path} does not exist."
            raise FileNotFoundError(msg)
        if not namelist_path.is_file():
            msg = f"Namelist in path {namelist_path} is not a file."
            raise OSError(msg)
        return namelist_path

    @staticmethod
    def section_index(section_name: str) -> tuple[str, int | None]:
        """Check for single vs multiple namelist section

        Check if the user specified a section name that ends with digits
        between brackets. The number between brackets is converted to a
        0-based Python index by subtracting 1, for example:

        section_index("section[123]") -> ("section", 122)
        section_index("section[1]") -> ("section", 0)
        section_index("section123") -> ("section123", None)

        No range check is done here: "section[0]" yields ("section", -1).

        This is the convention chosen to indicate multiple
        sections with the same name, typically `output_nml` for multiple
        output streams."""
        multi_section_pattern = re.compile(r"(.*)\[([0-9]+)\]$")
        if m := multi_section_pattern.match(section_name):
            return m.group(1), int(m.group(2)) - 1
        return section_name, None
92          return section_name, None