# music_engine.py
"""brane music - Sheet music OCR (OMR) using homr."""

import os
import shutil
import sys
import tempfile
from pathlib import Path

import click

# Cached result of the CUDA probe: None = not probed yet, otherwise True/False.
_gpu_checked = None

# Known locations for CUDA 12 and cuDNN 9 libraries.
_CUDA_SEARCH_PATHS = [
    "/usr/local/cuda/lib64",
    "/usr/local/cuda-12.8/lib64",
    "/usr/local/lib/ollama/cuda_v12",  # Ollama's bundled CUDA 12
    "/usr/local/lib/ollama/mlx_cuda_v13",  # Ollama's cuDNN 9
]


def _ensure_cuda_libs_loadable() -> None:
    """Add CUDA/cuDNN library dirs to LD_LIBRARY_PATH if not already present.

    Only directories from ``_CUDA_SEARCH_PATHS`` that exist on disk are added,
    and they are prepended so they take precedence over existing entries.
    """
    ld_path = os.environ.get("LD_LIBRARY_PATH", "")
    # Compare against whole path entries; a substring test on the raw string
    # would wrongly treat "/opt/usr/local/cuda/lib64" as already containing
    # "/usr/local/cuda/lib64".
    present = {entry for entry in ld_path.split(os.pathsep) if entry}
    dirs_to_add = [
        d for d in _CUDA_SEARCH_PATHS if os.path.isdir(d) and d not in present
    ]
    if dirs_to_add:
        os.environ["LD_LIBRARY_PATH"] = os.pathsep.join(
            dirs_to_add + ([ld_path] if ld_path else [])
        )


def _cuda_actually_works() -> bool:
    """Return whether onnxruntime's CUDA execution provider looks usable.

    The probe pre-loads the CUDA/cuDNN shared libraries found under
    ``_CUDA_SEARCH_PATHS``, checks that onnxruntime lists
    ``CUDAExecutionProvider``, and then tries to load onnxruntime's CUDA
    provider library. Any loader failure (or a missing onnxruntime) is
    reported as ``False`` rather than raised, so callers can fall back to CPU.

    The result is cached in the module-level ``_gpu_checked`` so the probe
    runs at most once per process.
    """
    global _gpu_checked
    if _gpu_checked is not None:
        return _gpu_checked

    # Pessimistic default: every early exit below leaves the cache at False.
    _gpu_checked = False
    try:
        _ensure_cuda_libs_loadable()
        import ctypes

        # Pre-load CUDA/cuDNN libs so onnxruntime's provider bridge can find them.
        for lib_name in (
            "libcudart.so.12",
            "libcublas.so.12",
            "libcublasLt.so.12",
            "libcurand.so.10",
            "libcufft.so.11",
            "libcudnn.so.9",
        ):
            for d in _CUDA_SEARCH_PATHS:
                lib_path = os.path.join(d, lib_name)
                if os.path.exists(lib_path):
                    ctypes.CDLL(lib_path, mode=ctypes.RTLD_GLOBAL)
                    break  # first match wins for this library

        import onnxruntime as ort

        if "CUDAExecutionProvider" not in ort.get_available_providers():
            return False

        # Load the CUDA provider bridge — this is what actually fails
        # when CUDA toolkit libs (curand, cublas, cudnn, etc.) are missing.
        lib_dir = os.path.dirname(ort.__file__)
        cuda_lib = os.path.join(lib_dir, "capi", "libonnxruntime_providers_cuda.so")
        if os.path.exists(cuda_lib):
            ctypes.CDLL(cuda_lib)
        _gpu_checked = True
    except (ImportError, OSError):
        # OSError: a shared library failed to load.
        # ImportError: onnxruntime itself is unavailable. Either way: no GPU.
        pass
    return _gpu_checked


