# Source code for real_robot.utils.multiprocessing.shared_object

"""
Shared object implemented with SharedMemory and synchronization
Careful optimization is done to make it run as fast as possible
version 0.0.6

Written by Kolin Guo

Implementation Notes:
  * Methods `fetch()` and `assign()` are chosen to be distinctive from common python
    class methods (e.g., get(), set(), update(), read(), write(), fill(), put(), etc.)
  * Readers-Writer synchronization is achieved by `fcntl.flock()` (filesystem advisory
    lock). It's chosen over `multiprocessing.Lock` / `multiprocessing.Condition` so that
    no lock needs to be explicitly passed to child processes.
    However, note that `flock` locks are not guaranteed to be acquired in order.
  * An object modified timestamp is maintained using `time.time_ns()` which is
    system-wide and has highest resolution. (https://peps.python.org/pep-0564/#linux)

Usage Notes:
  * For processes that are always waiting for a massive SharedObject (e.g., np.ndarray),
    it's best to use so.modified to check whether the data has been updated yet to avoid
    starving processes that are assigning to it (only fetch when so.modified is True).
    Alternatively, a separate boolean update flag can be used to achieve the same.
  * Examples:
    # Creates SharedObject with data
    >>> so = SharedObject("test", data=np.ones((480, 848, 3)))
    # Mounts existing SharedObject
    >>> so = SharedObject("test")
    # Creates a trigger SharedObject (data is None, can be used for joining processes)
    >>> so = SharedObject("trigger")
    >>> so.trigger()  # trigger the SharedObject
    >>> so.triggered  # check if triggered
  * Best practices when fetching np.ndarray (see `SharedObject._fetch_ndarray()`):
    >>> so.fetch(lambda x: x.sum())  # Apply operation only
    >>> so.fetch(lambda x: x + 1)  # Apply operation only
    >>> so.fetch()  # If different operations need to be applied on the same data
    >>> so.fetch()[..., 0]  # Slice only
    >>> so.fetch(lambda x: x[..., 0].copy()) + 1  # Slice and apply operation
"""
# ruff: noqa: UP007

from __future__ import annotations

import struct
import time
from collections.abc import Callable
from multiprocessing.shared_memory import SharedMemory
from typing import Any, Union

import numpy as np

from real_robot import LOGGER

from .shared_object_metas import (
    META_TYPES,
    NP_DTYPES,
    OBJECT_BUF_SIZES,
    BytesMeta,
    NDArrayMeta,
)

try:
    from sapien import Pose  # type: ignore
except ModuleNotFoundError:
    LOGGER.warning("No sapien installed. Will not support sapien.Pose")

    class Pose:
        """Placeholder used when `sapien` cannot be imported.

        It exists only so that a `Pose` type can be referenced (e.g. in
        `SharedObject._object_types`); it stores no state.
        """

        def __init__(self) -> None:
            """Nothing to initialize."""

        def __setstate__(self, state) -> None:
            """Ignore the incoming state."""

        def __getstate__(self) -> tuple:
            """Return an empty state tuple."""
            return tuple()
try:
    import fcntl
except ModuleNotFoundError:
    # fcntl.flock() backs the readers-writer locks below; it is POSIX-only.
    LOGGER.critical("Not supported on Windows: failed to import fcntl")
    raise

# Codec used to turn `str` data into bytes stored in shared memory (and back).
_encoding = "utf8"
class ReadersLock:
    """Shared ("reader") side of an `fcntl.flock` advisory lock on a file descriptor.

    Calling acquire() / release() directly is slightly faster than using the
    lock as a context manager, so hot paths prefer the explicit calls.
    """

    def __init__(self, fd):
        # File descriptor the advisory lock is placed on
        self.fd = fd

    def acquire(self):
        """Block until a shared lock (LOCK_SH) on `self.fd` is obtained."""
        fcntl.flock(self.fd, fcntl.LOCK_SH)

    def release(self):
        """Release whatever flock this descriptor holds (LOCK_UN)."""
        fcntl.flock(self.fd, fcntl.LOCK_UN)

    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.release()
