Source code for real_robot.utils.multiprocessing.utils

import multiprocessing as mp
import os
import time

from real_robot import LOGGER

from .shared_object import SharedObject


[docs] class SharedObjectDefaultDict(dict): """This defaultdict helps to store SharedObject by name (only known at runtime) so we don't need to frequently create SharedObject """ def __missing__(self, so_name: str) -> SharedObject: so = self[so_name] = SharedObject(so_name) return so
[docs] def start_and_wait_for_process(process: mp.Process, *, timeout: float = 30) -> None: """Start and wait for a child process to be ready (finishes initialization) When the waiting process is ready, it should trigger SharedObject "proc_<pid>_ready" :param process: mp.Process :param timeout: If process is not ready after timeout seconds, raise a TimeoutError """ # TODO: should this be written using mp.Pipe? process.start() so_ready = SharedObject(f"proc_{process.pid}_ready") start_time = time.perf_counter() while not so_ready.triggered: if time.perf_counter() - start_time > timeout: raise TimeoutError( f"Process {process.name} did not become ready within {timeout=} seconds" ) LOGGER.trace(f"Sleep to wait for signal from <{process.name}>") time.sleep(1.0)
[docs] def signal_process_ready() -> None: """ When called, signals that the current child process is ready by triggering SharedObject "proc_<pid>_ready" """ SharedObject(f"proc_{os.getpid()}_ready").trigger().unlink() LOGGER.trace(f"[Signal] Process {os.getpid()} is ready")