def write_musicxml(tree_or_path, output_path: Path) -> None:
    """Write a MusicXML file as .musicxml (plain XML) or .mxl (compressed).

    Args:
        tree_or_path: Either an ``xml.etree.ElementTree.ElementTree`` or a
            ``str``/``Path`` pointing at an XML file to parse first.
        output_path: Destination file. A ``.mxl`` suffix (case-insensitive)
            selects the compressed container; any other suffix writes
            indented plain XML. Parent directories are created as needed.
    """
    import xml.etree.ElementTree as ET
    import zipfile

    if isinstance(tree_or_path, (str, Path)):
        tree = ET.parse(tree_or_path)
    else:
        tree = tree_or_path

    output_path.parent.mkdir(parents=True, exist_ok=True)

    if output_path.suffix.lower() == ".mxl":
        # .mxl is a ZIP archive containing the MusicXML + container manifest
        xml_bytes = ET.tostring(tree.getroot(), encoding="UTF-8", xml_declaration=True)
        container = (
            '<?xml version="1.0" encoding="UTF-8"?>\n'
            '<container>\n'
            '  <rootfiles>\n'
            '    <rootfile full-path="score.musicxml"/>\n'
            '  </rootfiles>\n'
            '</container>\n'
        )
        with zipfile.ZipFile(str(output_path), "w", zipfile.ZIP_DEFLATED) as zf:
            zf.writestr("META-INF/container.xml", container)
            zf.writestr("score.musicxml", xml_bytes)
    else:
        ET.indent(tree)
        tree.write(str(output_path), encoding="UTF-8", xml_declaration=True)


SUPPORTED_OUTPUT_FORMATS = {".musicxml", ".mxl"}
SUPPORTED_IMAGE_FORMATS = {".png", ".jpg", ".jpeg"}
SUPPORTED_PDF_FORMATS = {".pdf"}
SUPPORTED_FORMATS = SUPPORTED_IMAGE_FORMATS | SUPPORTED_PDF_FORMATS


def pdf_to_images(pdf_path: Path, dpi: int = 300) -> list[Path]:
    """Convert PDF pages to PNG images in a temp directory.

    Args:
        pdf_path: PDF file to rasterize.
        dpi: Resolution passed to the page renderer.

    Returns:
        One PNG path per page, in page order, all inside a freshly created
        temp directory that the caller is responsible for removing. An empty
        list is returned (and no temp directory is left behind) when the
        document yields no pages.

    Raises:
        Whatever ``fitz`` raises while opening or rendering; the temp
        directory is removed before the exception propagates.
    """
    import fitz

    doc = fitz.open(str(pdf_path))
    try:
        temp_dir = Path(tempfile.mkdtemp(prefix="brane_music_"))
        images = []
        try:
            for i, page in enumerate(doc):
                pix = page.get_pixmap(dpi=dpi)
                img_path = temp_dir / f"{pdf_path.stem}_page{i + 1:03d}.png"
                pix.save(str(img_path))
                images.append(img_path)
        except BaseException:
            # Don't leak a half-filled temp dir the caller never learns about.
            shutil.rmtree(temp_dir, ignore_errors=True)
            raise
        if not images:
            # With no paths returned the caller cannot locate the directory.
            shutil.rmtree(temp_dir, ignore_errors=True)
    finally:
        doc.close()
    return images


def _whiten_dark_edge(img, positions, *, columns: bool) -> None:
    """Paint leading dark rows/columns of *img* white, in place.

    Walks *positions* in order and whitens each line until the first line
    whose mean brightness exceeds 230, i.e. until page content starts.
    """
    for pos in positions:
        line = img[:, pos] if columns else img[pos, :]  # a view into img
        if line.mean() > 230:
            break
        line[...] = 255


