[docs]classSharedObjectDefaultDict(dict):"""This defaultdict helps to store SharedObject by name (only known at runtime) so we don't need to frequently create SharedObject """def__missing__(self,so_name:str)->SharedObject:so=self[so_name]=SharedObject(so_name)returnso
[docs]defstart_and_wait_for_process(process:mp.Process,*,timeout:float=30)->None:"""Start and wait for a child process to be ready (finishes initialization) When the waiting process is ready, it should trigger SharedObject "proc_<pid>_ready" :param process: mp.Process :param timeout: If process is not ready after timeout seconds, raise a TimeoutError """# TODO: should this be written using mp.Pipe?process.start()so_ready=SharedObject(f"proc_{process.pid}_ready")start_time=time.perf_counter()whilenotso_ready.triggered:iftime.perf_counter()-start_time>timeout:raiseTimeoutError(f"Process {process.name} did not become ready within {timeout=} seconds")LOGGER.trace(f"Sleep to wait for signal from <{process.name}>")time.sleep(1.0)
[docs]defsignal_process_ready()->None:""" When called, signals that the current child process is ready by triggering SharedObject "proc_<pid>_ready" """SharedObject(f"proc_{os.getpid()}_ready").trigger().unlink()LOGGER.trace(f"[Signal] Process {os.getpid()} is ready")