Source code for real_robot.utils.multiprocessing.utils
import multiprocessing as mp
import os
import time
from real_robot import LOGGER
from .shared_object import SharedObject
[docs]
class SharedObjectDefaultDict(dict):
    """Dictionary that lazily builds a SharedObject for every unknown name.

    SharedObject names are only known at runtime; indexing this dict with a
    name that is not stored yet creates ``SharedObject(name)``, stores it under
    that name and returns it, so the same object is reused on later lookups
    instead of being created again.
    """

    def __missing__(self, so_name: str) -> SharedObject:
        """Create, store and return the SharedObject for a missing key.

        Python calls this hook from ``self[so_name]`` when the key is absent.

        :param so_name: name passed to the SharedObject constructor and used
                        as the dictionary key
        :return: the newly created SharedObject, now stored in this dict
        """
        shared = SharedObject(so_name)
        self[so_name] = shared
        return shared
[docs]
def start_and_wait_for_process(
    process: mp.Process, *, timeout: float = 30, poll_interval: float = 1.0
) -> None:
    """Start and wait for a child process to be ready (finishes initialization)

    When the waiting process is ready, it should trigger
    SharedObject "proc_<pid>_ready"

    :param process: mp.Process that has not been started yet
    :param timeout: If process is not ready after timeout seconds,
                    raise a TimeoutError
    :param poll_interval: Maximum number of seconds to sleep between two checks
                          of the ready flag. Must be positive.
    :raises ValueError: if poll_interval is not positive
    :raises TimeoutError: if the ready flag is not triggered within timeout
                          seconds
    """
    # Validate before starting so a bad argument never leaves a stray child.
    if poll_interval <= 0:
        raise ValueError(f"poll_interval must be positive, got {poll_interval}")

    # TODO: should this be written using mp.Pipe?
    # NOTE(review): a child that exits before signalling is only noticed via
    # the timeout below -- consider also checking process.is_alive().
    process.start()
    # process.pid is only assigned once the process has been started.
    so_ready = SharedObject(f"proc_{process.pid}_ready")

    start_time = time.perf_counter()
    while not so_ready.triggered:
        remaining = timeout - (time.perf_counter() - start_time)
        if remaining < 0:
            raise TimeoutError(
                f"Process {process.name} did not become ready within {timeout=} seconds"
            )

        LOGGER.trace(f"Sleep to wait for signal from <{process.name}>")
        # Never sleep past the deadline, so the timeout is honoured even when
        # it is shorter than (or not a multiple of) the poll interval.
        time.sleep(min(poll_interval, remaining))
[docs]
def signal_process_ready() -> None:
    """Signal that the current child process is ready.

    Triggers SharedObject "proc_<pid>_ready", where <pid> is the PID of the
    calling process, calls ``unlink()`` on the result of the trigger call and
    then logs a trace message.
    """
    pid = os.getpid()
    ready_flag = SharedObject(f"proc_{pid}_ready")
    triggered_flag = ready_flag.trigger()
    triggered_flag.unlink()
    LOGGER.trace(f"[Signal] Process {pid} is ready")