def clean_image(image_path: Path) -> Path:
    """Binarize and clean a scanned sheet music image for better OMR.

    Args:
        image_path: Image file to clean.

    Returns:
        Path of the cleaned image, written next to the input with
        ``_clean`` appended to the stem.

    Raises:
        ValueError: If the image cannot be read.
        OSError: If the cleaned image cannot be written.
    """
    import cv2
    import numpy as np

    img = cv2.imread(str(image_path))
    if img is None:
        # cv2.imread signals failure by returning None instead of raising.
        raise ValueError(f"Could not read image: {image_path}")
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    h, w = gray.shape

    # Adaptive threshold to remove uneven background (yellowed pages, shadows)
    binary = cv2.adaptiveThreshold(
        gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 51, 15
    )

    # Small morphological opening to remove noise specks
    kernel = np.ones((2, 2), np.uint8)
    cleaned = cv2.morphologyEx(binary, cv2.MORPH_OPEN, kernel)

    # Remove dark scan borders (binding shadow, scanner edge). At most the
    # outer fifth of the width and the outer tenth of the height are scanned.
    _whiten_dark_edge(cleaned, range(w // 5), columns=True)
    _whiten_dark_edge(cleaned, range(w - 1, w - w // 5, -1), columns=True)
    _whiten_dark_edge(cleaned, range(h // 10), columns=False)
    _whiten_dark_edge(cleaned, range(h - 1, h - h // 10, -1), columns=False)

    out_path = image_path.with_stem(image_path.stem + "_clean")
    if not cv2.imwrite(str(out_path), cleaned):
        # cv2.imwrite reports failure through its return value.
        raise OSError(f"Could not write cleaned image: {out_path}")
    return out_path


def run_omr(image_path: Path, output_path: Path, use_gpu: bool = True,
            clean: bool = False) -> Path:
    """Run homr OMR on a single image, writing MusicXML to output_path.

    Args:
        image_path: Image to recognize.
        output_path: Destination; a ``.mxl`` suffix produces a compressed
            file, anything else receives homr's output file as-is.
        use_gpu: Request GPU inference; silently downgraded to CPU (with a
            message on stderr) when the CUDA probe fails.
        clean: Pre-process the image with ``clean_image`` first.

    Returns:
        ``output_path``.

    Raises:
        RuntimeError: If homr produced no ``.musicxml`` file for the image.
    """
    from homr.main import ProcessingConfig, download_weights, process_image
    from homr.music_xml_generator import XmlGeneratorArguments

    use_gpu_final = use_gpu and _cuda_actually_works()
    if use_gpu and not use_gpu_final:
        click.echo("CUDA not fully available, falling back to CPU.", err=True)

    download_weights(use_gpu_final)

    config = ProcessingConfig(
        enable_debug=False,
        enable_cache=False,
        write_staff_positions=False,
        read_staff_positions=False,
        selected_staff=-1,
        use_gpu_inference=use_gpu_final,
    )
    xml_args = XmlGeneratorArguments()

    # homr writes .musicxml next to input, so copy to a temp dir
    work_dir = Path(tempfile.mkdtemp(prefix="brane_omr_"))
    try:
        work_image = work_dir / image_path.name
        shutil.copy2(image_path, work_image)

        if clean:
            work_image = clean_image(work_image)

        process_image(str(work_image), config, xml_args)

        generated = work_image.with_suffix(".musicxml")
        if not generated.exists():
            raise RuntimeError(f"homr did not produce output for {image_path.name}")

        if output_path.suffix.lower() == ".mxl":
            write_musicxml(generated, output_path)
        else:
            output_path.parent.mkdir(parents=True, exist_ok=True)
            shutil.move(str(generated), str(output_path))
    finally:
        shutil.rmtree(work_dir, ignore_errors=True)

    return output_path


def _measure_number(measure) -> int:
    """Return a measure's ``number`` attribute as an int, or 0 if not numeric."""
    try:
        return int(measure.get("number", 0))
    except (TypeError, ValueError):
        # Non-numeric measure numbers (e.g. "X1") must not abort the merge.
        return 0


def concat_musicxml(files: list[Path], output_path: Path) -> None:
    """Concatenate multiple MusicXML files into one by appending measures.

    The first file is the base document. For every later file, the measures
    of each ``<part>`` are renumbered to continue after the base part with
    the same ``id`` and appended to it. Parts whose ``id`` does not exist in
    the base file are skipped. Nothing is written when *files* is empty.

    Args:
        files: MusicXML files in the order they should appear.
        output_path: Destination, written via ``write_musicxml``.
    """
    import xml.etree.ElementTree as ET

    if not files:
        return

    base_tree = ET.parse(files[0])
    base_root = base_tree.getroot()

    # Find all <part> elements in the base file and index by id
    base_parts = {p.get("id"): p for p in base_root.findall(".//part")}

    # Next free measure number per part, kept up to date while appending so
    # the base part is not rescanned for every extra file.
    next_numbers = {
        part_id: max(
            (_measure_number(m) for m in part.findall("measure")), default=0
        ) + 1
        for part_id, part in base_parts.items()
    }

    for extra_file in files[1:]:
        extra_root = ET.parse(extra_file).getroot()

        for extra_part in extra_root.findall(".//part"):
            part_id = extra_part.get("id")
            if part_id not in base_parts:
                continue
            base_part = base_parts[part_id]

            # Renumber measures to continue from the base
            next_num = next_numbers[part_id]
            for measure in extra_part.findall("measure"):
                measure.set("number", str(next_num))
                next_num += 1
                base_part.append(measure)
            next_numbers[part_id] = next_num

    write_musicxml(base_tree, output_path)
@click.command("music")
@click.argument("inputs", nargs=-1, required=True, type=click.Path(exists=True, path_type=Path))
@click.option("-o", "--output", type=click.Path(path_type=Path),
              help="Output file or directory. Defaults to input name with .musicxml extension.")
@click.option("--dpi", default=300, help="DPI for PDF rasterization (default: 300).")
@click.option("--no-gpu", is_flag=True, help="Disable GPU acceleration.")
@click.option("-c", "--concat", is_flag=True,
              help="Concatenate all pages into a single MusicXML file.")
@click.option("--clean", is_flag=True,
              help="Pre-process images (binarize, remove borders) for scanned/old scores.")
def music(inputs, output, dpi, no_gpu, concat, clean):
    """Recognize sheet music and output MusicXML.

    Accepts images (PNG, JPG) and PDF files.

    Examples:

        brane music sheet.png

        brane music score.pdf -o score.musicxml

        brane music page1.png page2.png -o output_dir/

        brane music pg1.pdf pg2.pdf -c -o full_score.musicxml

        brane music old_scan.pdf --clean -o result.musicxml
    """
    # NOTE: the docstring above is the command's --help text; keep it user-facing.
    # Flow: validate inputs -> expand PDFs to page images -> plan output paths
    # -> run OMR per image -> optionally merge -> clean up temp dirs.
    if concat and not output:
        click.echo("Error: --concat requires -o to specify the output file.", err=True)
        sys.exit(1)

    validated = []
    for inp in inputs:
        if inp.suffix.lower() not in SUPPORTED_FORMATS:
            click.echo(
                f"Error: Unsupported format: {inp.suffix} "
                f"(supported: {', '.join(sorted(SUPPORTED_FORMATS))})",
                err=True,
            )
            sys.exit(1)
        validated.append(inp)

    all_images = []  # (image_path, output_stem)
    completed = []
    cleanup_dirs = set()
    # Everything that can create temp dirs runs inside this try, so the
    # finally also covers early sys.exit() calls and PDF conversion errors.
    try:
        # Expand PDFs to images, track temp dirs for cleanup
        for inp in validated:
            if inp.suffix.lower() in SUPPORTED_PDF_FORMATS:
                click.echo(f"Converting PDF: {inp.name}", err=True)
                page_images = pdf_to_images(inp, dpi=dpi)
                if not page_images:
                    click.echo(f"Warning: no pages found in {inp.name}, skipping.", err=True)
                    continue
                cleanup_dirs.add(page_images[0].parent)
                all_images.extend((img, img.stem) for img in page_images)
            else:
                all_images.append((inp, inp.stem))

        if not all_images:
            click.echo("Error: No pages to process.", err=True)
            sys.exit(1)

        # When concatenating, process into a temp dir then merge
        if concat:
            concat_dir = Path(tempfile.mkdtemp(prefix="brane_concat_"))
            cleanup_dirs.add(concat_dir)
            # Index prefix keeps intermediate files distinct when two inputs
            # share a stem (e.g. a/page.png and b/page.png).
            output_paths = [
                concat_dir / f"{idx:04d}_{stem}.musicxml"
                for idx, (_, stem) in enumerate(all_images)
            ]
        elif output and output.suffix.lower() in SUPPORTED_OUTPUT_FORMATS:
            if len(all_images) > 1:
                click.echo(
                    "Error: Cannot use a single output file with multiple inputs. "
                    "Use -c/--concat to merge, or specify a directory.",
                    err=True,
                )
                sys.exit(1)
            output_paths = [output]
        elif output:
            output.mkdir(parents=True, exist_ok=True)
            output_paths = [output / f"{stem}.musicxml" for _, stem in all_images]
        else:
            # No -o: everything lands next to the first input.
            base_dir = validated[0].parent
            output_paths = [base_dir / f"{stem}.musicxml" for _, stem in all_images]

        # Process each image
        for (img_path, _), out_path in zip(all_images, output_paths):
            click.echo(f"Processing: {img_path.name}", err=True)
            try:
                run_omr(img_path, out_path, use_gpu=not no_gpu, clean=clean)
                completed.append(out_path)
                if not concat:
                    click.echo(f"Written: {out_path}", err=True)
            except Exception as e:
                # Best effort: one bad page must not abort a multi-page run.
                click.echo(f"Error processing {img_path.name}: {e}", err=True)
                if len(all_images) == 1:
                    sys.exit(1)

        if concat and completed:
            click.echo(f"Concatenating {len(completed)} page(s)...", err=True)
            concat_musicxml(completed, output)
            click.echo(f"Written: {output}", err=True)
    finally:
        for d in cleanup_dirs:
            shutil.rmtree(d, ignore_errors=True)

    total = len(all_images)
    failed = total - len(completed)
    if failed:
        click.echo(
            f"Done. Processed {len(completed)} of {total} image(s); {failed} failed.",
            err=True,
        )
    else:
        click.echo(f"Done. Processed {total} image(s).", err=True)
    if not completed:
        # Nothing was produced at all; report failure like the single-image case.
        sys.exit(1)