utils

class real_robot.utils.multiprocessing.utils.SharedObjectDefaultDict

Bases: dict

A defaultdict-style mapping that stores SharedObject instances by name. The names are only known at runtime, so caching the instances here avoids creating the same SharedObject repeatedly.
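The caching behavior described above can be sketched with a plain dict subclass. This is a minimal illustration, not the library's implementation: `KeyedFactoryDict`, `make_handle`, and the dictionary returned by the factory are hypothetical stand-ins for SharedObjectDefaultDict and SharedObject. It relies only on the standard `__missing__` hook, which `dict` calls when a key lookup fails.

```python
class KeyedFactoryDict(dict):
    """Hypothetical stand-in: builds a missing value from the key itself."""

    def __init__(self, factory):
        super().__init__()
        self._factory = factory  # called as factory(key)

    def __missing__(self, key):
        # dict.__getitem__ calls __missing__ when the key is absent.
        value = self._factory(key)
        self[key] = value  # cache it so later lookups reuse the same object
        return value


created = []  # records every call to the factory


def make_handle(name):
    # Stand-in for constructing a SharedObject by name (assumption).
    created.append(name)
    return {"name": name}


handles = KeyedFactoryDict(make_handle)
first = handles["proc_1234_ready"]   # created on first access
second = handles["proc_1234_ready"]  # served from the cache
```

Note that `collections.defaultdict` calls its `default_factory` with no arguments, so a mapping whose default depends on the key needs a `__missing__` override like this one. That may be why the class derives from `dict`, though the source does not say so.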

real_robot.utils.multiprocessing.utils.signal_process_ready() → None

When called, signals that the current process is ready by triggering the SharedObject “proc_<pid>_ready”.

real_robot.utils.multiprocessing.utils.start_and_wait_for_process(process: Process, *, timeout: float | None = None) → None

Start process and wait for it to be ready (that is, to finish initialization). When the process being waited on is ready, it should trigger the SharedObject “proc_<pid>_ready”.

Parameters:
  • process – mp.Process

  • timeout – If process is not ready after timeout seconds, raise a TimeoutError. If timeout is None, wait indefinitely.
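The readiness handshake between signal_process_ready() and start_and_wait_for_process() can be sketched with the standard library alone. This is an illustration under stated assumptions, not the library's code: `start_and_wait` and `worker_main` are hypothetical names, a `threading.Event` stands in for the SharedObject “proc_<pid>_ready”, and threads stand in for processes to keep the example short. `multiprocessing.Process` and `multiprocessing.Event` expose the same `start()` and `wait(timeout)` methods, so the helper itself would not change.

```python
import threading
import time


def start_and_wait(worker, ready, timeout=None):
    """Start `worker`, then block until `ready` is set or `timeout` elapses."""
    worker.start()
    # Event.wait returns False if the timeout expired before set() was called.
    # With timeout=None it blocks indefinitely.
    if not ready.wait(timeout):
        raise TimeoutError(f"worker not ready after {timeout} seconds")


def worker_main(ready, log):
    log.append("initialized")  # stand-in for real initialization work
    ready.set()                # plays the role of signal_process_ready()


ready = threading.Event()
log = []
worker = threading.Thread(target=worker_main, args=(ready, log))
start_and_wait(worker, ready, timeout=5.0)  # returns once the worker signals
worker.join()

# A worker that never signals readiness exercises the timeout path.
never_ready = threading.Event()
idle = threading.Thread(target=time.sleep, args=(0.2,))
try:
    start_and_wait(idle, never_ready, timeout=0.05)
    timed_out = False
except TimeoutError:
    timed_out = True
idle.join()
```

The caller only continues once the worker has finished its setup, which is the guarantee the function above describes. Note that the timeout sketched here only stops the waiting; whether the real function also cleans up the unresponsive process is not stated in the source.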