class SharedObjectDefaultDict(dict):
    """Dictionary that lazily builds and caches SharedObject instances by name.

    The names are only known at runtime; looking up a name that is not in
    the dictionary yet constructs ``SharedObject(name)``, stores it under
    that name and returns it, so later lookups reuse the stored instance
    instead of creating a new SharedObject.
    """

    def __missing__(self, so_name: str) -> SharedObject:
        # Called by dict.__getitem__ only when ``so_name`` is not a key yet.
        created = SharedObject(so_name)
        self[so_name] = created
        return created


def start_and_wait_for_process(process: mp.Process, *, timeout: float = None) -> None:
    """Start ``process`` and block until it signals that it is ready.

    This function calls ``process.start()`` itself. The started process is
    expected to report that it finished initialization by triggering the
    SharedObject named ``"proc_<pid>_ready"``, where ``<pid>`` is its pid.

    :param process: mp.Process to start and wait for
    :param timeout: If process is not ready after timeout seconds, raise a
        TimeoutError. If timeout is None, wait indefinitely
    :raises TimeoutError: if ``timeout`` is not None and the process has not
        triggered its ready SharedObject within ``timeout`` seconds
    """
    # TODO: should this be written using mp.Pipe?
    process.start()
    # The SharedObject name embeds the pid, which is only assigned by start().
    so_ready = SharedObject(f"proc_{process.pid}_ready")

    # Pause briefly between polls so the waiting parent does not spin a CPU
    # core at 100% while the child is still initializing.
    poll_interval = 0.001

    # Use the monotonic clock: wall-clock time (time.time()) can jump when the
    # system clock is adjusted, which would shorten or extend the timeout.
    start_time = time.monotonic()
    # NOTE(review): if the child exits without ever triggering the flag and
    # timeout is None, this loop never terminates - confirm whether callers
    # need a liveness check here.
    while not so_ready.triggered:
        if timeout is not None and time.monotonic() - start_time > timeout:
            raise TimeoutError(
                f"Process {process.name} did not become ready within "
                f"{timeout=} seconds"
            )
        time.sleep(poll_interval)


def signal_process_ready() -> None:
    """Signal that the current process is ready.

    Triggers the SharedObject named ``"proc_<pid>_ready"``, where ``<pid>``
    is the pid of the calling process, and then calls ``unlink()`` on the
    object returned by ``trigger()``.
    """
    ready_name = f"proc_{os.getpid()}_ready"
    ready_flag = SharedObject(ready_name)
    # unlink() is invoked on whatever trigger() returns, as in a call chain.
    triggered_flag = ready_flag.trigger()
    triggered_flag.unlink()