/ music_engine.py
music_engine.py
  1  """brane music - Sheet music OCR (OMR) using homr."""
  2  
  3  import os
  4  import shutil
  5  import sys
  6  import tempfile
  7  from pathlib import Path
  8  
  9  import click
 10  
 11  _gpu_checked = None
 12  
 13  # Known locations for CUDA 12 and cuDNN 9 libraries.
 14  _CUDA_SEARCH_PATHS = [
 15      "/usr/local/cuda/lib64",
 16      "/usr/local/cuda-12.8/lib64",
 17      "/usr/local/lib/ollama/cuda_v12",       # Ollama's bundled CUDA 12
 18      "/usr/local/lib/ollama/mlx_cuda_v13",   # Ollama's cuDNN 9
 19  ]
 20  
 21  
 22  def _ensure_cuda_libs_loadable() -> None:
 23      """Add CUDA/cuDNN library dirs to LD_LIBRARY_PATH if not already present."""
 24      ld_path = os.environ.get("LD_LIBRARY_PATH", "")
 25      dirs_to_add = [d for d in _CUDA_SEARCH_PATHS if os.path.isdir(d) and d not in ld_path]
 26      if dirs_to_add:
 27          os.environ["LD_LIBRARY_PATH"] = ":".join(dirs_to_add + ([ld_path] if ld_path else []))
 28  
 29  
 30  def _cuda_actually_works() -> bool:
 31      """Test whether onnxruntime can actually create a CUDA session."""
 32      global _gpu_checked
 33      if _gpu_checked is not None:
 34          return _gpu_checked
 35      _gpu_checked = False
 36      try:
 37          _ensure_cuda_libs_loadable()
 38          import ctypes
 39          # Pre-load CUDA/cuDNN libs so onnxruntime's provider bridge can find them.
 40          for lib_name in ("libcudart.so.12", "libcublas.so.12", "libcublasLt.so.12",
 41                           "libcurand.so.10", "libcufft.so.11", "libcudnn.so.9"):
 42              for d in _CUDA_SEARCH_PATHS:
 43                  lib_path = os.path.join(d, lib_name)
 44                  if os.path.exists(lib_path):
 45                      ctypes.CDLL(lib_path, mode=ctypes.RTLD_GLOBAL)
 46                      break
 47          import onnxruntime as ort
 48          if "CUDAExecutionProvider" not in ort.get_available_providers():
 49              return False
 50          # Load the CUDA provider bridge — this is what actually fails
 51          # when CUDA toolkit libs (curand, cublas, cudnn, etc.) are missing.
 52          lib_dir = os.path.dirname(ort.__file__)
 53          cuda_lib = os.path.join(lib_dir, "capi", "libonnxruntime_providers_cuda.so")
 54          if os.path.exists(cuda_lib):
 55              ctypes.CDLL(cuda_lib)
 56              _gpu_checked = True
 57      except OSError:
 58          pass
 59      return _gpu_checked
 60  
 61  def write_musicxml(tree_or_path, output_path: Path) -> None:
 62      """Write a MusicXML file as .musicxml (plain XML) or .mxl (compressed)."""
 63      import xml.etree.ElementTree as ET
 64      import zipfile
 65  
 66      if isinstance(tree_or_path, (str, Path)):
 67          tree = ET.parse(tree_or_path)
 68      else:
 69          tree = tree_or_path
 70  
 71      output_path.parent.mkdir(parents=True, exist_ok=True)
 72  
 73      if output_path.suffix.lower() == ".mxl":
 74          # .mxl is a ZIP archive containing the MusicXML + container manifest
 75          xml_bytes = ET.tostring(tree.getroot(), encoding="UTF-8", xml_declaration=True)
 76          container = (
 77              '<?xml version="1.0" encoding="UTF-8"?>\n'
 78              '<container>\n'
 79              '  <rootfiles>\n'
 80              '    <rootfile full-path="score.musicxml"/>\n'
 81              '  </rootfiles>\n'
 82              '</container>\n'
 83          )
 84          with zipfile.ZipFile(str(output_path), "w", zipfile.ZIP_DEFLATED) as zf:
 85              zf.writestr("META-INF/container.xml", container)
 86              zf.writestr("score.musicxml", xml_bytes)
 87      else:
 88          ET.indent(tree)
 89          tree.write(str(output_path), encoding="UTF-8", xml_declaration=True)
 90  
 91  
 92  SUPPORTED_OUTPUT_FORMATS = {".musicxml", ".mxl"}
 93  SUPPORTED_IMAGE_FORMATS = {".png", ".jpg", ".jpeg"}
 94  SUPPORTED_PDF_FORMATS = {".pdf"}
 95  SUPPORTED_FORMATS = SUPPORTED_IMAGE_FORMATS | SUPPORTED_PDF_FORMATS
 96  
 97  
 98  def pdf_to_images(pdf_path: Path, dpi: int = 300) -> list[Path]:
 99      """Convert PDF pages to PNG images in a temp directory."""
