Source code for real_robot.utils.multiprocessing.shared_object

"""
Shared object implemented with SharedMemory and synchronization
Careful optimization is done to make it run as fast as possible
version 0.0.6

Written by Kolin Guo

Implementation Notes:
  * Methods `fetch()` and `assign()` are chosen to be distinctive from common python
    class methods (e.g., get(), set(), update(), read(), write(), fill(), put(), etc.)
  * Readers-Writer synchronization is achieved by `fcntl.flock()` (filesystem advisory
    lock). It's chosen over `multiprocessing.Lock` / `multiprocessing.Condition` so that
    no lock needs to be explicitly passed to child processes.
    However, note that the order in which `flock` locks are acquired is not guaranteed.
  * An object modified timestamp is maintained using `time.time_ns()` which is
    system-wide and has highest resolution. (https://peps.python.org/pep-0564/#linux)

Usage Notes:
  * For processes that are always waiting for a massive SharedObject (e.g., np.ndarray),
    it's best to use so.modified to check whether the data has been updated yet to avoid
    starving processes that are assigning to it (only fetch when so.modified is True).
    Alternatively, a separate boolean update flag can be used to achieve the same.
  * Examples:
    # Creates SharedObject with data
    >>> so = SharedObject("test", data=np.ones((480, 848, 3)))
    # Mounts existing SharedObject
    >>> so = SharedObject("test")
    # Creates a trigger SharedObject (data is None, can be used for joining processes)
    >>> so = SharedObject("trigger")
    >>> so.trigger()  # trigger the SharedObject
    >>> so.triggered  # check if triggered
  * Best practices when fetching np.ndarray (see `SharedObject._fetch_ndarray()`):
    >>> so.fetch(lambda x: x.sum())  # Apply operation only
    >>> so.fetch(lambda x: x + 1)  # Apply operation only
    >>> so.fetch()  # If different operations need to be applied on the same data
    >>> so.fetch()[..., 0]  # Slice only
    >>> so.fetch(lambda x: x[..., 0].copy()) + 1  # Slice and apply operation
"""
# ruff: noqa: UP007

from __future__ import annotations

import struct
import time
from collections.abc import Callable
from multiprocessing.shared_memory import SharedMemory
from typing import Any, Union

import numpy as np

from real_robot import LOGGER

try:
    from sapien import Pose  # type: ignore
except ModuleNotFoundError:
    LOGGER.warning("No sapien installed. Will not support sapien.Pose")

    class Pose:
        """Placeholder used when sapien cannot be imported.

        It only provides the ``__setstate__`` / ``__getstate__`` hooks that
        SharedObject calls on poses; it carries no pose data itself.
        """

        def __init__(self) -> None:
            return None

        def __setstate__(self, state) -> None:
            # The placeholder stores nothing, so the state is discarded.
            return None

        def __getstate__(self) -> tuple:
            # No state to report: always an empty tuple.
            return tuple()
try:
    import fcntl
except ModuleNotFoundError:
    # fcntl is required for the flock-based locks defined in this module;
    # log the reason and propagate the original import error.
    LOGGER.critical("Not supported on Windows: failed to import fcntl")
    raise

# Text encoding used when storing / loading `str` objects as bytes.
_encoding = "utf8"
class ReadersLock:
    """Shared-mode (``LOCK_SH``) ``fcntl.flock`` lock bound to a file descriptor.

    lock.acquire() / .release() is slightly faster than using as a contextmanager
    """

    def __init__(self, fd):
        # File descriptor every flock() call of this instance operates on.
        self.fd = fd

    def acquire(self):
        """Apply ``fcntl.LOCK_SH`` to the stored file descriptor."""
        fcntl.flock(self.fd, fcntl.LOCK_SH)

    def release(self):
        """Apply ``fcntl.LOCK_UN`` to the stored file descriptor."""
        fcntl.flock(self.fd, fcntl.LOCK_UN)

    def __enter__(self):
        # Context-manager entry: acquire, then hand back the lock itself.
        self.acquire()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Context-manager exit: always release; exceptions are not suppressed.
        self.release()
