Source code for concepts.hw_interface.realsense.device_f

#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# File   : device_f.py
# Author : Xiaolin Fang
# Email  : xiaolinf@mit.edu
# Date   : 09/11/2024
#
# This file is part of Project Concepts.
# Distributed under terms of the MIT license.

"""RealSense interface based on Xiaolin Fang's code"""

import numpy as np
from typing import Tuple

try:
    import pyrealsense2 as rs
    rs_imported = True
except ImportError:
    import unittest.mock
    rs = unittest.mock.Mock()
    rs_imported = False

from concepts.hw_interface.realsense.device import RealSenseInterface


def rs_intrinsics_to_opencv_intrinsics(intr):
    """Convert pyrealsense2 intrinsics into an OpenCV camera matrix K and distortion coefficients D."""
    D = np.array(intr.coeffs)
    K = np.array([
        [intr.fx, 0, intr.ppx],
        [0, intr.fy, intr.ppy],
        [0, 0, 1],
    ])
    return K, D


def get_serial_number(pipeline_profile):
    """Return the serial number of the device backing the given pipeline profile."""
    return pipeline_profile.get_device().get_info(rs.camera_info.serial_number)
def get_intrinsics(pipeline_profile, stream=None):
    """Return the OpenCV-style intrinsics (K, D) for the given stream (the color stream by default)."""
    if stream is None:
        stream = rs.stream.color
    stream_profile = pipeline_profile.get_stream(stream)  # Fetch the profile for the requested stream.
    intr = stream_profile.as_video_stream_profile().get_intrinsics()  # Downcast to video_stream_profile and fetch intrinsics.
    return rs_intrinsics_to_opencv_intrinsics(intr)
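

# A minimal sketch (not part of the original module) of how the OpenCV-style
# intrinsics returned above can be used: back-project a depth pixel into a 3D
# point in the camera frame, assuming the standard pinhole model and a depth
# value in millimeters (the usual z16 scale on D400-series devices).
def backproject_pixel(K: np.ndarray, u: float, v: float, depth_mm: float) -> np.ndarray:
    """Return the (x, y, z) camera-frame point, in meters, for pixel (u, v)."""
    z = depth_mm / 1000.0
    x = (u - K[0, 2]) * z / K[0, 0]
    y = (v - K[1, 2]) * z / K[1, 1]
    return np.array([x, y, z])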
class CaptureRS(RealSenseInterface):
    def __init__(self, callback=None, vis=False, serial_number=None, intrinsics=None, min_tags=1, auto_close=False):
        if not rs_imported:
            raise ImportError('pyrealsense2 is not installed. Please install it using `pip install pyrealsense2`.')

        self.callback = callback
        self.vis = vis
        self.min_tags = min_tags
        self.init_serial_number = serial_number

        # Configure the depth and color streams.
        # 1280 x 720 is the highest resolution the RealSense depth stream supports.
        self.pipeline = rs.pipeline()
        config = rs.config()
        if serial_number is not None:
            config.enable_device(serial_number)
        config.enable_stream(rs.stream.depth, 1280, 720, rs.format.z16, 6)
        config.enable_stream(rs.stream.color, 1280, 720, rs.format.rgb8, 15)

        # Start streaming and query the device info.
        pipeline_profile = self.pipeline.start(config)
        self.serial_number = get_serial_number(pipeline_profile)
        print(f'Connected to {self.serial_number}')

        # Get the camera intrinsics.
        if intrinsics is None:
            self.intrinsics = get_intrinsics(pipeline_profile)
        else:
            self.intrinsics = intrinsics

        if auto_close:
            def close_rs():
                print(f'Closing RealSense camera: {self.serial_number}')
                self.close()

            import atexit
            atexit.register(close_rs)
    def get_serial_number(self) -> str:
        return self.serial_number

    def get_rgbd_image(self, format: str = 'rgb') -> Tuple[np.ndarray, np.ndarray]:
        rgb, depth = self.capture()
        if format == 'bgr':
            return rgb[..., ::-1], depth
        elif format == 'rgb':
            return rgb, depth
        else:
            raise ValueError(f'Invalid format: {format}.')

    # Depth frames are aligned to the color stream in capture(), so both streams share the same intrinsics.
    def get_color_intrinsics(self) -> np.ndarray:
        return self.intrinsics

    def get_depth_intrinsics(self) -> np.ndarray:
        return self.intrinsics
    def capture(self, dep_only=False, to_bgr=False):
        # Wait for a coherent pair of frames (depth and color), aligned to the color stream.
        while True:
            frameset = self.pipeline.wait_for_frames(timeout_ms=5000)
            align = rs.align(rs.stream.color)
            frameset = align.process(frameset)

            # Extract the aligned depth and color images.
            aligned_depth_frame = frameset.get_depth_frame()
            depth_image = np.asanyarray(aligned_depth_frame.get_data()).copy()
            color_image = np.asanyarray(frameset.get_color_frame().get_data()).copy()

            if dep_only:
                return depth_image

            if to_bgr:
                color_image = color_image[:, :, ::-1]
            return color_image, depth_image
    def skip_frames(self, n):
        for _ in range(n):
            _ = self.pipeline.wait_for_frames()

    def close(self):
        self.pipeline.stop()
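

if __name__ == '__main__':
    # Minimal usage sketch (not part of the original module): connect to the
    # first available RealSense camera, grab one aligned RGB-D pair, and print
    # the image shapes and intrinsics. Pass `serial_number='...'` to select a
    # specific device; the pipeline is stopped explicitly via close().
    camera = CaptureRS()
    try:
        camera.skip_frames(5)  # Let auto-exposure settle before capturing.
        rgb, depth = camera.get_rgbd_image(format='rgb')
        K, D = camera.get_color_intrinsics()
        print('RGB shape:', rgb.shape, 'Depth shape:', depth.shape)
        print('Camera matrix K:\n', K)
        print('Distortion coefficients D:', D)
    finally:
        camera.close()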