class WriterLock:
    """Exclusive ("writer") side of an `fcntl.flock` advisory lock on a file descriptor.

    Calling acquire() / release() directly is slightly faster than using the
    lock as a context manager, so hot paths prefer the explicit calls.
    """

    def __init__(self, fd):
        # File descriptor the advisory lock is placed on
        self.fd = fd

    def acquire(self):
        """Block until an exclusive lock (LOCK_EX) on `self.fd` is obtained."""
        fcntl.flock(self.fd, fcntl.LOCK_EX)

    def release(self):
        """Release whatever flock this descriptor holds (LOCK_UN)."""
        fcntl.flock(self.fd, fcntl.LOCK_UN)

    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.release()
class SharedObject:
    r"""
    Shared object implemented with SharedMemory and synchronization.
    SharedMemory reallocation, casting object_type, changing numpy metas are not allowed
    Use SharedDynamicObject instead

    The shared memory buffer is organized as follows:
    - 8 bytes: object modified timestamp in ns (since the epoch), stored as 'Q'
    - 1 byte: object data type index, stored as 'B'
    - X bytes: data area.
      For `NoneType`, data area is ignored.
      For `bool`, 1 byte data.
      For `int` / `float`, 8 bytes data.
      For `complex`, 16 bytes data.
      For `sapien.Pose`, 7*4 = 28 bytes data ([xyz, wxyz], float32).
      For `str` / `bytes` / `bytearray`, (8 + N + 1) bytes data.
        - 8 bytes: size of the string / bytes / bytearray buffer (N + 1)
        - N bytes: data buffer
        - 1 byte: termination byte (b"\xff")
        - padded zero bytes until length indicated in the first 8 bytes.
      For `np.ndarray`,
        - 1 byte: array dtype index, stored as 'B'
        - 8 bytes: array ndim, stored as 'Q'
        - (K * 8) bytes: array shape for each dimension, stored as 'Q'
        - D bytes: array data buffer
      For compound data types, only `dict` is currently supported
      (`tuple` / `list` / `set` are reserved):
        - 8 bytes: skipped by fetch/assign (the dict buffer size)
        - for each (key, value) pair, key then value, each laid out like a
          standalone object without its 8-byte timestamp, i.e. a 1-byte type
          index followed by that type's data area. The per-item metadata comes
          from `DictMeta` (see shared_object_metas).
    """

    # object_type_idx: the position of a type in this tuple is the type index
    # stored at byte 8 of the buffer.
    _object_types = (
        None.__class__,  # 0
        bool,  # 1
        int,  # 2
        float,  # 3
        complex,  # 4
        Pose,  # 5
        str,  # 6
        bytes,  # 7
        bytearray,  # 8
        np.ndarray,  # 9
        # tuple,  # reserved, not supported yet
        # list,  # reserved, not supported yet
        # set,  # reserved, not supported yet
        dict,  # 10
    )

    @staticmethod
    def _fetch_metas(shm: SharedMemory) -> tuple[int, int, META_TYPES, int]:
        """Read the object header from `shm`.

        :return: (object_type_idx, nbytes, metadata, mtime). metadata is None
                 for fixed-size types.
        """
        nbytes = shm._size  # type: ignore
        mtime, object_type_idx = struct.unpack_from("QB", shm.buf, offset=0)
        metadata = None
        if 6 <= object_type_idx <= 8:  # str / bytes / bytearray
            metadata = BytesMeta.from_buf(shm.buf)
        elif object_type_idx == 9:  # np.ndarray
            metadata = NDArrayMeta.from_buf(shm.buf)
        elif object_type_idx == 10:  # dict
            # DictMeta is not part of the module-level metas import
            from .shared_object_metas import DictMeta

            metadata = DictMeta.from_buf(shm.buf)
        return object_type_idx, nbytes, metadata, mtime

    _fetch_fn_type = "Callable[[Union[_object_types]], Any] | None"

    # All _fetch_* / _assign_* helpers default to offset=9:
    # 8 bytes mtime + 1 byte object type index precede the data area.

    @staticmethod
    def _fetch_None(
        buf: memoryview,
        fn: Callable[[None.__class__], Any] | None,
        *args,
        offset: int = 9,
    ) -> Any:
        return None if fn is None else fn(None)

    @staticmethod
    def _fetch_bool(
        buf: memoryview, fn: Callable[[bool], Any] | None, *args, offset: int = 9
    ) -> Any:
        return bool(buf[offset]) if fn is None else fn(bool(buf[offset]))

    @staticmethod
    def _fetch_int(
        buf: memoryview, fn: Callable[[int], Any] | None, *args, offset: int = 9
    ) -> Any:
        v = struct.unpack_from("q", buf, offset=offset)[0]
        return v if fn is None else fn(v)

    @staticmethod
    def _fetch_float(
        buf: memoryview, fn: Callable[[float], Any] | None, *args, offset: int = 9
    ) -> Any:
        v = struct.unpack_from("d", buf, offset=offset)[0]
        return v if fn is None else fn(v)

    @staticmethod
    def _fetch_complex(
        buf: memoryview, fn: Callable[[complex], Any] | None, *args, offset: int = 9
    ) -> Any:
        v = complex(*struct.unpack_from("2d", buf, offset=offset))
        return v if fn is None else fn(v)

    @staticmethod
    def _fetch_pose(
        buf: memoryview, fn: Callable[[Pose], Any] | None, *args, offset: int = 9
    ) -> Any:
        """Fetch and construct a sapien.Pose (using __setstate__)"""
        pose = Pose.__new__(Pose)
        pose.__setstate__(struct.unpack_from("7f", buf, offset=offset))
        return pose if fn is None else fn(pose)

    @staticmethod
    def _fetch_str(
        buf: memoryview, fn: Callable[[str], Any] | None, *args, offset: int = 9
    ) -> Any:
        # TODO: does it need to unpack buf_size
        data_buf_size = struct.unpack_from("Q", buf, offset=offset)[0]  # (N + 1) bytes
        # Strip zero padding, then drop the b"\xff" termination byte
        v = (
            buf[offset + 8 : offset + 8 + data_buf_size]
            .tobytes()
            .rstrip(b"\x00")[:-1]
            .decode(_encoding)
        )
        return v if fn is None else fn(v)

    @staticmethod
    def _fetch_bytes(
        buf: memoryview, fn: Callable[[bytes], Any] | None, *args, offset: int = 9
    ) -> Any:
        data_buf_size = struct.unpack_from("Q", buf, offset=offset)[0]  # (N + 1) bytes
        v = buf[offset + 8 : offset + 8 + data_buf_size].tobytes().rstrip(b"\x00")[:-1]
        return v if fn is None else fn(v)

    @staticmethod
    def _fetch_bytearray(
        buf: memoryview, fn: Callable[[bytearray], Any] | None, *args, offset: int = 9
    ) -> Any:
        data_buf_size = struct.unpack_from("Q", buf, offset=offset)[0]  # (N + 1) bytes
        v = bytearray(
            buf[offset + 8 : offset + 8 + data_buf_size].tobytes().rstrip(b"\x00")[:-1]
        )
        return v if fn is None else fn(v)

    @staticmethod
    def _fetch_ndarray(
        buf: memoryview,
        fn: Callable[[np.ndarray], Any] | None,
        data_buf_ro: np.ndarray,
        *args,
        offset: int = 9,
    ) -> Any:
        """
        Always return a copy of the underlying buffer

        Examples (ordered from fastest to slowest, benchmarked with 480x848x3 np.uint8):
          # Apply operation only
          so.fetch(lambda x: x.sum())  # contiguous sum (triggers a copy)
          so.fetch().sum()  # contiguous copy => sum

          # Apply operation only
          so.fetch(lambda x: x + 1)  # contiguous add (triggers a copy)
          so.fetch() + 1  # contiguous copy => add

          # Slice only (results might vary depending on array size)
          so.fetch()[..., 0]  # contiguous copy => slice
          so.fetch(lambda x: x[..., 0].copy())  # non-contiguous copy

          # Slice and apply operation (results might vary depending on array size)
          so.fetch(lambda x: x[..., 0].copy()) + 1  # non-contiguous copy => add
          so.fetch(lambda x: x[..., 0] + 1)  # non-contiguous add (triggers a copy)
          so.fetch()[..., 0] + 1  # contiguous copy => non-contiguous add
        """
        if fn is not None:
            data = fn(data_buf_ro)
            # A result that still views shared memory must not escape the lock
            if not isinstance(data, np.ndarray) or data.flags.owndata:
                return data
            else:
                LOGGER.warning(
                    "Fetching ndarray with fn that does not trigger a copy "
                    "induces an extra copy. Consider changing to improve performance."
                )
                return data.copy()
        else:
            return data_buf_ro.copy()

    @staticmethod
    def _fetch_dict(
        buf: memoryview,
        fn: Callable[[dict], Any] | None,
        data_buf_ro: np.ndarray,
        metadata: DictMeta,
        offset: int = 9,
    ) -> Any:
        data = {}
        offset += 8  # dict, buf_size
        for (key_obj_type_idx, key_meta), (value_obj_type_idx, value_meta) in zip(
            metadata.keys_metas, metadata.values_metas
        ):
            # `offset + 1` skips the item's 1-byte type index; each item occupies
            # its standalone buf_size minus the 8-byte mtime header.
            key = SharedObject._fetch_objects[key_obj_type_idx](
                buf, None, offset=offset + 1
            )
            if key_meta:
                offset += key_meta.buf_size - 8  # mtime
            else:
                offset += OBJECT_BUF_SIZES[key_obj_type_idx] - 8  # mtime

            value = SharedObject._fetch_objects[value_obj_type_idx](
                buf, None, data_buf_ro, offset=offset + 1
            )
            if value_meta:
                offset += value_meta.buf_size - 8  # mtime
            else:
                offset += OBJECT_BUF_SIZES[value_obj_type_idx] - 8  # mtime

            data[key] = value
        return data if fn is None else fn(data)

    # Indexed by object_type_idx
    _fetch_objects = (
        _fetch_None.__func__,  # type: ignore
        _fetch_bool.__func__,  # type: ignore
        _fetch_int.__func__,  # type: ignore
        _fetch_float.__func__,  # type: ignore
        _fetch_complex.__func__,  # type: ignore
        _fetch_pose.__func__,  # type: ignore
        _fetch_str.__func__,  # type: ignore
        _fetch_bytes.__func__,  # type: ignore
        _fetch_bytearray.__func__,  # type: ignore
        _fetch_ndarray.__func__,  # type: ignore
        _fetch_dict.__func__,  # type: ignore
    )

    @staticmethod
    def _assign_None(*args, offset: int = 9):
        pass

    @staticmethod
    def _assign_bool(buf: memoryview, data: bool, *args, offset: int = 9):
        buf[offset] = data

    @staticmethod
    def _assign_int(buf: memoryview, data: int, *args, offset: int = 9):
        struct.pack_into("q", buf, offset, data)

    @staticmethod
    def _assign_float(buf: memoryview, data: float, *args, offset: int = 9):
        struct.pack_into("d", buf, offset, data)

    @staticmethod
    def _assign_complex(buf: memoryview, data: complex, *args, offset: int = 9):
        struct.pack_into("2d", buf, offset, data.real, data.imag)

    @staticmethod
    def _assign_pose(buf: memoryview, pose: Pose, *args, offset: int = 9):
        struct.pack_into("7f", buf, offset, *pose.__getstate__())

    @staticmethod
    def _assign_bytes(
        buf: memoryview, enc_data: bytes, metadata: BytesMeta, *args, offset: int = 9
    ):
        """
        :param metadata: metadata.data_buf_size is the bytes buffer size
                         (w/ termination byte) (i.e., N + 1)
        """
        # "{n}s" zero-pads shorter input up to n bytes
        struct.pack_into(
            f"{metadata.data_buf_size}s", buf, offset + 8, enc_data + b"\xff"
        )

    @staticmethod
    def _assign_ndarray(
        buf: memoryview,
        data: np.ndarray,
        metadata: NDArrayMeta,
        data_buf: np.ndarray,
        offset: int = 9,
    ):
        data_buf[:] = data

    @staticmethod
    def _assign_dict(
        buf: memoryview,
        data: dict,
        metadata: DictMeta,
        np_data_buf: np.ndarray,
        offset: int = 9,
    ):
        offset += 8  # dict, buf_size
        for (key, value), (key_obj_type_idx, key_meta), (
            value_obj_type_idx,
            value_meta,
        ) in zip(data.items(), metadata.keys_metas, metadata.values_metas):
            # FIXME: Do not encode again
            if isinstance(key, str):
                key = key.encode(_encoding)  # encode strings into bytes
            SharedObject._assign_objects[key_obj_type_idx](
                buf, key, key_meta, offset=offset + 1
            )
            if key_meta:
                offset += key_meta.buf_size - 8  # mtime
            else:
                offset += OBJECT_BUF_SIZES[key_obj_type_idx] - 8  # mtime

            # FIXME: Do not encode again
            if isinstance(value, str):
                value = value.encode(_encoding)  # encode strings into bytes
            SharedObject._assign_objects[value_obj_type_idx](
                buf, value, value_meta, np_data_buf, offset=offset + 1
            )
            if value_meta:
                offset += value_meta.buf_size - 8  # mtime
            else:
                offset += OBJECT_BUF_SIZES[value_obj_type_idx] - 8  # mtime

    # Indexed by object_type_idx (str / bytes / bytearray share one writer)
    _assign_objects = (
        _assign_None.__func__,  # type: ignore
        _assign_bool.__func__,  # type: ignore
        _assign_int.__func__,  # type: ignore
        _assign_float.__func__,  # type: ignore
        _assign_complex.__func__,  # type: ignore
        _assign_pose.__func__,  # type: ignore
        _assign_bytes.__func__,  # type: ignore
        _assign_bytes.__func__,  # type: ignore
        _assign_bytes.__func__,  # type: ignore
        _assign_ndarray.__func__,  # type: ignore
        _assign_dict.__func__,  # type: ignore
    )

    def __init__(self, name: str, *, data: Union[_object_types] = None, init_size=100):  # type: ignore
        """
        Examples:
          # Mounts SharedMemory "test" if exists,
          # Else creates SharedMemory "test" which holds None by default
          so = SharedObject("test")

          # Mounts SharedMemory "test" if exists and assign data (True) to it,
          # Else creates SharedMemory "test" and assigns data
          so = SharedObject("test", data=True)

          # Mounts SharedMemory "test" if exists and assign data (np.ones(10)) to it,
          # Else creates SharedMemory "test" and assigns data
          so = SharedObject("test", data=np.ones(10))

        :param init_size: only used for str and bytes,
                          initial buffer size to save frequent reallocation.
                          The buffer is expanded with exponential growth rate of 2
        """
        self.init_size = init_size
        data, object_type_idx, nbytes, metadata = self._preprocess_data(data)

        try:
            self.shm = SharedMemory(name)
            created = False
        except FileNotFoundError:  # no SharedMemory with given name
            self.shm = SharedMemory(name, create=True, size=nbytes)
            created = True
        self.name = name
        self._readers_lock = ReadersLock(self.shm._fd)  # type: ignore
        self._writer_lock = WriterLock(self.shm._fd)  # type: ignore

        if created:
            self.object_type_idx = object_type_idx
            self.nbytes = nbytes
            self.metadata = metadata
            self.mtime = time.time_ns()
            # Assign object_type, np_metas to init object meta info
            self._writer_lock.acquire()
            try:
                self.shm.buf[8] = object_type_idx
                if metadata is not None:
                    metadata.assign_buf(self.shm.buf)
            finally:
                # Never leave the flock held if writing the header fails
                self._writer_lock.release()
        else:
            self._readers_lock.acquire()
            try:
                self.object_type_idx, self.nbytes, self.metadata, self.mtime = (
                    self._fetch_metas(self.shm)
                )
            finally:
                self._readers_lock.release()

        # Create np.ndarray here to save frequent np.ndarray construction
        self.np_ndarray, self.np_ndarray_ro = None, None
        if self.object_type_idx == 9:  # np.ndarray
            self.np_ndarray = np.ndarray(
                self.metadata.shape,  # type: ignore
                dtype=NP_DTYPES[self.metadata.dtype_idx],  # type: ignore
                buffer=self.shm.buf,
                # 8 (mtime) + 1 (type idx) + 1 (dtype idx) + 8 (ndim) + ndim * 8 (shape)
                offset=self.metadata.ndim * 8 + 18,  # type: ignore
            )
            # Create a read-only view for fetch()
            self.np_ndarray_ro = self.np_ndarray.view()
            self.np_ndarray_ro.setflags(write=False)
        elif (
            self.object_type_idx == 10 and self.metadata.ndarray_meta  # type: ignore
        ):  # dict
            self.np_ndarray = np.ndarray(
                self.metadata.ndarray_meta.shape,  # type: ignore
                dtype=NP_DTYPES[self.metadata.ndarray_meta.dtype_idx],  # type: ignore
                buffer=self.shm.buf,
                offset=self.metadata.ndarray_offset,  # type: ignore
            )
            # Create a read-only view for fetch()
            self.np_ndarray_ro = self.np_ndarray.view()
            self.np_ndarray_ro.setflags(write=False)

        # fill data
        if data is not None:
            if not created:
                LOGGER.warning("Implicitly overwriting data of {!r}", self)
            self._assign(data, object_type_idx, nbytes, metadata)

    def _preprocess_data(
        self,
        data: Union[_object_types],  # type: ignore
    ) -> tuple[Union[_object_types], int, int, META_TYPES]:  # type: ignore
        """Preprocess object data and return useful informations

        :return data: processed data. Only changed for str (=> encoded bytes)
        :return object_type_idx: object type index
        :return nbytes: number of bytes needed for SharedMemory
        :return metadata: metadata info
        :raises TypeError: if type(data) is not in _object_types
        """
        try:
            object_type_idx = self._object_types.index(type(data))
        except ValueError as e:
            raise TypeError(f"Not supported object_type: {type(data)}") from e

        # Get shared memory size in bytes
        metadata = None
        if object_type_idx <= 5:  # NoneType, bool, int, float, complex, sapien.Pose
            nbytes = OBJECT_BUF_SIZES[object_type_idx]
        elif object_type_idx == 6:  # str
            data = data.encode(_encoding)  # encode strings into bytes
            # TODO: Change self.init_size to str
            # TODO: Allow filling up to the full data_buf_size w/o expanding on assign()
            metadata = BytesMeta.from_data(data, self.init_size)
            nbytes = metadata.buf_size
        elif 7 <= object_type_idx <= 8:  # bytes, bytearray
            metadata = BytesMeta.from_data(data, self.init_size)
            nbytes = metadata.buf_size
        elif object_type_idx == 9:  # np.ndarray
            metadata = NDArrayMeta.from_data(data)
            nbytes = metadata.buf_size
        elif object_type_idx == 10:  # dict
            # DictMeta is not part of the module-level metas import
            from .shared_object_metas import DictMeta

            metadata = DictMeta.from_data(data, self.init_size)
            nbytes = metadata.buf_size
        else:
            raise ValueError(f"Unknown {object_type_idx = }")

        return data, object_type_idx, nbytes, metadata

    @property
    def modified(self) -> bool:
        """Returns whether the object's data has been modified by another process.
        Check by fetching object modified timestamp and comparing with self.mtime
        """
        self._readers_lock.acquire()
        mtime = struct.unpack_from("Q", self.shm.buf, offset=0)[0]
        self._readers_lock.release()
        return mtime > self.mtime

    @property
    def triggered(self) -> bool:
        """Returns whether the object is triggered (protected by readers lock)
        Check by fetching object modified timestamp,
        comparing with self.mtime and updating self.mtime
        """
        self._readers_lock.acquire()
        mtime = struct.unpack_from("Q", self.shm.buf, offset=0)[0]
        self._readers_lock.release()
        modified = mtime > self.mtime
        self.mtime = mtime
        return modified

    def fetch(self, fn: _fetch_fn_type = None) -> Any:  # type: ignore
        """
        Fetch a copy of data from SharedMemory (protected by readers lock)
        See SharedObject._fetch_ndarray() for best practices of fn with np.ndarray

        :param fn: function to apply on data, e.g., lambda x: x + 1.
                   If fn is None or does not trigger a copy for ndarray
                     (e.g., slicing, masking), a manual copy is applied.
                   Thus, the best practices are ordered as:
                     fn (triggers a copy) > fn = None >> fn (does not trigger a copy)
                     because copying non-contiguous ndarray takes much longer time.
                   Any exception raised by fn propagates; the readers lock is
                   still released.
        :return data: a copy of data read from SharedMemory
        """
        self._readers_lock.acquire()
        try:
            # Update modified timestamp
            self.mtime = struct.unpack_from("Q", self.shm.buf, offset=0)[0]
            data = self._fetch_objects[self.object_type_idx](
                self.shm.buf, fn, self.np_ndarray_ro, self.metadata
            )
        finally:
            # A raising fn must not leave the lock held (writers would block forever)
            self._readers_lock.release()
        return data

    def trigger(self) -> SharedObject:
        """Trigger by modifying object mtime (protected by writer lock)"""
        self._writer_lock.acquire()
        # Update mtime
        struct.pack_into("Q", self.shm.buf, 0, time.time_ns())
        self._writer_lock.release()
        return self

    def assign(self, data: Union[_object_types]) -> SharedObject:  # type: ignore
        """Assign data to SharedMemory (protected by writer lock)"""
        return self._assign(*self._preprocess_data(data))

    def _assign(
        self, data, object_type_idx: int, nbytes: int, metadata: META_TYPES
    ) -> SharedObject:
        """Inner function for assigning data (protected by writer lock)
        For SharedObject, object_type_idx, nbytes, and np_metas cannot be modified

        :raises BufferError: if type, size or metadata differ from the existing object
        """
        if (
            object_type_idx != self.object_type_idx
            or nbytes > self.nbytes
            or metadata != self.metadata
        ):
            raise BufferError(
                f"Casting object type (new={self._object_types[object_type_idx]}, "
                f"old={self._object_types[self.object_type_idx]}) OR "
                f"Buffer overflow (new={nbytes} > {self.nbytes}=old) OR "
                f"Changed metadata (new={metadata}, old={self.metadata}) in {self!r}"
            )

        self._writer_lock.acquire()
        try:
            # Assign mtime
            self.mtime = time.time_ns()
            struct.pack_into("Q", self.shm.buf, 0, self.mtime)

            # Assign object data (may raise, e.g. struct.error for an out-of-range int)
            self._assign_objects[self.object_type_idx](
                self.shm.buf, data, self.metadata, self.np_ndarray
            )
        finally:
            # Never leave the exclusive lock held: all readers would deadlock
            self._writer_lock.release()
        return self

    def close(self):
        """Closes access to the shared memory from this instance but does
        not destroy the shared memory block.

        The cached np.ndarray views are dropped first: they export `shm.buf`,
        and SharedMemory.close() raises BufferError while exports exist.
        """
        self.np_ndarray, self.np_ndarray_ro = None, None
        self.shm.close()

    def __del__(self):
        # __init__ may have raised before self.shm was assigned
        if getattr(self, "shm", None) is not None:
            self.close()

    def __reduce__(self):
        # Pickle by name only: unpickling mounts the existing SharedMemory
        return self.__class__, (self.name,)

    def __repr__(self):
        return (
            f"<{self.__class__.__name__}: name={self.name}, "
            f"data_type={self._object_types[self.object_type_idx]}, "
            f"nbytes={self.nbytes}>"
        )
