Source code for real_robot.utils.visualization.cv2_visualizer

from __future__ import annotations

import functools
import math
import os
import time
from collections import defaultdict
from collections.abc import Callable
from enum import Enum
from typing import Any, Optional

import cv2
import numpy as np

from ..logger import get_logger
from ..multiprocessing import (
    SharedObject,
    SharedObjectDefaultDict,
    signal_process_ready,
)
from .utils import colorize_mask, draw_mask


[docs] class CV2Visualizer: """OpenCV visualizer for RGB and depth images, fps is updated in window title""" images: list[np.ndarray] = [] # images being visualized # images boundaries [[[x_min, x_max+1], [y_min, y_max+1]]] image_boundaries: np.ndarray = np.empty((0, 2, 2), dtype=np.floating)
[docs] class DrawingMode(Enum): """Drawing mode when draw_with_mouse()""" Box = 0 Point = 1
# Attributes for drawing points / bbox _done_drawing: bool = True # whether drawing with mouse is done _in_drawing: bool = False # whether currently in drawing selected_image_idx: int | None = None # previously selected image during drawing _image: np.ndarray | None = None # the image currently in drawing _extra_ret_data: Any = None # extra data returned from _update_drawing_fn() _mouse_pos: tuple[int, int] = (-1, -1) # mouse position when selecting image _CTRLKEY: int = 0 # whether CTRL key is pressed _drawing_mode: DrawingMode = DrawingMode.Box # Drawing mode points: np.ndarray = np.empty((0, 2), dtype=int) # Drawn points: [x, y] point_labels: np.ndarray = np.empty(0, dtype=int) # Drawn point labels: (0, 1) boxes: np.ndarray = np.empty((0, 4), dtype=int) # Drawn boxes XYXY coordinates box_labels: np.ndarray = np.empty(0, dtype=int) # Drawn box labels: (0, 1) def __init__( self, window_name="Images", *, update_drawing_fn: Optional[ Callable[[np.ndarray, dict[str, np.ndarray]], tuple[np.ndarray, Any]] ] = None, run_as_process=False, stream_camera=False, ): """ :param window_name: window name :param update_drawing_fn: callback function with drawn points/boxes during drawing. Inputs to this function are an RGB image and a dictionary of {"points": np.ndarray, "point_labels": np.ndarray, "boxes": np.ndarray, "box_labels": np.ndarray} Outputs are the modified image and any additional data. :param run_as_process: whether to run CV2Visualizer as a separate process. If True, CV2Visualizer needs to be created as a `mp.Process`. Several SharedObject are mounted to control CV2Visualizer and fetch data: Only "join_viscv2" is created by this process. * "join_viscv2": If triggered, the CV2Visualizer process is joined. * "draw_vis": If triggered, redraw the images. * "reset_vis": If triggered, call self.clear_image(). * "sync_rs_<device_uid>": If triggered, capture from RSDevice. Corresponding data have the same prefix (implemented as sorting) * Data unique to CV2Visualizer have prefix "viscv2_<image_uid>_" * Data shared with O3DGUIVisualizer have prefix "vis_<image_uid>_" * RSDevice camera feeds have prefix "rs_<device_uid>_" * "rs_<device_uid>_color": rgb color image, [H, W, 3] np.uint8 np.ndarray * "rs_<device_uid>_depth": depth image, [H, W] np.uint16 np.ndarray * "rs_<device_uid>_mask": object mask, [H, W] bool/np.uint8 np.ndarray Acceptable visualization SharedObject data formats and suffixes: * "_color": RGB color images, [H, W, 3] np.uint8 np.ndarray * "_depth": Depth images, [H, W] or [H, W, 1] np.uint16/np.floating np.ndarray * "_mask": Object mask images, [H, W] bool/np.uint8 np.ndarray :param stream_camera: whether to redraw camera stream when a new frame arrives """ self.logger = get_logger("CV2Visualizer") self.window_name = window_name self.last_timestamp_ns = time.time_ns() cv2.namedWindow(window_name, cv2.WINDOW_NORMAL) cv2.setMouseCallback(window_name, self.on_mouse) cv2.displayOverlay(self.window_name, "Press 'd' to go into drawing mode", 5000) if update_drawing_fn is not None: self._update_drawing_fn = update_drawing_fn if run_as_process: self.stream_camera = stream_camera self.run_as_process()
[docs] @staticmethod def preprocess_image(image: np.ndarray, depth_scale=1000.0) -> np.ndarray: """Preprocess image for plotting with cv2 (RGB2BGR, applyColorMap for depth) :param image: depth, grayscale or RGB color image :param depth_scale: used to apply color map on np.floating depth_image :return image: color image in BGR format, [H, W, 3] np.uint8 np.ndarray """ ndim = image.ndim channels = image.shape[-1] dtype = image.dtype if ndim == 3 and channels == 3 and dtype == np.uint8: # rgb return cv2.cvtColor(image, cv2.COLOR_RGB2BGR) elif ndim == 2 or (ndim == 3 and channels == 1): # depth or grayscale # Depth image colormap is taken from # https://github.com/IntelRealSense/librealsense/blob/8ffb17b027e100c2a14fa21f01f97a1921ec1e1b/wrappers/python/examples/opencv_viewer_example.py#L56 if dtype == np.uint16: alpha = 0.03 elif np.issubdtype(dtype, np.floating): alpha = 0.03 * depth_scale elif dtype == np.uint8: return cv2.cvtColor(image, cv2.COLOR_GRAY2BGR) else: raise TypeError(f"Unknown depth image dtype: {dtype}") return cv2.applyColorMap( cv2.convertScaleAbs(image, alpha=alpha), cv2.COLORMAP_JET ) else: raise NotImplementedError(f"Unknown image type: {image.shape=} {dtype=}")
[docs] @staticmethod def get_image_layout(n_image: int) -> tuple[int, int]: """Get layout of Images (n_rows, n_cols) where n_rows >= n_cols""" s = math.isqrt(n_image) s_squared = s * s if s_squared == n_image: return s, s elif n_image <= s_squared + s: return s + 1, s else: return s + 1, s + 1
[docs] @staticmethod def resize_image( image: np.ndarray, max_H: int, max_W: int, interpolation=cv2.INTER_NEAREST_EXACT ) -> np.ndarray: """Resize image to the requested size while preserving its aspect ratio""" # background is white new_image = np.full((max_H, max_W, 3), 255, np.uint8) h, w, _ = image.shape if (h_ratio := (max_H / h)) < (w_ratio := (max_W / w)): new_w = math.floor(w * h_ratio) start_i = (max_W - new_w) // 2 new_image[:, start_i : start_i + new_w] = cv2.resize( image, (new_w, max_H), interpolation=interpolation ) return new_image else: new_h = math.floor(h * w_ratio) start_i = (max_H - new_h) // 2 new_image[start_i : start_i + new_h, :] = cv2.resize( image, (max_W, new_h), interpolation=interpolation ) return new_image
[docs] def show_images(self, images: list[np.ndarray]): """Show the list of images, support non-equal size (cv2.rerize to max size) :param images: List of np.ndarray images. Supports depth or RGB color images. If depth image, dtype can be np.uint16 or np.floating If RGB image, dtype must be np.uint8 """ if (n_image := len(images)) == 0: return self.images = images # save input images images = [self.preprocess_image(image) for image in images] # Resize non-equal sized image to max_shape max_shape = images[np.argmax([image.size for image in images])].shape max_H, max_W, _ = max_shape images = [ ( image if image.shape == max_shape else self.resize_image(image, max_H, max_W) ) for image in images ] if n_image == 1: vis_image = images[0] self.image_boundaries = np.asarray([[[0, max_W], [0, max_H]]]) elif n_image < 4: vis_image = np.vstack(images) self.image_boundaries = np.asarray([ [[0, max_W], [max_H * i, max_H * (i + 1)]] for i in range(n_image) ]) else: n_rows, n_cols = self.get_image_layout(n_image) vis_image = np.zeros((max_H * n_rows, max_W * n_cols, 3), dtype=np.uint8) self.image_boundaries = np.empty((0, 2, 2), dtype=int) for r in range(n_rows): for c in range(n_cols): if (idx := n_cols * r + c) >= n_image: break vis_image[ max_H * r : max_H * (r + 1), max_W * c : max_W * (c + 1) ] = images[idx] self.image_boundaries = np.concatenate([ self.image_boundaries, np.asarray([ [[max_W * c, max_W * (c + 1)], [max_H * r, max_H * (r + 1)]] ]), ]) # Add fps to window title (overlay with cv2.putText is slower) cur_timestamp_ns = time.time_ns() fps = 1e9 / (cur_timestamp_ns - self.last_timestamp_ns) self.last_timestamp_ns = cur_timestamp_ns cv2.setWindowTitle( self.window_name, f"{self.window_name} {max_W}x{max_H} @ {fps:6.2f}fps" ) cv2.imshow(self.window_name, vis_image) self.render()
[docs] def run_as_process(self): """Run CV2Visualizer as a separate process""" self.logger.info(f"Running {self!r} as a separate process") # CV2Visualizer control so_joined = SharedObject("join_viscv2") so_draw = SharedObject("draw_vis") so_reset = SharedObject("reset_vis") so_dict = SharedObjectDefaultDict() # {so_name: SharedObject} # {"rs_<device_uid>_color": image} vis_data = defaultdict( functools.partial( np.full, shape=(480, 848, 3), fill_value=255, dtype=np.uint8 ) ) def get_so_data_names(all_so_names: list[str]) -> list[str]: """Get so_data_names acceptable by CV2Visualizer""" valid_names = [ p for p in all_so_names if p.startswith(("rs_", "vis_", "viscv2_")) and p.endswith(( "_color", "_depth", "_infrared_1", "_infrared_2", "_mask", )) ] if len(valid_names) == 0: self.logger.warning( "No valid shm data names found under /dev/shm. The shm filenames " "must have prefix in ['rs_', 'vis_', 'viscv2_'] and " "suffix in ['_color', '_depth', '_mask']" ) return valid_names rs_so_name_suffix = ("_color", "_depth", "_infrared_1", "_infrared_2") signal_process_ready() # current process is ready while not so_joined.triggered: # Sort names so they are ordered as color, depth, mask all_so_names = sorted(os.listdir("/dev/shm")) # ----- Reset ----- # if so_reset.triggered: # triggers reset self.clear_image() so_dict = SharedObjectDefaultDict() # {so_name: SharedObject} # {"rs_<device_uid>_color": image} vis_data = defaultdict( functools.partial( np.full, shape=(480, 848, 3), fill_value=255, dtype=np.uint8 ) ) # ----- Capture and update from RSDevice stream ----- # if self.stream_camera: # capture whenever a new frame comes in updated = False for so_data_name in [ p for p in all_so_names if p.startswith("rs_") and p.endswith(rs_so_name_suffix) ]: if (so_data := so_dict[so_data_name]).modified: vis_data[so_data_name] = so_data.fetch() updated = True if updated: # camera stream is updated, redraw images images = [] for so_data_name in get_so_data_names(all_so_names): if so_data_name.endswith("_mask"): images += [ vis_data[f"{so_data_name}_overlay"], vis_data[f"{so_data_name}_colorized"], ] else: images.append(vis_data[so_data_name]) self.show_images(images) else: # synchronized capturing with env (no redraw here) # for each camera sync, check if capture is triggered for so_name in [p for p in all_so_names if p.startswith("sync_rs_")]: if so_dict[so_name].triggered: for so_data_name in [ f"{so_name[5:]}{suffix}" for suffix in rs_so_name_suffix ]: if so_data_name in all_so_names: vis_data[so_data_name] = so_dict[so_data_name].fetch() # ----- Fetch data and draw ----- # if so_draw.triggered: # triggers redraw # Sort names so they are ordered as color, depth, mask all_so_names = sorted(os.listdir("/dev/shm")) # get most updated list images = [] for so_data_name in get_so_data_names(all_so_names): # Fetch data or use captured RSDevice data if so_data_name.endswith("_color"): if so_data_name.startswith("rs_"): color_image = image = vis_data[so_data_name] else: vis_data[so_data_name] = color_image = image = so_dict[ so_data_name ].fetch() elif so_data_name.startswith("rs_") and not so_data_name.endswith( "_mask" ): image = vis_data[so_data_name] else: vis_data[so_data_name] = image = so_dict[so_data_name].fetch() # Visualize mask by overlaying to color_image and colorizing mask if so_data_name.endswith("_mask"): vis_data[f"{so_data_name}_overlay"] = mask_overlay = draw_mask( color_image, image ) vis_data[f"{so_data_name}_colorized"] = mask_colorized = ( colorize_mask(image) ) images += [mask_overlay, mask_colorized] else: images.append(image) self.show_images(images) self.render() self.logger.info(f"Process running {self!r} is joined") # Unlink created SharedObject so_joined.unlink()
[docs] def clear_image(self): """Show a black image""" cv2.imshow(self.window_name, np.zeros((128, 128, 3), dtype=np.uint8)) cv2.pollKey()
[docs] def render(self): """Update renderer to show image and respond to mouse and keyboard events""" if cv2.pollKey() == ord("d"): self.draw_with_mouse()
[docs] def draw_with_mouse( self, *, update_drawing_fn: Optional[ Callable[[np.ndarray, dict[str, np.ndarray]], tuple[np.ndarray, Any]] ] = None, ) -> tuple[np.ndarray, dict[str, np.ndarray], Any] | None: """Enter drawing mode with mouse :param update_drawing_fn: function to be called with the image and drawn points/boxes. Inputs to this function are an RGB image and a dictionary of {"points": np.ndarray, "point_labels": np.ndarray, "boxes": np.ndarray, "box_labels": np.ndarray} Outputs are the modified image and any additional data. :return: Original image before drawing, drawn labels dict, _extra_ret_data returned from self._update_drawing_fn() """ self.selected_image_idx = None self._image = None # the image currently in drawing if update_drawing_fn is not None: self._update_drawing_fn = update_drawing_fn self._extra_ret_data = None self._mouse_pos = (-1, -1) self._CTRLKEY = 0 self._drawing_mode = self.DrawingMode.Box self.points = np.empty((0, 2), dtype=int) # Drawn points: [x, y] self.point_labels = np.empty(0, dtype=int) # Drawn point labels: (0, 1) self.boxes = np.empty((0, 4), dtype=int) # Drawn boxes XYXY coordinates self.box_labels = np.empty(0, dtype=int) # Drawn box labels: (0, 1) self._done_drawing = False init_overlay_text = ( "Drawing mode! 's/x' to select/delete image, 'Esc/Enter/Space' to quit" ) cv2.displayOverlay(self.window_name, init_overlay_text) def get_drawing_overlay_text(): return ( f"{str(self._drawing_mode).split('.')[-1]} drawing mode! " "'Enter/Space' to apply, 'Esc' to quit, 'm' to change mode, " "'z' to remove last drawing, 'r' to remove all drawings, " "hold 'Ctrl' to draw with negative label" ) def draw_labels(image: np.ndarray, with_cursor=False) -> np.ndarray: """Draw points/boxes labels on image :param image: RGB image to drawn onto :return image: drawn image in BGR format """ image = self.preprocess_image(image) point_radius = int(np.sqrt(np.prod(image.shape[:2])) * 0.005) line_thickness = int(np.sqrt(np.prod(image.shape[:2])) * 0.004) # Current mouse position if with_cursor: image = cv2.circle( # type: ignore image, self._mouse_pos, radius=point_radius, color=(0, 0, 255) if self._CTRLKEY else (0, 255, 0), thickness=-1, ) # Drawn points for point, label in zip(self.points, self.point_labels): image = cv2.circle( image, point, radius=point_radius, color=(0, 255, 0) if label else (0, 0, 255), thickness=-1, ) # Drawn boxes for (x0, y0, x1, y1), label in zip(self.boxes, self.box_labels): image = cv2.rectangle( # type: ignore image, (x0, y0), (x1, y1), color=(0, 255, 0) if label else (0, 0, 255), thickness=line_thickness, ) return image def get_image_idx() -> int | None: """Get image index that the mouse is pointing at""" image_idx_mask = np.all( [ (self.image_boundaries[:, :, 0] <= self._mouse_pos).all(1), (self._mouse_pos < self.image_boundaries[:, :, 1]).all(1), ], axis=0, ) if image_idx_mask.any(): return int(np.flatnonzero(image_idx_mask)) else: self.logger.warning("No selection, please move mouse to select") return None if len(self.images) == 1: # when there is only one image, skip selection self.selected_image_idx = 0 self._image = self.images[self.selected_image_idx].copy() cv2.displayOverlay(self.window_name, get_drawing_overlay_text()) while True: if self._in_drawing: cv2.imshow(self.window_name, draw_labels(self._image, with_cursor=True)) # type: ignore cv2.pollKey() continue if (key := cv2.pollKey()) in [13, 32]: # ENTER or Space self._done_drawing = True if ( self.selected_image_idx is not None and self._image is not None and (len(self.points) > 0 or len(self.boxes) > 0) ): self.images.insert( self.selected_image_idx + 1, cv2.cvtColor(draw_labels(self._image), cv2.COLOR_BGR2RGB), ) self.show_images(self.images) break elif key == 27: # ESC, stop current drawing and restart if self.selected_image_idx is None: self._done_drawing = True break else: self.selected_image_idx = None self._image = None self._extra_ret_data = None self._mouse_pos = (-1, -1) self.points = np.empty((0, 2), dtype=int) self.point_labels = np.empty(0, dtype=int) self.boxes = np.empty((0, 4), dtype=int) self.box_labels = np.empty(0, dtype=int) self.show_images(self.images) cv2.displayOverlay(self.window_name, init_overlay_text) elif ( key == ord("x") and (image_idx := get_image_idx()) is not None ): # delete image self.images.pop(image_idx) self.show_images(self.images) elif ( key == ord("s") # select image and self.selected_image_idx is None and (image_idx := get_image_idx()) is not None ): self.selected_image_idx = image_idx self._image = self.images[self.selected_image_idx].copy() self.points = np.empty((0, 2), dtype=int) self.point_labels = np.empty(0, dtype=int) self.boxes = np.empty((0, 4), dtype=int) self.box_labels = np.empty(0, dtype=int) cv2.displayOverlay(self.window_name, get_drawing_overlay_text()) if self._image is None: continue # Draw image cv2.imshow(self.window_name, draw_labels(self._image, with_cursor=True)) if key == ord("m"): # change drawing mode self._drawing_mode = ( self.DrawingMode.Box if self._drawing_mode == self.DrawingMode.Point else self.DrawingMode.Point ) cv2.displayOverlay(self.window_name, get_drawing_overlay_text()) elif key == ord("z"): # remove previous drawing if ( self._drawing_mode == self.DrawingMode.Point and len(self.points) > 0 ): self.points = self.points[:-1] self.point_labels = self.point_labels[:-1] self.call_update_drawing_fn() elif self._drawing_mode == self.DrawingMode.Box and len(self.boxes) > 0: self.boxes = self.boxes[:-1] self.box_labels = self.box_labels[:-1] self.call_update_drawing_fn() elif key == ord("r"): # remove all drawings self.points = np.empty((0, 2), dtype=int) self.point_labels = np.empty(0, dtype=int) self.boxes = np.empty((0, 4), dtype=int) self.box_labels = np.empty(0, dtype=int) self.call_update_drawing_fn() cv2.displayOverlay( self.window_name, "Drawing ended. Press 'd' to go into drawing mode", 5000 ) if self.selected_image_idx is not None: return ( self.images[self.selected_image_idx].copy(), self.drawn_labels, self._extra_ret_data, ) else: return None
[docs] def on_mouse(self, event: int, x: int, y: int, flags: int, param: Any = None): """ Callback function for mouse events. :param event: one of the cv2.MouseEventTypes: [EVENT_MOUSEMOVE, EVENT_LBUTTONDOWN, EVENT_RBUTTONDOWN, EVENT_MBUTTONDOWN, EVENT_LBUTTONUP, EVENT_RBUTTONUP, EVENT_MBUTTONUP, EVENT_LBUTTONDBLCLK, EVENT_RBUTTONDBLCLK, EVENT_MBUTTONDBLCLK, EVENT_MOUSEWHEEL, EVENT_MOUSEHWHEEL] https://docs.opencv.org/4.9.0/d0/d90/group__highgui__window__flags.html#ga927593befdddc7e7013602bca9b079b0 :param x: The x-coordinate of the mouse event. :param y: The y-coordinate of the mouse event. :param flags: one of the cv2.MouseEventFlags: [EVENT_FLAG_LBUTTON, EVENT_FLAG_RBUTTON, EVENT_FLAG_MBUTTON, EVENT_FLAG_CTRLKEY, EVENT_FLAG_SHIFTKEY, EVENT_FLAG_ALTKEY] https://docs.opencv.org/4.9.0/d0/d90/group__highgui__window__flags.html#gaab4dc057947f70058c80626c9f1c25ce :param param: The optional user data passed by cv2.setMouseCallback(). """ if self._done_drawing: return self._mouse_pos = (x, y) self._CTRLKEY = flags & cv2.EVENT_FLAG_CTRLKEY if self._image is None: # No selected image return image_bounds = np.stack([[0, 0], self._image.shape[1::-1]]).T if self._drawing_mode == self.DrawingMode.Box: if event == cv2.EVENT_LBUTTONDOWN: self._in_drawing = True self.boxes = np.vstack([self.boxes, [x, y, x, y]]) self.box_labels = np.hstack([ self.box_labels, 0 if self._CTRLKEY else 1, ]) elif event == cv2.EVENT_LBUTTONUP: # Clip mouse position to be within image bounds self._mouse_pos = (x1, y1) = tuple( # type: ignore np.clip(self._mouse_pos, image_bounds[:, 0], image_bounds[:, 1] - 1) ) self._in_drawing = False # Enforce boxes to be XYXY coordinates x0, y0 = self.boxes[-1, :2] self.boxes[-1] = [min(x0, x1), min(y0, y1), max(x0, x1), max(y0, y1)] self.call_update_drawing_fn() elif event == cv2.EVENT_MOUSEMOVE and self._in_drawing: # for visualization self.boxes[-1, 2:] = self._mouse_pos elif self._drawing_mode == self.DrawingMode.Point: if event == cv2.EVENT_LBUTTONDOWN: self._in_drawing = True elif event == cv2.EVENT_LBUTTONUP: self._in_drawing = False if not ( (image_bounds[:, 0] <= self._mouse_pos).all() and (self._mouse_pos < image_bounds[:, 1]).all() ): self.logger.warning("Drawn points outside image bounds, ignoring") return self.points = np.vstack([self.points, self._mouse_pos]) self.point_labels = np.hstack([ self.point_labels, 0 if self._CTRLKEY else 1, ]) self.call_update_drawing_fn()
@staticmethod def _update_drawing_fn( image: np.ndarray, drawn_labels: dict[str, np.ndarray], / ) -> tuple[np.ndarray, Any]: """Callback function with drawn points/boxes during drawing""" return image, None
[docs] def call_update_drawing_fn(self) -> None: """Call self._update_drawing_fn with current drawings This happens when the user adds/removes any drawing. """ if self.selected_image_idx is None: self.logger.error("No image selected, please select image first") return cv2.setWindowTitle(self.window_name, "Busy updating drawings...") self._image, self._extra_ret_data = self._update_drawing_fn( self.images[self.selected_image_idx].copy(), self.drawn_labels ) cv2.setWindowTitle(self.window_name, self.window_name)
@property def drawn_labels(self) -> dict[str, np.ndarray]: return { "points": self.points, "point_labels": self.point_labels, "boxes": self.boxes, "box_labels": self.box_labels, }
[docs] def close(self): cv2.destroyWindow(self.window_name)
def __del__(self): self.close() def __repr__(self): return f"<{self.__class__.__name__}: {self.window_name}>"