class WriterLock:
    """Exclusive-mode (``LOCK_EX``) ``fcntl.flock`` lock bound to a file descriptor.

    lock.acquire() / .release() is slightly faster than using as a contextmanager
    """

    def __init__(self, fd):
        # File descriptor every flock() call of this instance operates on.
        self.fd = fd

    def acquire(self):
        """Apply ``fcntl.LOCK_EX`` to the stored file descriptor."""
        fcntl.flock(self.fd, fcntl.LOCK_EX)

    def release(self):
        """Apply ``fcntl.LOCK_UN`` to the stored file descriptor."""
        fcntl.flock(self.fd, fcntl.LOCK_UN)

    def __enter__(self):
        # Context-manager entry: acquire, then hand back the lock itself.
        self.acquire()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Context-manager exit: always release; exceptions are not suppressed.
        self.release()
class SharedObject:
    """
    Shared object implemented with SharedMemory and synchronization.
    SharedMemory reallocation, casting object_type, changing numpy metas are not allowed
    Use SharedDynamicObject instead

    The shared memory buffer is organized as follows:
    - 8 bytes: object modified timestamp in ns (since the epoch), stored as 'Q'
    - 1 byte: object data type index, stored as 'B'
    - X bytes: data area.
      For `NoneType`, data area is ignored.
      For `bool`, 1 byte data.
      For `int` / `float`, 8 bytes data.
      For `sapien.Pose`, 7*4 = 28 bytes data ([xyz, wxyz], float32).
      For `str` / `bytes`, (N + 1) bytes data, N is str / bytes length,
      1 is for termination.
      For `np.ndarray`,
      - 1 byte: array dtype index, stored as 'B'
      - 8 bytes: array ndim, stored as 'Q'
      - (K * 8) bytes: array shape for each dimension, stored as 'Q'
      - D bytes: array data buffer

    Every section that touches the shared buffer releases its lock in a
    ``finally`` clause, so an exception raised while the lock is held (e.g. by a
    user-supplied ``fn`` in `fetch()`) does not leave the lock acquired.
    """

    # object_type_idx: 0              1     2    3      4     5    6      7
    _object_types = (None.__class__, bool, int, float, Pose, str, bytes, np.ndarray)

    @staticmethod
    def _get_bytes_size(enc_str: bytes, init_size: int) -> int:
        """Return the SharedMemory size in bytes used for a str / bytes object

        :param enc_str: encoded string / raw bytes to store
        :param init_size: minimum size of the data area
        :return: max(2 * len(enc_str), init_size) + 10
        """
        if (sz := len(enc_str) << 1) >= init_size:
            return sz + 10
        else:
            return init_size + 10

    # SharedMemory size for each object type (indexed by object_type_idx):
    # either a constant or a callable computing it from the data
    _object_sizes = (
        9,  # NoneType
        10,  # bool
        17,  # int
        17,  # float
        37,  # sapien.Pose
        _get_bytes_size.__func__,  # str  # type: ignore
        _get_bytes_size.__func__,  # bytes  # type: ignore
        lambda array, ndim: array.nbytes + ndim * 8 + 18,  # ndarray
    )

    @staticmethod
    def _fetch_metas(shm: SharedMemory) -> tuple:
        """Read object meta info from the SharedMemory (caller must hold a lock)

        :return: (nbytes, mtime, object_type_idx, np_metas),
                 np_metas is an empty tuple unless the object is a np.ndarray
        """
        nbytes = shm._size  # type: ignore
        mtime, object_type_idx = struct.unpack_from("QB", shm.buf, offset=0)
        np_metas = ()
        if object_type_idx == 7:  # np.ndarray
            np_metas = SharedObject._fetch_np_metas(shm.buf)
        return nbytes, mtime, object_type_idx, np_metas

    @staticmethod
    def _fetch_np_metas(buf) -> tuple:
        """Read numpy meta info stored right after the 9-byte object header

        :return: (np_dtype_idx, data_ndim, data_shape)
        """
        np_dtype_idx, data_ndim = struct.unpack_from("=BQ", buf, offset=9)
        data_shape = struct.unpack_from("Q" * data_ndim, buf, offset=18)
        return np_dtype_idx, data_ndim, data_shape

    # Annotation (kept as a string) for the `fn` parameter of fetch()
    _fetch_fn_type = "Callable[[Union[_object_types]], Any] | None"

    @staticmethod
    def _fetch_None(buf, fn: Callable[[None.__class__], Any] | None, *args) -> Any:
        """Return None, or fn(None) when fn is given"""
        return None if fn is None else fn(None)

    @staticmethod
    def _fetch_bool(buf, fn: Callable[[bool], Any] | None, *args) -> Any:
        """Read the bool stored at byte 9, optionally passing it through fn"""
        return bool(buf[9]) if fn is None else fn(bool(buf[9]))

    @staticmethod
    def _fetch_int(buf, fn: Callable[[int], Any] | None, *args) -> Any:
        """Read the int stored as 'q' at offset 9, optionally passing it through fn"""
        v = struct.unpack_from("q", buf, offset=9)[0]
        return v if fn is None else fn(v)

    @staticmethod
    def _fetch_float(buf, fn: Callable[[float], Any] | None, *args) -> Any:
        """Read the float stored as 'd' at offset 9, optionally passing it through fn"""
        v = struct.unpack_from("d", buf, offset=9)[0]
        return v if fn is None else fn(v)

    @staticmethod
    def _fetch_pose(buf, fn: Callable[[Pose], Any] | None, *args) -> Any:
        """Fetch and construct a sapien.Pose (using __setstate__)"""
        pose = Pose.__new__(Pose)
        pose.__setstate__(struct.unpack_from("7f", buf, offset=9))
        return pose if fn is None else fn(pose)

    @staticmethod
    def _fetch_str(buf, fn: Callable[[str], Any] | None, *args) -> Any:
        """Read the str stored from offset 9, optionally passing it through fn"""
        # Strip the zero padding, then drop the 1-byte terminator before decoding
        v = buf[9:].tobytes().rstrip(b"\x00")[:-1].decode(_encoding)
        return v if fn is None else fn(v)

    @staticmethod
    def _fetch_bytes(buf, fn: Callable[[bytes], Any] | None, *args) -> Any:
        """Read the bytes stored from offset 9, optionally passing it through fn"""
        # Strip the zero padding, then drop the 1-byte terminator
        v = buf[9:].tobytes().rstrip(b"\x00")[:-1]
        return v if fn is None else fn(v)

    @staticmethod
    def _fetch_ndarray(
        buf, fn: Callable[[np.ndarray], Any] | None, data_buf_ro: np.ndarray
    ) -> Any:
        """Always return a copy of the underlying buffer

        Examples (ordered from fastest to slowest,
                  benchmarked with 480x848x3 np.uint8):
            # Apply operation only
            so.fetch(lambda x: x.sum())  # contiguous sum (triggers a copy)
            so.fetch().sum()  # contiguous copy => sum

            # Apply operation only
            so.fetch(lambda x: x + 1)  # contiguous add (triggers a copy)
            so.fetch() + 1  # contiguous copy => add

            # Slice only (results might vary depending on array size)
            so.fetch()[..., 0]  # contiguous copy => slice
            so.fetch(lambda x: x[..., 0].copy())  # non-contiguous copy

            # Slice and apply operation
            # (results might vary depending on array size)
            so.fetch(lambda x: x[..., 0].copy()) + 1  # non-contiguous copy => add
            so.fetch(lambda x: x[..., 0] + 1)  # non-contiguous add (triggers a copy)
            so.fetch()[..., 0] + 1  # contiguous copy => non-contiguous add

        :param buf: unused, kept for a uniform fetch function signature
        :param fn: function to apply on the read-only array view
        :param data_buf_ro: read-only ndarray view of the shared data buffer
        """
        if fn is not None:
            data = fn(data_buf_ro)
            # Non-ndarray results and arrays owning their data are safe to return
            if not isinstance(data, np.ndarray) or data.flags.owndata:
                return data
            else:
                LOGGER.warning(
                    "Fetching ndarray with fn that does not trigger a copy "
                    "induces an extra copy. Consider changing to improve performance."
                )
                return data.copy()
        else:
            return data_buf_ro.copy()

    # Fetch function for each object type (indexed by object_type_idx)
    _fetch_objects = (
        _fetch_None.__func__,  # type: ignore
        _fetch_bool.__func__,  # type: ignore
        _fetch_int.__func__,  # type: ignore
        _fetch_float.__func__,  # type: ignore
        _fetch_pose.__func__,  # type: ignore
        _fetch_str.__func__,  # type: ignore
        _fetch_bytes.__func__,  # type: ignore
        _fetch_ndarray.__func__,  # type: ignore
    )

    @staticmethod
    def _assign_np_metas(buf, np_dtype_idx: int, data_ndim: int, data_shape: tuple):
        """Write numpy meta info (dtype index, ndim, shape) starting at offset 9"""
        struct.pack_into(
            "=BQ" + "Q" * data_ndim, buf, 9, np_dtype_idx, data_ndim, *data_shape
        )

    @staticmethod
    def _assign_None(*args):
        """NoneType has no data area: nothing to write"""
        pass

    @staticmethod
    def _assign_bool(buf, data: bool, *args):
        """Write the bool into byte 9"""
        buf[9] = data

    @staticmethod
    def _assign_int(buf, data: int, *args):
        """Write the int as 'q' at offset 9"""
        struct.pack_into("q", buf, 9, data)

    @staticmethod
    def _assign_float(buf, data: float, *args):
        """Write the float as 'd' at offset 9"""
        struct.pack_into("d", buf, 9, data)

    @staticmethod
    def _assign_pose(buf, pose: Pose, *args):
        """Write the pose state (from __getstate__) as 7 floats at offset 9"""
        struct.pack_into("7f", buf, 9, *pose.__getstate__())

    @staticmethod
    def _assign_bytes(buf, enc_data: bytes, buf_nbytes: int, *args):
        """Write bytes followed by a b"\\xff" terminator over the whole data area"""
        struct.pack_into(f"{buf_nbytes - 9}s", buf, 9, enc_data + b"\xff")

    @staticmethod
    def _assign_ndarray(buf, data: np.ndarray, buf_nbytes: int, data_buf: np.ndarray):
        """Copy data into the ndarray backed by the shared data buffer"""
        data_buf[:] = data

    # Assign function for each object type (indexed by object_type_idx)
    _assign_objects = (
        _assign_None.__func__,  # type: ignore
        _assign_bool.__func__,  # type: ignore
        _assign_int.__func__,  # type: ignore
        _assign_float.__func__,  # type: ignore
        _assign_pose.__func__,  # type: ignore
        _assign_bytes.__func__,  # type: ignore
        _assign_bytes.__func__,  # type: ignore
        _assign_ndarray.__func__,  # type: ignore
    )

    # Supported numpy dtypes (indexed by np_dtype_idx)
    _np_dtypes = (
        np.bool_,
        np.int8,
        np.uint8,
        np.int16,
        np.uint16,
        np.int32,
        np.uint32,
        np.int64,
        np.uint64,
        np.float16,
        np.float32,
        np.float64,
        np.float128,
        np.complex64,
        np.complex128,
        np.complex256,
    )

    def __init__(self, name: str, *, data: Union[_object_types] = None, init_size=100):  # type: ignore
        """
        Examples:
            # Mounts SharedMemory "test" if exists,
            # Else creates SharedMemory "test" which holds None by default
            so = SharedObject("test")

            # Mounts SharedMemory "test" if exists and assign data (True) to it,
            # Else creates SharedMemory "test" and assigns data
            so = SharedObject("test", data=True)

            # Mounts SharedMemory "test" if exists and assign data (np.ones(10)) to it,
            # Else creates SharedMemory "test" and assigns data
            so = SharedObject("test", data=np.ones(10))

        :param name: name of the SharedMemory to mount / create
        :param data: initial data to assign (None means no assignment)
        :param init_size: only used for str and bytes,
                          initial buffer size to save frequent reallocation.
                          The buffer is expanded with exponential growth rate of 2
        :raises TypeError: if the type of data (or its numpy dtype) is not supported
        :raises BufferError: if data does not match an existing SharedMemory
        """
        self.init_size = init_size
        data, object_type_idx, nbytes, np_metas = self._preprocess_data(data)

        try:
            self.shm = SharedMemory(name)
            created = False
        except FileNotFoundError:  # no SharedMemory with given name
            self.shm = SharedMemory(name, create=True, size=nbytes)
            created = True
        self.name = name
        self._readers_lock = ReadersLock(self.shm._fd)  # type: ignore
        self._writer_lock = WriterLock(self.shm._fd)  # type: ignore

        if created:
            self.nbytes = nbytes
            self.mtime = time.time_ns()
            self.object_type_idx = object_type_idx
            self.np_metas = np_metas
            # Assign object_type, np_metas to init object meta info
            self._writer_lock.acquire()
            try:
                self.shm.buf[8] = object_type_idx
                if object_type_idx == 7:  # np.ndarray
                    self._assign_np_metas(self.shm.buf, *np_metas)
            finally:
                self._writer_lock.release()
        else:
            self._readers_lock.acquire()
            try:
                self.nbytes, self.mtime, self.object_type_idx, self.np_metas = (
                    self._fetch_metas(self.shm)
                )
            finally:
                self._readers_lock.release()

        # Create np.ndarray here to save frequent np.ndarray construction
        self.np_ndarray, self.np_ndarray_ro = None, None
        if self.object_type_idx == 7:  # np.ndarray
            np_dtype_idx, data_ndim, data_shape = self.np_metas
            self.np_ndarray = np.ndarray(
                data_shape,
                dtype=self._np_dtypes[np_dtype_idx],
                buffer=self.shm.buf,
                offset=data_ndim * 8 + 18,
            )
            # Create a read-only view for fetch()
            self.np_ndarray_ro = self.np_ndarray.view()
            self.np_ndarray_ro.setflags(write=False)

        # fill data
        if data is not None:
            if not created:
                # NOTE(review): assumes LOGGER formats "{!r}" with the extra
                # argument (str.format style) -- confirm against real_robot.LOGGER
                LOGGER.warning("Implicitly overwriting data of {!r}", self)
            self._assign(data, object_type_idx, nbytes, np_metas)

    def _preprocess_data(self, data: Union[_object_types]) -> tuple:  # type: ignore
        """Preprocess object data and return useful informations

        :return data: processed data. Only changed for str (=> encoded bytes)
        :return object_type_idx: object type index
        :return nbytes: number of bytes needed for SharedMemory
        :return np_metas: numpy meta info, (np_dtype_idx, data_ndim, data_shape)
        :raises TypeError: if the type of data (or its numpy dtype) is not supported
        """
        try:
            object_type_idx = self._object_types.index(type(data))
        except ValueError as e:
            raise TypeError(f"Not supported object_type: {type(data)}") from e

        # Get shared memory size in bytes
        np_metas = ()
        if object_type_idx <= 4:  # NoneType, bool, int, float, sapien.Pose
            nbytes = self._object_sizes[object_type_idx]
        elif object_type_idx == 5:  # str
            data = data.encode(_encoding)  # encode strings into bytes
            nbytes = self._object_sizes[object_type_idx](data, self.init_size)
        elif object_type_idx == 6:  # bytes
            nbytes = self._object_sizes[object_type_idx](data, self.init_size)
        elif object_type_idx == 7:  # np.ndarray
            try:
                np_dtype_idx = self._np_dtypes.index(data.dtype)
            except ValueError as e:
                raise TypeError(f"Not supported numpy dtype: {data.dtype}") from e
            data_ndim = data.ndim
            np_metas = (np_dtype_idx, data_ndim, data.shape)
            nbytes = self._object_sizes[object_type_idx](data, data_ndim)
        else:
            raise ValueError(f"Unknown {object_type_idx = }")

        return data, object_type_idx, nbytes, np_metas

    @property
    def modified(self) -> bool:
        """Returns whether the object's data has been modified by another process.
        Check by fetching object modified timestamp and comparing with self.mtime
        """
        self._readers_lock.acquire()
        try:
            mtime = struct.unpack_from("Q", self.shm.buf, offset=0)[0]
        finally:
            self._readers_lock.release()
        return mtime > self.mtime

    @property
    def triggered(self) -> bool:
        """Returns whether the object is triggered (protected by readers lock)
        Check by fetching object modified timestamp, comparing with self.mtime
        and updating self.mtime
        """
        self._readers_lock.acquire()
        try:
            mtime = struct.unpack_from("Q", self.shm.buf, offset=0)[0]
        finally:
            self._readers_lock.release()
        modified = mtime > self.mtime
        self.mtime = mtime
        return modified

    def fetch(self, fn: _fetch_fn_type = None) -> Any:  # type: ignore
        """
        Fetch a copy of data from SharedMemory (protected by readers lock)
        See SharedObject._fetch_ndarray() for best practices of fn with np.ndarray

        :param fn: function to apply on data, e.g., lambda x: x + 1.
                   If fn is None or does not trigger a copy for ndarray
                   (e.g., slicing, masking), a manual copy is applied.
                   Thus, the best practices are ordered as:
                   fn (triggers a copy) > fn = None >> fn (does not trigger a copy)
                   because copying non-contiguous ndarray takes much longer time.
        :return data: a copy of data read from SharedMemory
        """
        self._readers_lock.acquire()
        try:
            # Update modified timestamp
            self.mtime = struct.unpack_from("Q", self.shm.buf, offset=0)[0]
            data = self._fetch_objects[self.object_type_idx](
                self.shm.buf, fn, self.np_ndarray_ro
            )
        finally:
            # fn is user code and may raise: never leave the lock held
            self._readers_lock.release()
        return data

    def trigger(self) -> SharedObject:
        """Trigger by modifying object mtime (protected by writer lock)"""
        self._writer_lock.acquire()
        try:
            # Update mtime
            struct.pack_into("Q", self.shm.buf, 0, time.time_ns())
        finally:
            self._writer_lock.release()
        return self

    def assign(self, data: Union[_object_types]) -> SharedObject:  # type: ignore
        """Assign data to SharedMemory (protected by writer lock)"""
        return self._assign(*self._preprocess_data(data))

    def _assign(
        self, data, object_type_idx: int, nbytes: int, np_metas: tuple
    ) -> SharedObject:
        """Inner function for assigning data (protected by writer lock)
        For SharedObject, object_type_idx, nbytes, and np_metas cannot be modified

        :raises BufferError: if the object type or numpy metas differ from the
                             existing ones, or nbytes exceeds the buffer size
        """
        if (
            object_type_idx != self.object_type_idx
            or nbytes > self.nbytes
            or np_metas != self.np_metas
        ):
            raise BufferError(
                f"Casting object type (new={self._object_types[object_type_idx]}, "
                f"old={self._object_types[self.object_type_idx]}) OR "
                f"Buffer overflow (new={nbytes} > {self.nbytes}=old) OR "
                f"Changed numpy meta (new={np_metas}, old={self.np_metas}) in {self!r}"
            )

        self._writer_lock.acquire()
        try:
            # Assign mtime
            self.mtime = time.time_ns()
            struct.pack_into("Q", self.shm.buf, 0, self.mtime)

            # Assign object data
            self._assign_objects[self.object_type_idx](
                self.shm.buf, data, self.nbytes, self.np_ndarray
            )
        finally:
            # Packing can raise (e.g. struct.error for an out-of-range int):
            # never leave the lock held
            self._writer_lock.release()
        return self

    def close(self):
        """Closes access to the shared memory from this instance but does
        not destroy the shared memory block."""
        self.shm.close()

    def __del__(self):
        # __init__ can raise before self.shm exists (e.g. unsupported data type);
        # there is nothing to close in that case
        if hasattr(self, "shm"):
            self.close()

    def __reduce__(self):
        # Re-create by mounting the SharedMemory with the same name
        return self.__class__, (self.name,)

    def __repr__(self):
        return (
            f"<{self.__class__.__name__}: name={self.name}, "
            f"data_type={self._object_types[self.object_type_idx]}, "
            f"nbytes={self.nbytes}>"
        )
