shared_object

Shared object implemented with SharedMemory and synchronization. Careful optimization is done to make it run as fast as possible. Version 0.0.6.

Written by Kolin Guo

Implementation Notes:
  • The method names fetch() and assign() were chosen to be distinct from common Python method names (e.g., get(), set(), update(), read(), write(), fill(), put(), etc.)

  • Readers-writer synchronization is achieved with fcntl.flock() (a filesystem advisory lock). It’s chosen over multiprocessing.Lock / multiprocessing.Condition so that no lock needs to be explicitly passed to child processes. However, note that flock locks are not guaranteed to be acquired in the order they were requested.

  • An object modified timestamp is maintained using time.time_ns(), which is system-wide and has the highest resolution (https://peps.python.org/pep-0564/#linux).
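The flock-based readers-writer scheme described above can be pictured with a minimal, Unix-only sketch. The `Flock` class and `try_exclusive()` helper are hypothetical names for illustration and are not the module's ReadersLock / WriterLock. Because the lock is attached to a file, any process that can open the same path can take part without a lock object being handed to it.

```python
import fcntl
import os
import tempfile


class Flock:
    """Hypothetical wrapper around fcntl.flock() with acquire()/release()."""

    def __init__(self, fd: int, mode: int) -> None:
        self.fd = fd
        self.mode = mode  # fcntl.LOCK_SH for readers, fcntl.LOCK_EX for a writer

    def acquire(self) -> None:
        fcntl.flock(self.fd, self.mode)  # blocks until the lock is granted

    def release(self) -> None:
        fcntl.flock(self.fd, fcntl.LOCK_UN)


def try_exclusive(fd: int) -> bool:
    """Non-blocking attempt at the writer lock; False if it is held elsewhere."""
    try:
        fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except BlockingIOError:
        return False
    return True


# Each os.open() creates its own open file description, so these three
# descriptors contend with each other the way separate processes would.
fd_a, lock_path = tempfile.mkstemp(suffix=".lock")
fd_b = os.open(lock_path, os.O_RDWR)
fd_w = os.open(lock_path, os.O_RDWR)

reader_a = Flock(fd_a, fcntl.LOCK_SH)
reader_b = Flock(fd_b, fcntl.LOCK_SH)
reader_a.acquire()
reader_b.acquire()  # a second reader is admitted immediately
blocked_while_reading = not try_exclusive(fd_w)  # writer must wait for readers

reader_a.release()
reader_b.release()
acquired_after_release = try_exclusive(fd_w)  # no readers left, writer gets in
fcntl.flock(fd_w, fcntl.LOCK_UN)

for fd in (fd_a, fd_b, fd_w):
    os.close(fd)
os.remove(lock_path)
```

The sketch does not show ordering: when several waiters block on the same file, the kernel decides which one is granted the lock next.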

Usage Notes:
  • For processes that are always waiting for a massive SharedObject (e.g., np.ndarray), it’s best to use so.modified to check whether the data has been updated yet to avoid starving processes that are assigning to it (only fetch when so.modified is True). Alternatively, a separate boolean update flag can be used to achieve the same.

  • Examples:

    # Creates SharedObject with data
    >>> so = SharedObject("test", data=np.ones((480, 848, 3)))
    # Mounts existing SharedObject
    >>> so = SharedObject("test")
    # Creates a trigger SharedObject (data is None, can be used for joining processes)
    >>> so = SharedObject("trigger")
    >>> so.trigger()  # trigger the SharedObject
    >>> so.triggered  # check if triggered

  • Best practices when fetching np.ndarray (see SharedObject._fetch_ndarray()):

    >>> so.fetch(lambda x: x.sum())  # Apply operation only
    >>> so.fetch(lambda x: x + 1)  # Apply operation only
    >>> so.fetch()  # If different operations need to be applied on the same data
    >>> so.fetch()[..., 0]  # Slice only
    >>> so.fetch(lambda x: x[..., 0].copy()) + 1  # Slice and apply operation

class real_robot.utils.multiprocessing.shared_object.Pose[source]

Bases: object

class real_robot.utils.multiprocessing.shared_object.ReadersLock(fd)[source]

Bases: object

Calling lock.acquire() / lock.release() directly is slightly faster than using the lock as a context manager.

acquire()[source]
release()[source]
class real_robot.utils.multiprocessing.shared_object.SharedDynamicObject(*args, **kwargs)[source]

Bases: SharedObject

Shared object implemented with SharedMemory and synchronization. Allows reallocating SharedMemory.

Needs more checks and is thus slower than SharedObject. In fact, this should never be implemented. For size-variable np.ndarray, just implement support similar to that for str / bytes.

assign(data: None | bool | int | float | complex | Pose | str | bytes | bytearray | ndarray | dict, reallocate=False) → None[source]

Assign data to SharedMemory (protected by writer lock)

Parameters:

reallocate – whether to force reallocation

fetch(fn: SharedObject._fetch_fn_type = None) → Any[source]

Fetch a copy of data from SharedMemory (protected by readers lock)

Parameters:

fn – function to apply on data, e.g., lambda x: x + 1.

Return data:

a copy of data read from SharedMemory

class real_robot.utils.multiprocessing.shared_object.SharedObject(name: str, *, data: _object_types = None, init_size=100)[source]

Bases: object

Shared object implemented with SharedMemory and synchronization. SharedMemory reallocation, casting object_type, and changing numpy array metadata are not allowed; use SharedDynamicObject instead.

The shared memory buffer is organized as follows:

  • 8 bytes: object modified timestamp in ns (since the epoch), stored as ‘Q’

  • 1 byte: object data type index, stored as ‘B’

  • X bytes: data area.

    For NoneType, the data area is ignored.
    For bool, 1 byte of data.
    For int / float, 8 bytes of data.
    For complex, 16 bytes of data.
    For sapien.Pose, 7 * 4 = 28 bytes of data ([xyz, wxyz], float32).
    For str / bytes / bytearray, (8 + N + 1) bytes of data:

    • 8 bytes: size of the string / bytes / bytearray buffer (N + 1)

    • N bytes: data buffer

    • 1 byte: termination byte (b"\xff")

    • zero bytes padded up to the length indicated in the first 8 bytes.

    For np.ndarray:

    • 1 byte: array dtype index, stored as ‘B’

    • 8 bytes: array ndim (K), stored as ‘Q’

    • (K * 8) bytes: array shape, one ‘Q’ per dimension

    • D bytes: array data buffer
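The header, str, and np.ndarray layouts listed above can be sketched with the struct module. This is an illustration under stated assumptions, not the module's own serialization code: the type and dtype index values, the little-endian unpadded byte order, the UTF-8 string encoding, and the `pack_str()` / `pack_ndarray()` helpers are all hypothetical, and no zero padding is added after the termination byte.

```python
import struct
import time

# Hypothetical index values; the real index tables are not listed in this page.
STR_TYPE_INDEX = 6
NDARRAY_TYPE_INDEX = 9
FLOAT64_DTYPE_INDEX = 0

# Assumption: little-endian, no alignment padding.
# 'Q' = 8-byte modified timestamp in ns, 'B' = 1-byte object type index.
HEADER = struct.Struct("<QB")


def pack_str(text: str) -> bytes:
    """Header + 8-byte size (N + 1) + N data bytes + 1 termination byte."""
    payload = text.encode("utf-8") + b"\xff"
    return (
        HEADER.pack(time.time_ns(), STR_TYPE_INDEX)
        + struct.pack("<Q", len(payload))
        + payload
    )


def pack_ndarray(shape: tuple, flat_values: list) -> bytes:
    """Header + dtype index + ndim + one 'Q' per dimension + float64 data."""
    ndim = len(shape)
    meta = struct.pack(f"<BQ{ndim}Q", FLOAT64_DTYPE_INDEX, ndim, *shape)
    data = struct.pack(f"<{len(flat_values)}d", *flat_values)
    return HEADER.pack(time.time_ns(), NDARRAY_TYPE_INDEX) + meta + data


str_buf = pack_str("hello")  # 9 + 8 + 5 + 1 bytes
arr_buf = pack_ndarray((2, 3), [0.0, 1.0, 2.0, 3.0, 4.0, 5.0])  # 9 + 1 + 8 + 16 + 48 bytes

# Reading the array metadata back follows the same offsets.
mtime_ns, type_index = HEADER.unpack_from(arr_buf, 0)
dtype_index, ndim = struct.unpack_from("<BQ", arr_buf, HEADER.size)
shape = struct.unpack_from(f"<{ndim}Q", arr_buf, HEADER.size + 9)
```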

    For compound data types (tuple / list / set / dict), the buffer is organized as follows:

assign(data: _object_types) → SharedObject[source]

Assign data to SharedMemory (protected by writer lock)

close()[source]

Closes access to the shared memory from this instance but does not destroy the shared memory block.

fetch(fn: _fetch_fn_type = None) → Any[source]

Fetch a copy of data from SharedMemory (protected by readers lock). See SharedObject._fetch_ndarray() for best practices of fn with np.ndarray.

Parameters:

fn – function to apply on data, e.g., lambda x: x + 1. If fn is None or does not trigger a copy for ndarray (e.g., slicing, masking), a manual copy is applied. Thus, the best practices are ordered as: fn (triggers a copy) > fn = None >> fn (does not trigger a copy), because copying a non-contiguous ndarray takes much longer.

Return data:

a copy of data read from SharedMemory
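The distinction between an fn that triggers a copy and one that does not can be checked directly in NumPy. The sketch below uses an ordinary array as a stand-in for the one backed by shared memory; it does not call fetch() itself, and the variable names are illustrative only.

```python
import numpy as np

# Stand-in for the ndarray that lives in the shared memory buffer.
source = np.ones((4, 5, 3))

added = source + 1        # arithmetic allocates a new array (a copy)
sliced = source[..., 0]   # basic slicing returns a view on the same memory

add_shares = np.shares_memory(source, added)
slice_shares = np.shares_memory(source, sliced)
slice_contiguous = sliced.flags["C_CONTIGUOUS"]

# A view must be copied before the lock is released; for a strided view
# this copy cannot be done as one contiguous block.
detached = sliced.copy()
detached_shares = np.shares_memory(source, detached)
```

Here `added` is already independent of the source, while `sliced` still points into it and is not C-contiguous, which is the case the parameter description ranks last.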

property modified: bool

Returns whether the object’s data has been modified by another process. Checked by fetching the object modified timestamp and comparing it with self.mtime.

trigger() → SharedObject[source]

Trigger by modifying object mtime (protected by writer lock)

property triggered: bool

Returns whether the object is triggered (protected by readers lock). Checked by fetching the object modified timestamp, comparing it with self.mtime, and updating self.mtime.
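The difference between modified and triggered, as described above, comes down to whether the locally cached mtime is updated after the comparison. A minimal sketch of that idea follows, assuming an 8-byte timestamp at offset 0 of the buffer. The `TimestampWatcher` class is hypothetical, and the locking that the real properties rely on is omitted.

```python
import struct
import time
from multiprocessing import shared_memory


class TimestampWatcher:
    """Hypothetical handle that tracks an 8-byte ns timestamp at offset 0."""

    def __init__(self, buf) -> None:
        self.buf = buf
        self.mtime = self._read()  # last timestamp this handle has seen

    def _read(self) -> int:
        return struct.unpack_from("<Q", self.buf, 0)[0]

    def trigger(self) -> None:
        # Stamp the buffer with the current system-wide time in nanoseconds.
        struct.pack_into("<Q", self.buf, 0, time.time_ns())

    @property
    def modified(self) -> bool:
        # Compare only; the cached mtime is left untouched.
        return self._read() > self.mtime

    @property
    def triggered(self) -> bool:
        # Compare and then remember the new timestamp, consuming the event.
        current = self._read()
        changed = current > self.mtime
        self.mtime = current
        return changed


shm = shared_memory.SharedMemory(create=True, size=8)
writer = TimestampWatcher(shm.buf)
reader = TimestampWatcher(shm.buf)

before = reader.modified   # nothing written yet
writer.trigger()
after = reader.modified    # the timestamp moved forward
still = reader.modified    # checking again does not consume the change
first = reader.triggered   # reports the change and updates reader.mtime
second = reader.triggered  # no new trigger since the last check

del writer, reader
shm.close()
shm.unlink()
```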

unlink()[source]

Requests that the underlying shared memory block be destroyed.

In order to ensure proper cleanup of resources, unlink should be called once (and only once) across all processes which have access to the shared memory block.
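The close-versus-unlink lifecycle described above matches that of the standard library's multiprocessing.shared_memory.SharedMemory, which is used directly in the sketch below. Two handles in one process stand in for two processes, and the variable names are illustrative.

```python
from multiprocessing import shared_memory

# The "owner" creates the block; a "peer" attaches to it by name.
owner = shared_memory.SharedMemory(create=True, size=16)
owner.buf[:4] = b"data"

peer = shared_memory.SharedMemory(name=owner.name)
seen_by_peer = bytes(peer.buf[:4])

# close() only drops this handle's access; the block itself survives.
peer.close()
reattached = shared_memory.SharedMemory(name=owner.name)
survived_close = bytes(reattached.buf[:4])
reattached.close()

# unlink() is called exactly once, after which the name can no longer be opened.
owner.close()
owner.unlink()
try:
    shared_memory.SharedMemory(name=owner.name)
    gone = False
except FileNotFoundError:
    gone = True
```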

class real_robot.utils.multiprocessing.shared_object.WriterLock(fd)[source]

Bases: object

Calling lock.acquire() / lock.release() directly is slightly faster than using the lock as a context manager.

acquire()[source]
release()[source]