class SharedDynamicObject(SharedObject):
    """
    Shared object implemented with SharedMemory and synchronization
    Allow reallocating SharedMemory.
    Need more checks and thus is slower than SharedObject.

    In fact, this should never be implemented.
    For size-variable np.ndarray, just implement similar support as str/bytes
    """

    @staticmethod
    def _fetch_metas(shm: SharedMemory) -> tuple[int, int, META_TYPES, int]:
        """Read (object_type_idx, nbytes, metadata, mtime) from the buffer header.

        nbytes comes from the mmap size, which follows os.ftruncate() resizes.
        """
        nbytes = shm._mmap.size()  # type: ignore
        mtime, object_type_idx = struct.unpack_from("QB", shm.buf, offset=0)
        metadata = None
        if 6 <= object_type_idx <= 8:  # str / bytes / bytearray
            metadata = BytesMeta.from_buf(shm.buf)
        elif object_type_idx == 9:  # np.ndarray
            metadata = NDArrayMeta.from_buf(shm.buf)
        elif object_type_idx == 10:  # dict
            # DictMeta is not part of the module-level metas import
            from .shared_object_metas import DictMeta

            metadata = DictMeta.from_buf(shm.buf)
        return object_type_idx, nbytes, metadata, mtime

    def __init__(self, *args, **kwargs):
        raise NotImplementedError("Implementation not complete")

    def fetch(self, fn: SharedObject._fetch_fn_type = None) -> Any:  # type: ignore
        """Fetch a copy of data from SharedMemory (protected by readers lock)

        :param fn: function to apply on data, e.g., lambda x: x + 1.
        :return data: a copy of data read from SharedMemory
        """
        self._readers_lock.acquire()
        try:
            # Fetch shm info
            self.object_type_idx, self.nbytes, self.metadata, self.mtime = (
                self._fetch_metas(self.shm)
            )
            # metadata is required by _fetch_dict (same call shape as SharedObject.fetch)
            data = self._fetch_objects[self.object_type_idx](
                self.shm.buf, fn, self.np_ndarray_ro, self.metadata
            )
        finally:
            self._readers_lock.release()
        return data

    def assign(
        self,
        data: Union[SharedObject._object_types],  # type: ignore
        reallocate=False,
    ) -> SharedDynamicObject:
        """Assign data to SharedMemory (protected by writer lock)

        :param reallocate: whether to force reallocation
        :raises NotImplementedError: if reallocation would be required
        :return: self
        """
        # Check object type
        data, object_type_idx, nbytes, metadata = self._preprocess_data(data)

        self._writer_lock.acquire()
        try:
            # Fetch shm info
            self.object_type_idx, self.nbytes, self.metadata, self.mtime = (
                self._fetch_metas(self.shm)
            )

            # Reallocate if necessary (a changed object type also needs a new layout)
            if (
                reallocate
                or object_type_idx != self.object_type_idx
                or nbytes > self.nbytes
                or metadata != self.metadata
            ):
                # NOTE: Cannot use unlink() to reallocate SharedMemory
                #   Otherwise, existing SharedObject instances to the same SharedMemory
                #   will not be updated
                # Need to use os.ftruncate(sm._fd, new_size)
                raise NotImplementedError("reallocate is not yet implemented")

            # Record the modification time, like SharedObject._assign()
            self.mtime = time.time_ns()
            struct.pack_into("Q", self.shm.buf, 0, self.mtime)

            # Assigners take metadata (not nbytes) as their third argument
            self._assign_objects[self.object_type_idx](
                self.shm.buf, data, self.metadata, self.np_ndarray
            )
        finally:
            # Release even when raising, otherwise every reader deadlocks
            self._writer_lock.release()
        return self