multiprocessing.py
import logging

from torch.multiprocessing import get_context, set_start_method

_logger = logging.getLogger(__name__)

# Best-effort: request the "spawn" start method at import time.
# set_start_method raises RuntimeError when a start method has already been
# fixed for this process; in that case the existing setting is kept.
try:
    set_start_method("spawn")
except RuntimeError:
    pass

# Lazily created, module-wide manager instance; populated by get_manager().
_manager = None


def get_manager():
    """Return the module-wide manager, creating it on first use.

    The first call tries to build the manager from the "spawn" context.
    If that raises, the failure is logged (with traceback) and a manager
    from the standard library's default context is created instead.
    Subsequent calls return the cached instance.

    Returns:
        The cached manager object.

    Raises:
        Whatever ``multiprocessing.Manager()`` raises if the fallback
        itself fails; in that case nothing is cached.
    """
    global _manager
    if _manager is not None:
        return _manager

    try:
        _manager = get_context("spawn").Manager()
    except Exception:
        # Fallback for environments where the spawn-context Manager fails
        # (e.g., after many test iterations in CI). Log the original error
        # so the root cause is not silently lost.
        _logger.warning(
            "Creating a Manager from the 'spawn' context failed; "
            "falling back to multiprocessing.Manager()",
            exc_info=True,
        )
        # Imported lazily, as in the original: only needed on this path.
        import multiprocessing

        _manager = multiprocessing.Manager()
    return _manager