Source code for concepts.hw_interface.realsense.visualizer

#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# File   : visualizer.py
# Author : Jiayuan Mao
# Email  : maojiayuan@gmail.com
# Date   : 09/12/2022
#
# This file is part of Project Concepts.
# Distributed under terms of the MIT license.

import os
import numpy as np
import cv2
import time

from datetime import datetime
from enum import Enum
from typing import TYPE_CHECKING, List
from concepts.hw_interface.realsense.device import RealSenseInterface, get_concat_rgbd_visualization

__all__ = ['WindowEvent', 'RealSenseVisualizer', 'visualize_devices', 'run_4corner_calibration']


class WindowEvent(Enum):
    """Outcome of one visualizer step, as reported back to the caller's loop."""

    # An exit key was pressed; the caller should stop its loop.
    EXIT = 'exit'
    # Images were written (or a write was requested) during this step.
    SAVE = 'save'
    # Nothing of interest happened; keep looping.
    NONE = 'none'
class RealSenseVisualizer(object):
    """Single-step cv2 viewer for one RealSense device.

    Every call to :meth:`run` grabs one color/depth pair from the device, shows
    it, polls the keyboard once, and reports what happened as a
    :class:`WindowEvent`. The caller is expected to invoke :meth:`run` in a loop.
    """

    def __init__(self, device):
        """Wrap ``device``; only ``get_rgbd_image`` and ``get_serial_number`` are used."""
        self.device: RealSenseInterface = device

    def __str__(self) -> str:
        # The default object repr embeds a memory address. Callers create a new
        # visualizer per device on every frame, so an address-based window name
        # is not stable and may even coincide across devices. The serial number
        # gives each device exactly one window.
        return f"{type(self).__name__}({self.device.get_serial_number()})"

    def run(self, save_dir: str = "", save_image: bool = False) -> "WindowEvent":
        """Visualize color and depth images in a cv2 window.

        - Terminates when 'esc' or 'q' key is pressed.
        - Saves an image when the 's' key is pressed or if the 'save_image' flag is specified.
        - Images are saved to the specified save_dir, which we assume to already exist.

        Args:
            save_dir: directory that receives the saved PNG files.
            save_image: when True, save the current frame even if 's' was not pressed.

        Returns:
            WindowEvent indicating status of cv2: ``EXIT`` if an exit key was
            pressed, ``SAVE`` if a save was triggered, ``NONE`` otherwise.
        """
        color_image, depth_image = self.device.get_rgbd_image(format='bgr')

        # Form heatmap for depth and stack with color image
        depth_colormap = cv2.applyColorMap(
            cv2.convertScaleAbs(depth_image, alpha=0.03), cv2.COLORMAP_JET
        )
        images = np.hstack((color_image, depth_colormap))

        window_name = str(self)
        cv2.namedWindow(window_name, cv2.WINDOW_NORMAL)
        cv2.imshow(window_name, images)

        # Mask once so every comparison below sees the plain key code; some
        # backends set modifier bits above the low byte. "No key" (-1) becomes 255.
        key = cv2.waitKey(1) & 0xFF

        # Exit on 'esc' or 'q'
        if key in (27, ord("q")):
            cv2.destroyWindow(window_name)
            return WindowEvent.EXIT

        # Save images if 's' key is pressed
        if key == ord("s") or save_image:
            self._save_images(save_dir, color_image, depth_image)
            return WindowEvent.SAVE

        return WindowEvent.NONE

    def _save_images(self, save_dir: str, color_image, depth_image) -> None:
        """Write the color and depth images as timestamped PNG files into ``save_dir``."""
        timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        serial = self.device.get_serial_number()
        depth_fname = os.path.join(save_dir, f"{timestamp}-{serial}-depth.png")
        color_fname = os.path.join(save_dir, f"{timestamp}-{serial}-color.png")

        # cv2.imwrite signals failure through its return value, so check it
        # instead of claiming success unconditionally.
        if cv2.imwrite(depth_fname, depth_image):
            print(f"Saved depth image for {self} to {depth_fname}")
        else:
            print(f"Failed to save depth image for {self} to {depth_fname}")

        if cv2.imwrite(color_fname, color_image):
            print(f"Saved color image for {self} to {color_fname}")
        else:
            print(f"Failed to save color image for {self} to {color_fname}")