class SharedDynamicObject(SharedObject):
    """
    Shared object implemented with SharedMemory and synchronization
    Allow reallocating SharedMemory.
    Need more checks and thus is slower than SharedObject.

    In fact, this should never be implemented.
    For size-variable np.ndarray, just implement similar support as str/bytes

    NOTE: the implementation is not complete, `__init__()` always raises
    NotImplementedError.
    """

    @staticmethod
    def _fetch_metas(shm: SharedMemory) -> tuple:
        """Read object meta info from the SharedMemory (caller must hold a lock)

        :return: (nbytes, mtime, object_type_idx, np_metas),
                 np_metas is an empty tuple unless the object is a np.ndarray
        """
        nbytes = (
            shm._mmap.size()  # type: ignore
        )  # _mmap size will be updated by os.ftruncate()
        mtime, object_type_idx = struct.unpack_from("QB", shm.buf, offset=0)
        np_metas = ()
        if object_type_idx == 7:  # np.ndarray
            np_metas = SharedObject._fetch_np_metas(shm.buf)
        return nbytes, mtime, object_type_idx, np_metas

    def __init__(self, *args, **kwargs):
        raise NotImplementedError("Implementation not complete")

    def fetch(self, fn: SharedObject._fetch_fn_type = None) -> Any:  # type: ignore
        """Fetch a copy of data from SharedMemory (protected by readers lock)

        :param fn: function to apply on data, e.g., lambda x: x + 1.
        :return data: a copy of data read from SharedMemory
        """
        self._readers_lock.acquire()
        try:
            # Fetch shm info
            self.nbytes, self.mtime, self.object_type_idx, self.np_metas = (
                self._fetch_metas(self.shm)
            )
            data = self._fetch_objects[self.object_type_idx](
                self.shm.buf, fn, self.np_ndarray_ro
            )
        finally:
            # fn is user code and may raise: never leave the lock held
            self._readers_lock.release()
        return data

    def assign(self, data: Union[SharedObject._object_types], reallocate=False) -> None:  # type: ignore
        """Assign data to SharedMemory (protected by writer lock)

        :param reallocate: whether to force reallocation
        :raises NotImplementedError: if a reallocation is requested or required
        """
        # Check object type
        data, object_type_idx, nbytes, np_metas = self._preprocess_data(data)  # noqa: F841

        self._writer_lock.acquire()
        try:
            # Fetch shm info
            self.nbytes, self.mtime, self.object_type_idx, self.np_metas = (
                self._fetch_metas(self.shm)
            )

            # Reallocate if necessary
            if reallocate or nbytes > self.nbytes or np_metas != self.np_metas:
                # NOTE: Cannot use unlink() to reallocate SharedMemory
                # Otherwise, existing SharedObject instances to the same SharedMemory
                # will not be updated
                # Need to use os.ftruncate(sm._fd, new_size)
                raise NotImplementedError("reallocate is not yet implemented")

            self._assign_objects[self.object_type_idx](
                self.shm.buf, data, self.nbytes, self.np_ndarray
            )
        finally:
            # Released on every path, including the NotImplementedError above
            self._writer_lock.release()