#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# File : device.py
# Author : Jiayuan Mao
# Email : maojiayuan@gmail.com
# Date : 09/12/2022
#
# This file is part of Project Concepts.
# Distributed under terms of the MIT license.
"""
Wrapper around pyrealsense2. Based on:
https://github.com/IntelRealSense/librealsense/issues/8388#issuecomment-782395443
"""
from typing import Dict, List, Optional, Tuple, ClassVar, Callable
import numpy as np
try:
import pyrealsense2 as rs
except ImportError:
import unittest.mock
rs = unittest.mock.Mock()
__all__ = ['RealSenseDevice', 'start_pipelines', 'stop_pipelines']
# Note: change these to fit your use case. Assuming USB 3.2 connection.
_NAME_TO_STREAM_CONFIGURATIONS: Dict[str, List[Tuple]] = {
# Mapping of camera name to a list of streams to enable
# in the cfg.enable_stream format
"Intel RealSense D435": [
(rs.stream.depth, 1280, 720, rs.format.z16, 30),
(rs.stream.color, 1920, 1080, rs.format.bgr8, 30),
],
"Intel RealSense L515": [
(rs.stream.depth, 1024, 768, rs.format.z16, 30),
(rs.stream.color, 1920, 1080, rs.format.bgr8, 30),
],
}
[docs]
class RealSenseInterface(object):
"""Since we have multiple RealSense implementations, we define an interface here."""
[docs]
def get_serial_number(self) -> str:
raise NotImplementedError()
[docs]
def get_rgbd_image(self, format: str = 'rgb') -> Tuple[np.ndarray, np.ndarray]:
raise NotImplementedError()
[docs]
def get_color_intrinsics(self) -> Tuple[np.ndarray]:
raise NotImplementedError()
[docs]
def get_depth_intrinsics(self) -> Tuple[np.ndarray]:
raise NotImplementedError()
[docs]
class RealSenseDevice(RealSenseInterface):
ctx: ClassVar[rs.context] = rs.context()
[docs]
def __init__(self, name: str, serial_number: str, align_to: str = 'color'):
self.name = name
self.serial_number = serial_number
if align_to == 'color':
self.align = rs.align(rs.stream.color)
elif align_to == 'depth':
self.align = rs.align(rs.stream.depth)
else:
raise ValueError(f"Invalid align_to value: {align_to}")
self.pipeline: Optional[rs.pipeline] = None
self.registered_points: List[Tuple[float, float]] = list()
self.color_intrinsics = None
self.depth_intrinsics = None
[docs]
def get_serial_number(self) -> str:
return self.serial_number
[docs]
def get_rgbd_image(self, format: str = 'rgb') -> Tuple[np.ndarray, np.ndarray]:
rgb, depth = self.capture_images()
if format == 'bgr':
return rgb, depth
elif format == 'rgb':
return rgb[..., ::-1], depth
else:
raise ValueError(f'Invalid format: {format}.')
[docs]
def get_color_intrinsics(self) -> np.ndarray:
return self.color_intrinsics
[docs]
def get_depth_intrinsics(self) -> np.ndarray:
return self.depth_intrinsics
[docs]
@classmethod
def from_rs_device(cls, dev: rs.device) -> 'RealSenseDevice':
name = dev.get_info(rs.camera_info.name)
serial_number = dev.get_info(rs.camera_info.serial_number)
return cls(name, serial_number)
[docs]
@classmethod
def from_serial_number(cls, serial_number: str) -> 'RealSenseDevice':
devices = cls.find_devices()
for device in devices:
if device.serial_number == serial_number:
return device
raise ValueError(f"Device with serial number {serial_number} not found!")
[docs]
@classmethod
def find_devices(cls, device_filter: str = "") -> List['RealSenseDevice']:
"""
Get devices as detected by RealSense and filter devices that only
contain the provided device_filter string in their name.
e.g. to filter for D435 only you can call `find_devices("D435")`
"""
devices = [cls.from_rs_device(dev) for dev in cls.ctx.devices]
# Filter devices
if device_filter:
device_filter = device_filter.lower()
devices = [d for d in devices if device_filter in d.name.lower()]
print(f"Found devices (filter={device_filter}): {devices}")
else:
print(f"Found devices: {devices}")
if not devices:
raise RuntimeError("No devices connected!")
return devices
@property
def stream_configurations(self) -> List[Tuple]:
configs = _NAME_TO_STREAM_CONFIGURATIONS
if self.name not in configs:
raise RuntimeError(f"Configuration not specified for {self.name}")
return configs[self.name]
[docs]
def start_pipeline(self) -> None:
"""Start RealSense pipeline"""
if self.pipeline is not None:
print(f"Pipeline already started for {self}")
return
# Setup pipeline and configuration
pipeline = rs.pipeline(self.ctx)
cfg = rs.config()
cfg.enable_device(self.serial_number)
for stream_configuration in self.stream_configurations:
cfg.enable_stream(*stream_configuration)
try:
prof = pipeline.start(cfg)
depth_stream = prof.get_stream(rs.stream.depth)
color_stream = prof.get_stream(rs.stream.color)
depth_intr = depth_stream.as_video_stream_profile().get_intrinsics()
color_intr = color_stream.as_video_stream_profile().get_intrinsics()
self.depth_intrinsics = get_intrinsics_matrix(depth_intr)
self.color_intrinsics = get_intrinsics_matrix(color_intr)
except RuntimeError as e:
message = str(e)
if message == "Couldn't resolve requests":
# Something wrong with stream configurations probably
raise RuntimeError(
f"{message} for {self}. Check stream configuration and USB connection."
)
else:
raise e
# Set the pipeline on the device
self.pipeline = pipeline
print(f"Started pipeline for {self}")
[docs]
def stop_pipeline(self) -> None:
"""Stop RealSense pipeline"""
if not self.pipeline:
print(f"Warning! Device {self} does not have a pipeline initialized")
else:
self.pipeline.stop()
print(f"Stopped pipeline for {self}")
[docs]
def capture_images(self) -> Tuple[np.ndarray, np.ndarray]:
"""Capture color and depth images"""
if self.pipeline is None:
raise RuntimeError(f"Pipeline for {self} not started!")
# Get frames and align
frames = self.pipeline.wait_for_frames()
aligned_frames = self.align.process(frames)
# Get color and depth frame
depth_frame = aligned_frames.get_depth_frame()
color_frame = aligned_frames.get_color_frame()
if not depth_frame or not color_frame:
raise RuntimeError("Could not capture both color and depth frame.")
# Convert to numpy arrays
depth_image = np.asanyarray(depth_frame.get_data())
color_image = np.asanyarray(color_frame.get_data())
return color_image, depth_image
[docs]
def capture_color_image(self) -> np.ndarray:
return self.capture_images()[0]
[docs]
def capture_depth_image(self) -> np.ndarray:
return self.capture_images()[1]
def __str__(self) -> str:
return f"{self.name} ({self.serial_number})"
def __repr__(self) -> str:
return str(self)
[docs]
def start_pipelines(devices: List[RealSenseDevice]) -> None:
"""Enable each device by starting a stream"""
for device in devices:
device.start_pipeline()
[docs]
def stop_pipelines(devices: List[RealSenseDevice]) -> None:
"""Stop all the pipelines"""
for device in devices:
device.stop_pipeline()
[docs]
def get_intrinsics_matrix(intr: rs.intrinsics) -> np.ndarray:
fx = float(intr.fx)
fy = float(intr.fy)
ppx = float(intr.ppx)
ppy = float(intr.ppy)
axs = 0.0
return np.array([
[fx, axs, ppx],
[0.0, fy, ppy],
[0.0, 0.0, 1.0]
])
[docs]
def get_depth_visualization(depth_image: np.ndarray) -> np.ndarray:
"""Get depth visualization."""
import cv2
depth_colormap = cv2.applyColorMap(
cv2.convertScaleAbs(depth_image, alpha=0.03), cv2.COLORMAP_JET
)
return depth_colormap
[docs]
def get_concat_rgbd_visualization(color_image: np.ndarray, depth_image: np.ndarray) -> np.ndarray:
"""Get concatenated RGB-D visualization in a row."""
depth_colormap = get_depth_visualization(depth_image)
images = np.hstack((color_image, depth_colormap))
return images