class RealSenseStreamVisualizer(object):
    """Continuous cv2 viewer for one RealSense capture, with in-memory recording.

    :meth:`run` pulls frames from ``capture``, renders them with
    ``get_concat_rgbd_visualization`` and shows them in a single window at
    roughly ``fps`` frames per second. Pressing 'r' toggles recording; while
    recording, every ``(rgb, depth)`` pair is appended to ``self.recorded``.
    """

    def __init__(self, capture: "RealSenseInterface", fps: int = 6):
        """Create the display window and initialise the recording state.

        Args:
            capture: frame source; ``get_rgbd_image(format='rgb')`` is called on it.
            fps: target display rate in frames per second; must be positive.

        Raises:
            ValueError: if ``fps`` is not positive.
        """
        # Reject a bad rate before any window exists; otherwise show() would
        # later fail with ZeroDivisionError or compute a meaningless delay.
        if fps <= 0:
            raise ValueError(f'fps must be positive, got {fps!r}')

        self.window_name = 'RealSense Capture'
        cv2.namedWindow(self.window_name, cv2.WINDOW_AUTOSIZE)
        self.capture = capture
        self.fps = fps
        self.last_frame_time = 0  # tick count when the previous wait ended; 0 = no frame yet
        self.recording = False
        self.recorded = list()  # (rgb, depth) tuples collected while recording

    def show(self, img):
        """Display ``img``, wait out the rest of the frame period, and return the key.

        Args:
            img: image to display; it is copied, so the caller's array is not drawn on.

        Returns:
            The raw value returned by ``cv2.waitKey``.
        """
        img = img.copy()
        if self.recording:
            # Draw a red rectangle around the image
            cv2.rectangle(img, (0, 0), (img.shape[1] - 1, img.shape[0] - 1), (0, 0, 255), 2)
        cv2.imshow(self.window_name, img)

        # Never let the period reach 0: waitKey(0) would block until a key press.
        period_ms = max(1, int(1000 // self.fps))
        if self.last_frame_time != 0:
            ticks = cv2.getTickCount() - self.last_frame_time
            elapsed_ms = int(ticks / cv2.getTickFrequency() * 1000)
            delay = max(1, period_ms - elapsed_ms)
        else:
            delay = period_ms

        key = cv2.waitKey(delay)
        # Stamp the clock after the wait so the next call measures only the time
        # spent producing the next frame. Stamping before the wait would count
        # the wait itself as processing time and make the delay oscillate.
        self.last_frame_time = cv2.getTickCount()
        return key

    def close(self):
        """Destroy the display window."""
        cv2.destroyWindow(self.window_name)

    def print_help(self):
        """Print the keyboard controls to stdout."""
        print()
        print('-' * 80)
        print('RealSense Capture Visualizer')
        print('Press ESC or q to exit')
        print('Press r to toggle recording')
        print('-' * 80)
        print()

    def run(self):
        """Run the display loop until ESC or 'q' is pressed.

        The window is closed when the loop ends, including when it ends because
        of an exception.
        """
        self.print_help()

        frame_count = 0
        try:
            while True:
                frame_count += 1
                rgb, depth = self.capture.get_rgbd_image(format='rgb')

                if self.recording:
                    self.recorded.append((rgb, depth))
                    caption_format = '[REC] Frame {:04d} %H:%M:%S'
                else:
                    caption_format = 'Frame {:04d} %H:%M:%S'
                # The frame number is filled in first; strftime then expands the clock fields.
                current_time = time.strftime(caption_format.format(frame_count), time.localtime())

                frame = get_concat_rgbd_visualization(rgb[..., ::-1], depth)  # To BGR
                cv2.putText(frame, current_time, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
                frame = cv2.resize(frame, (960, int(960 * frame.shape[0] / frame.shape[1])))

                # Keep only the low byte so modifier bits set by some backends
                # do not hide the key; "no key" (-1) becomes 255.
                key = self.show(frame) & 0xFF
                if key in (27, ord('q')):
                    break
                if key == ord('r'):
                    self.recording = not self.recording
        finally:
            self.close()
def visualize_devices(devices: List[RealSenseInterface], save_dir: str = "") -> None:
    """Visualizes all the devices in a cv2 window.

    Press 'q' or 'esc' on any of the windows to exit the infinite loop.
    Press the 's' key in a specific window to save the color and depth image to disk.

    You can use a similar loop interface in other places where you need a live camera feed
    (e.g. collecting demonstrations).

    Args:
        devices: the devices to display; each gets one visualizer step per pass.
        save_dir: directory handed to each visualizer for saved images.

    Note:
        A save triggered on one device is forwarded only to the devices that
        come after it in ``devices`` during the same pass; the flag is reset at
        the start of every pass.
    """
    # With nothing to show there is no window to receive an exit key, so the
    # loop below would spin forever. Bail out instead.
    if not devices:
        print("No devices to visualize.")
        return

    while True:
        save = False
        for device in devices:
            window_event = RealSenseVisualizer(device).run(save_dir, save_image=save)

            # Exit all windows
            if window_event == WindowEvent.EXIT:
                print("Exit key pressed.")
                cv2.destroyAllWindows()
                return

            if window_event == WindowEvent.SAVE:
                # We use this to propagate a save command across all windows
                save = True
def run_4corner_calibration(devices: List[RealSenseInterface], save_dir: str = "") -> None:
    """Show a live feed for every device until an exit key is pressed.

    Each pass runs one ``RealSenseVisualizer`` step per device with
    ``save_image=False``, so no save is forced from here. When any step reports
    ``WindowEvent.EXIT``, all cv2 windows are destroyed and the function returns.

    NOTE(review): no calibration computation is visible in this function
    itself; confirm whether the calibration step lives elsewhere.

    Args:
        devices: the devices to display.
        save_dir: directory handed to each visualizer for saved images.
    """
    # With nothing to show there is no window to receive an exit key, so the
    # loop below would spin forever. Bail out instead.
    if not devices:
        print("No devices to visualize.")
        return

    while True:
        for device in devices:
            window_event = RealSenseVisualizer(device).run(save_dir, save_image=False)

            # Exit all windows
            if window_event == WindowEvent.EXIT:
                print("Exit key pressed.")
                cv2.destroyAllWindows()
                return
def _main():
    """Command-line entry point: find devices, stream them live, then shut down.

    The ``--device`` option selects which device model is searched for
    (``l515`` or ``d435``; ``d435`` when omitted).
    """
    import jacinle
    from concepts.hw_interface.realsense.device import RealSenseDevice, start_pipelines, stop_pipelines

    arg_parser = jacinle.JacArgumentParser()
    arg_parser.add_argument('--device', choices=['l515', 'd435'], default='d435')
    cli_args = arg_parser.parse_args()

    # Detect devices, start pipelines, visualize, and stop pipelines
    found_devices = RealSenseDevice.find_devices(cli_args.device)
    start_pipelines(found_devices)
    try:
        visualize_devices(found_devices)
    finally:
        # Always stop the pipelines, even if visualization raised.
        stop_pipelines(found_devices)


if __name__ == "__main__":
    _main()