shared_object
Shared object implemented with SharedMemory and synchronization. Careful optimization is done to make it run as fast as possible.
Version 0.0.6
Written by Kolin Guo
- Implementation Notes:
Methods fetch() and assign() are chosen to be distinct from common Python class methods (e.g., get(), set(), update(), read(), write(), fill(), put(), etc.).
Readers-writer synchronization is achieved with fcntl.flock() (a filesystem advisory lock). It is chosen over multiprocessing.Lock / multiprocessing.Condition so that no lock needs to be explicitly passed to child processes. Note, however, that flock locks are not guaranteed to be acquired in request order.
An object-modified timestamp is maintained using time.time_ns(), which is system-wide and has the highest resolution (https://peps.python.org/pep-0564/#linux).
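The locking scheme described above can be illustrated with a minimal sketch. It assumes a POSIX system; the lock-file path, the with_flock() helper, and the state dictionary are hypothetical names for illustration and are not taken from the library.

```python
import fcntl
import os
import tempfile
import time

# Hypothetical lock file used only by this sketch.
LOCK_PATH = os.path.join(tempfile.gettempdir(), "shared_object_sketch.lock")


def with_flock(mode, fn):
    """Run fn() while holding an flock of the given mode on LOCK_PATH."""
    fd = os.open(LOCK_PATH, os.O_RDWR | os.O_CREAT, 0o600)
    try:
        fcntl.flock(fd, mode)  # blocks until the advisory lock is granted
        return fn()
    finally:
        os.close(fd)  # closing the descriptor also releases its flock


# Stand-in for the shared data and its modified timestamp.
state = {"value": None, "mtime": 0}


def write(value):
    state["value"] = value
    state["mtime"] = time.time_ns()  # nanoseconds since the epoch


with_flock(fcntl.LOCK_EX, lambda: write(42))              # writer: exclusive lock
value = with_flock(fcntl.LOCK_SH, lambda: state["value"])  # reader: shared lock
```

Any process that opens the same path can take part in the locking, which is why no lock object has to be handed to child processes.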
- Usage Notes:
For processes that are always waiting for a massive SharedObject (e.g., np.ndarray), it’s best to use so.modified to check whether the data has been updated yet to avoid starving processes that are assigning to it (only fetch when so.modified is True). Alternatively, a separate boolean update flag can be used to achieve the same.
Examples:
# Creates SharedObject with data
>>> so = SharedObject("test", data=np.ones((480, 848, 3)))
# Mounts existing SharedObject
>>> so = SharedObject("test")
# Creates a trigger SharedObject (data is None, can be used for joining processes)
>>> so = SharedObject("trigger")
>>> so.trigger()  # trigger the SharedObject
>>> so.triggered  # check if triggered
Best practices when fetching np.ndarray (see SharedObject._fetch_ndarray()):
>>> so.fetch(lambda x: x.sum())  # Apply operation only
>>> so.fetch(lambda x: x + 1)  # Apply operation only
>>> so.fetch()  # If different operations need to be applied on the same data
>>> so.fetch()[..., 0]  # Slice only
>>> so.fetch(lambda x: x[..., 0].copy()) + 1  # Slice and apply operation
- class real_robot.utils.multiprocessing.shared_object.ReadersLock(fd)
Bases: object
Calling lock.acquire() / lock.release() directly is slightly faster than using the lock as a context manager.
- class real_robot.utils.multiprocessing.shared_object.SharedDynamicObject(*args, **kwargs)
Bases: SharedObject
Shared object implemented with SharedMemory and synchronization. Allows reallocating SharedMemory.
Needs more checks and is thus slower than SharedObject. In fact, this should never be implemented. For size-variable np.ndarray, just implement support similar to that for str / bytes.
- class real_robot.utils.multiprocessing.shared_object.SharedObject(name: str, *, data: _object_types = None, init_size=100)
Bases: object
Shared object implemented with SharedMemory and synchronization. SharedMemory reallocation, casting object_type, and changing numpy metas are not allowed; use SharedDynamicObject instead.
The shared memory buffer is organized as follows:
8 bytes: object modified timestamp in ns (since the epoch), stored as ‘Q’
1 byte: object data type index, stored as ‘B’
X bytes: data area.
For NoneType, the data area is ignored.
For bool, 1 byte of data.
For int / float, 8 bytes of data.
For complex, 16 bytes of data.
For sapien.Pose, 7 * 4 = 28 bytes of data ([xyz, wxyz], float32).
For str / bytes / bytearray, (8 + N + 1) bytes of data:
8 bytes: size of the string / bytes / bytearray buffer (N + 1)
N bytes: data buffer
1 byte: termination byte (b"\xff")
Zero-byte padding up to the length indicated in the first 8 bytes.
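As a rough illustration of the header and the str layout listed above, the following sketch packs and unpacks a string with the struct module. Several details are assumptions rather than documented behavior: native byte order without alignment ("="), UTF-8 encoding, a terminator of 0xFF, a size field that holds the padded buffer length, and the type index value STR_TYPE_IDX, which is hypothetical.

```python
import struct
import time

HEADER = struct.Struct("=QB")  # mtime in ns ('Q') + object type index ('B')
SIZE = struct.Struct("=Q")     # size of the str / bytes data buffer
STR_TYPE_IDX = 5               # hypothetical index; the real value is not documented here
TERMINATOR = b"\xff"           # assumed termination byte


def pack_str(text: str, capacity: int = 100) -> bytes:
    """Build header + size + data + terminator + zero padding."""
    payload = text.encode("utf-8") + TERMINATOR  # N data bytes + 1 termination byte
    capacity = max(capacity, len(payload))       # never truncate the payload
    padded = payload.ljust(capacity, b"\x00")    # zero padding up to capacity
    return HEADER.pack(time.time_ns(), STR_TYPE_IDX) + SIZE.pack(capacity) + padded


def unpack_str(buf: bytes) -> str:
    """Recover the string by locating the last terminator before the padding."""
    _mtime, type_idx = HEADER.unpack_from(buf, 0)
    if type_idx != STR_TYPE_IDX:
        raise TypeError("buffer does not hold a str")
    (capacity,) = SIZE.unpack_from(buf, HEADER.size)
    start = HEADER.size + SIZE.size
    region = buf[start:start + capacity]
    end = region.rindex(TERMINATOR)  # padding is all zeros, so this is the terminator
    return region[:end].decode("utf-8")


packed = pack_str("hello")
restored = unpack_str(packed)
```

The byte 0xFF never occurs in valid UTF-8, so the terminator cannot be confused with encoded text in this sketch.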
For np.ndarray:
1 byte: array dtype index, stored as ‘B’
8 bytes: array ndim (K), stored as ‘Q’
(K * 8) bytes: array shape, one ‘Q’ per dimension
D bytes: array data buffer
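The np.ndarray data area described above can be sketched as follows. This covers only the data area (not the 9-byte object header), assumes native byte order without alignment, and uses a hypothetical DTYPES lookup table; the library's real dtype index table is not shown in this document.

```python
import struct

import numpy as np

# Hypothetical dtype index table for this sketch only.
DTYPES = (np.dtype("uint8"), np.dtype("float32"), np.dtype("float64"))


def pack_ndarray(arr: np.ndarray) -> bytes:
    """dtype index ('B') + ndim ('Q') + shape (ndim * 'Q') + raw data."""
    arr = np.ascontiguousarray(arr)
    meta = struct.pack(
        f"=BQ{arr.ndim}Q", DTYPES.index(arr.dtype), arr.ndim, *arr.shape
    )
    return meta + arr.tobytes()


def unpack_ndarray(buf: bytes) -> np.ndarray:
    """Read the metadata, then copy the data out of the buffer."""
    dtype_idx, ndim = struct.unpack_from("=BQ", buf, 0)
    shape = struct.unpack_from(f"={ndim}Q", buf, 9)  # 1 + 8 bytes precede the shape
    offset = 9 + 8 * ndim
    count = int(np.prod(shape, dtype=np.int64))
    view = np.frombuffer(buf, dtype=DTYPES[dtype_idx], count=count, offset=offset)
    return view.reshape(shape).copy()  # copy so the result owns its memory


original = np.arange(24, dtype=np.float32).reshape(2, 3, 4)
packed = pack_ndarray(original)
restored = unpack_ndarray(packed)
```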
For compound data types (tuple / list / set / dict), the buffer is organized as follows:
- assign(data: _object_types) → SharedObject
Assign data to SharedMemory (protected by writer lock)
- close()
Closes access to the shared memory from this instance but does not destroy the shared memory block.
- fetch(fn: _fetch_fn_type = None) → Any
Fetch a copy of data from SharedMemory (protected by readers lock). See SharedObject._fetch_ndarray() for best practices of fn with np.ndarray.
- Parameters:
fn – function to apply on data, e.g., lambda x: x + 1. If fn is None or does not trigger a copy for ndarray (e.g., slicing, masking), a manual copy is applied. Thus, the best practices are ordered as: fn (triggers a copy) > fn = None >> fn (does not trigger a copy), because copying a non-contiguous ndarray takes much longer.
- Return data:
a copy of data read from SharedMemory
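The copy rule for fn can be illustrated with a small sketch. fetch_sketch() is a hypothetical stand-in written for this example, not the library's SharedObject._fetch_ndarray(); it copies only when the result still aliases the source buffer, which is one plausible way to implement the behavior described above.

```python
import numpy as np


def fetch_sketch(shared_view: np.ndarray, fn=None):
    """Apply fn to the shared view, copying only if the result aliases it."""
    result = shared_view if fn is None else fn(shared_view)
    if isinstance(result, np.ndarray) and np.shares_memory(result, shared_view):
        result = result.copy()  # manual copy: the result was still a view
    return result


buf = np.ones((4, 6, 3))  # stands in for the array backed by shared memory

total = fetch_sketch(buf, lambda x: x.sum())       # reduction: nothing to copy
plus_one = fetch_sketch(buf, lambda x: x + 1)      # fn already produced a new array
whole = fetch_sketch(buf)                          # fn is None: contiguous copy
channel = fetch_sketch(buf, lambda x: x[..., 0])   # view: non-contiguous copy
```

The last case is the slow one: x[..., 0] is a strided view, so the manual copy has to gather non-contiguous elements.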
- property modified: bool
Returns whether the object’s data has been modified by another process, checked by fetching the object-modified timestamp and comparing it with self.mtime.
- trigger() → SharedObject
Trigger by modifying object mtime (protected by writer lock)
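A minimal sketch of the timestamp mechanism behind modified and trigger() follows. MtimeHandle and its sync() method are hypothetical names introduced for this example, locking is omitted for brevity, and the sketch assumes the timestamp occupies the first 8 bytes of the buffer in native byte order.

```python
import struct
import time
from multiprocessing import shared_memory


class MtimeHandle:
    """Hypothetical per-process handle tracking the shared timestamp."""

    def __init__(self, shm):
        self.shm = shm
        self.mtime = self._read_mtime()  # last timestamp this handle has seen

    def _read_mtime(self) -> int:
        return struct.unpack_from("=Q", self.shm.buf, 0)[0]

    def trigger(self) -> None:
        # Writing a fresh timestamp is the whole "trigger" in this sketch.
        struct.pack_into("=Q", self.shm.buf, 0, time.time_ns())

    @property
    def modified(self) -> bool:
        return self._read_mtime() != self.mtime

    def sync(self) -> None:
        self.mtime = self._read_mtime()  # e.g. done after fetching the data


shm = shared_memory.SharedMemory(create=True, size=9)  # 8-byte mtime + 1-byte type
try:
    writer, reader = MtimeHandle(shm), MtimeHandle(shm)
    before = reader.modified      # nothing has been written yet
    writer.trigger()
    after = reader.modified       # the timestamp changed
    reader.sync()
    after_sync = reader.modified  # the reader has caught up
finally:
    shm.close()
    shm.unlink()
```

Polling modified only reads 8 bytes, which is why it is a cheap check before fetching a large object.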