100      import fitz
101  
102      doc = fitz.open(str(pdf_path))
103      temp_dir = Path(tempfile.mkdtemp(prefix="brane_music_"))
104      images = []
105      for i, page in enumerate(doc):
106          pix = page.get_pixmap(dpi=dpi)
107          img_path = temp_dir / f"{pdf_path.stem}_page{i + 1:03d}.png"
108          pix.save(str(img_path))
109          images.append(img_path)
110      doc.close()
111      return images
112  
113  
def _whiten_dark_margin(lines, limit: int) -> None:
    """Whiten leading dark lines of a 2-D array view, in place.

    ``lines`` is indexed along axis 0 from the image edge inward. Starting at
    the edge, lines are set to 255 until the first line whose mean exceeds
    230, examining at most ``limit`` lines.
    """
    import numpy as np

    means = lines[:limit].mean(axis=1)
    bright = np.flatnonzero(means > 230)
    stop = int(bright[0]) if bright.size else limit
    lines[:stop] = 255


def clean_image(image_path: Path) -> Path:
    """Binarize and clean a scanned sheet music image for better OMR.

    The image is converted to grayscale, binarized with a Gaussian adaptive
    threshold, de-speckled with a 2x2 morphological opening, and dark bands
    along the four edges are painted white.

    Args:
        image_path: Image file to clean.

    Returns:
        Path of the cleaned image, written beside the input with ``_clean``
        appended to the file stem.

    Raises:
        ValueError: If the input image cannot be read or decoded.
        OSError: If the cleaned image cannot be written.
    """
    import cv2
    import numpy as np

    img = cv2.imread(str(image_path))
    if img is None:
        # cv2.imread reports failure by returning None rather than raising.
        raise ValueError(f"Could not read image: {image_path}")
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    h, w = gray.shape

    # Adaptive threshold to remove uneven background (yellowed pages, shadows)
    binary = cv2.adaptiveThreshold(
        gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 51, 15
    )

    # Small morphological opening to remove noise specks
    kernel = np.ones((2, 2), np.uint8)
    cleaned = cv2.morphologyEx(binary, cv2.MORPH_OPEN, kernel)

    # Remove dark scan borders (binding shadow, scanner edge). Each argument
    # is a view of ``cleaned`` ordered from one edge inward, so the helper
    # writes straight into ``cleaned``. Left/right scan up to a fifth of the
    # width, top/bottom up to a tenth of the height, the same on both sides.
    _whiten_dark_margin(cleaned.T, w // 5)        # left
    _whiten_dark_margin(cleaned.T[::-1], w // 5)  # right
    _whiten_dark_margin(cleaned, h // 10)         # top
    _whiten_dark_margin(cleaned[::-1], h // 10)   # bottom

    out_path = image_path.with_stem(image_path.stem + "_clean")
    if not cv2.imwrite(str(out_path), cleaned):
        # cv2.imwrite returns False when the file could not be saved.
        raise OSError(f"Could not write cleaned image: {out_path}")
    return out_path
153  
154  
def run_omr(image_path: Path, output_path: Path, use_gpu: bool = True,
            clean: bool = False) -> Path:
    """Recognize one sheet-music image with homr and store the MusicXML.

    Args:
        image_path: Image to recognize. It is copied into a scratch directory
            first, so the original file is not modified.
        output_path: Destination file. A ``.mxl`` suffix yields a compressed
            archive; otherwise homr's output file is moved there as-is.
        use_gpu: Request GPU inference. Falls back to CPU, with a notice on
            stderr, when the CUDA probe fails.
        clean: Run ``clean_image`` on the scratch copy before recognition.

    Returns:
        ``output_path``.

    Raises:
        RuntimeError: If homr finishes without producing a MusicXML file.
    """
    from homr.main import ProcessingConfig, download_weights, process_image
    from homr.music_xml_generator import XmlGeneratorArguments

    if use_gpu:
        gpu_enabled = _cuda_actually_works()
        if not gpu_enabled:
            click.echo("CUDA not fully available, falling back to CPU.", err=True)
    else:
        gpu_enabled = use_gpu

    download_weights(gpu_enabled)

    settings = ProcessingConfig(
        enable_debug=False,
        enable_cache=False,
        write_staff_positions=False,
        read_staff_positions=False,
        selected_staff=-1,
        use_gpu_inference=gpu_enabled,
    )
    generator_args = XmlGeneratorArguments()

    # homr writes .musicxml next to input, so copy to a temp dir
    scratch = Path(tempfile.mkdtemp(prefix="brane_omr_"))
    try:
        staged = scratch / image_path.name
        shutil.copy2(image_path, staged)

        omr_input = clean_image(staged) if clean else staged
        process_image(str(omr_input), settings, generator_args)

        produced = omr_input.with_suffix(".musicxml")
        if not produced.exists():
            raise RuntimeError(f"homr did not produce output for {image_path.name}")

        if output_path.suffix.lower() != ".mxl":
            output_path.parent.mkdir(parents=True, exist_ok=True)
            shutil.move(str(produced), str(output_path))
        else:
            write_musicxml(produced, output_path)
    finally:
        # The scratch copy and any leftovers go away even when homr fails.
        shutil.rmtree(scratch, ignore_errors=True)

    return output_path
201  
202  
def concat_musicxml(files: list[Path], output_path: Path) -> None:
    """Concatenate multiple MusicXML files into one by appending measures.

    The first file is the base document. For every later file, the measures
    of each ``<part>`` are appended to the base part with the same ``id`` and
    renumbered to continue after that part's highest measure number. Parts
    whose ``id`` does not occur in the base file are skipped.

    Args:
        files: MusicXML files in the order they should appear. When empty,
            nothing is written.
        output_path: Destination passed to ``write_musicxml``.
    """
    import xml.etree.ElementTree as ET

    if not files:
        return

    def numeric_measure_number(measure):
        """Return the measure's ``number`` as an int, or None if non-integer."""
        try:
            return int(measure.get("number", 0))
        except ValueError:
            # MusicXML measure numbers are free-form tokens such as "X1".
            return None

    base_tree = ET.parse(files[0])
    base_root = base_tree.getroot()

    # Find all <part> elements in the base file and index by id
    base_parts = {p.get("id"): p for p in base_root.findall(".//part")}

    # Next measure number per part, computed once up front. Appended measures
    # always receive increasing numbers, so a running counter stays correct
    # without rescanning the part for every extra file.
    next_numbers = {}
    for part_id, part in base_parts.items():
        measures = part.findall("measure")
        numbers = [n for n in map(numeric_measure_number, measures) if n is not None]
        # With no integer-numbered measure to continue from, fall back to the
        # measure count so the appended numbers still follow the existing ones.
        next_numbers[part_id] = (max(numbers) if numbers else len(measures)) + 1

    for extra_file in files[1:]:
        extra_root = ET.parse(extra_file).getroot()

        for extra_part in extra_root.findall(".//part"):
            part_id = extra_part.get("id")
            if part_id not in base_parts:
                continue
            base_part = base_parts[part_id]

            # Renumber measures to continue from the base
            next_num = next_numbers[part_id]
            for measure in extra_part.findall("measure"):
                measure.set("number", str(next_num))
                next_num += 1
                base_part.append(measure)
            next_numbers[part_id] = next_num

    write_musicxml(base_tree, output_path)
236  
237  
@click.command("music")
@click.argument("inputs", nargs=-1, required=True, type=click.Path(exists=True, path_type=Path))
@click.option("-o", "--output", type=click.Path(path_type=Path),
              help="Output file or directory. Defaults to input name with .musicxml extension.")
@click.option("--dpi", default=300, help="DPI for PDF rasterization (default: 300).")
@click.option("--no-gpu", is_flag=True, help="Disable GPU acceleration.")
@click.option("-c", "--concat", is_flag=True,
              help="Concatenate all pages into a single MusicXML file.")
@click.option("--clean", is_flag=True,
              help="Pre-process images (binarize, remove borders) for scanned/old scores.")
def music(inputs, output, dpi, no_gpu, concat, clean):
    """Recognize sheet music and output MusicXML.

    Accepts images (PNG, JPG) and PDF files.

    Examples:

        brane music sheet.png

        brane music score.pdf -o score.musicxml

        brane music page1.png page2.png -o output_dir/

        brane music pg1.pdf pg2.pdf -c -o full_score.musicxml

        brane music old_scan.pdf --clean -o result.musicxml
    """
    # The docstring above is the --help text, so implementation notes live here.
    # Flow: validate arguments -> expand PDFs into page images -> choose one
    # output path per image -> run OMR on each -> optionally merge the results.
    # All progress and error messages go to stderr. The command exits with
    # status 1 on invalid arguments or when no image could be processed.
    if concat and not output:
        click.echo("Error: --concat requires -o to specify the output file.", err=True)
        sys.exit(1)

    for inp in inputs:
        if inp.suffix.lower() not in SUPPORTED_FORMATS:
            click.echo(
                f"Error: Unsupported format: {inp.suffix} "
                f"(supported: {', '.join(sorted(SUPPORTED_FORMATS))})",
                err=True,
            )
            sys.exit(1)

    cleanup_dirs = set()
    completed = []
    # Everything that may create temp directories runs inside this try, so
    # they are removed on every exit path, including the sys.exit calls below.
    try:
        # Expand PDFs to images, track temp dirs for cleanup
        all_images = []  # (image_path, output_stem, default_output_dir)
        for inp in inputs:
            if inp.suffix.lower() in SUPPORTED_PDF_FORMATS:
                click.echo(f"Converting PDF: {inp.name}", err=True)
                page_images = pdf_to_images(inp, dpi=dpi)
                if not page_images:
                    click.echo(f"Warning: {inp.name} contains no pages; skipping.", err=True)
                    continue
                cleanup_dirs.add(page_images[0].parent)
                # Page images live in a temp dir, so default outputs go next
                # to the PDF itself.
                all_images.extend((img, img.stem, inp.parent) for img in page_images)
            else:
                all_images.append((inp, inp.stem, inp.parent))

        if not all_images:
            click.echo("Error: No pages to process.", err=True)
            sys.exit(1)

        # When concatenating, process into a temp dir then merge
        if concat:
            concat_dir = Path(tempfile.mkdtemp(prefix="brane_concat_"))
            cleanup_dirs.add(concat_dir)
            # The index prefix keeps intermediate files distinct when two
            # inputs share a stem (e.g. same file name in different dirs).
            output_paths = [
                concat_dir / f"{idx:04d}_{stem}.musicxml"
                for idx, (_img, stem, _src_dir) in enumerate(all_images)
            ]
        elif output and output.suffix.lower() in SUPPORTED_OUTPUT_FORMATS:
            if len(all_images) > 1:
                click.echo(
                    "Error: Cannot use a single output file with multiple inputs. "
                    "Use -c/--concat to merge, or specify a directory.",
                    err=True,
                )
                sys.exit(1)
            output_paths = [output]
        elif output:
            output.mkdir(parents=True, exist_ok=True)
            output_paths = [output / f"{stem}.musicxml" for _img, stem, _src_dir in all_images]
        else:
            # Default: each result is written beside the input it came from.
            output_paths = [src_dir / f"{stem}.musicxml" for _img, stem, src_dir in all_images]

        # Process each image; one failure does not stop the remaining ones.
        for (img_path, _stem, _src_dir), out_path in zip(all_images, output_paths):
            click.echo(f"Processing: {img_path.name}", err=True)
            try:
                run_omr(img_path, out_path, use_gpu=not no_gpu, clean=clean)
                completed.append(out_path)
                if not concat:
                    click.echo(f"Written: {out_path}", err=True)
            except Exception as e:
                click.echo(f"Error processing {img_path.name}: {e}", err=True)

        if not completed:
            # Every image failed; the individual errors were reported above.
            sys.exit(1)

        if concat:
            click.echo(f"Concatenating {len(completed)} page(s)...", err=True)
            concat_musicxml(completed, output)
            click.echo(f"Written: {output}", err=True)
    finally:
        for d in cleanup_dirs:
            shutil.rmtree(d, ignore_errors=True)

    failed = len(all_images) - len(completed)
    if failed:
        click.echo(
            f"Done. Processed {len(completed)} of {len(all_images)} image(s); {failed} failed.",
            err=True,
        )
    else:
        click.echo(f"Done. Processed {len(completed)} image(s).", err=True)