Source code for concepts.hw_interface.robot_state_visualizer.visualizer_multiproc
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# File : visualizer.py
# Author : Jiayuan Mao
# Email : maojiayuan@gmail.com
# Date : 11/20/2024
#
# This file is part of Project Concepts.
# Distributed under terms of the MIT license.
"""Robot state visualizer written using Dash and ZMQ for inter-process communication."""
import pickle
import threading
import time
import zmq
from concepts.hw_interface.robot_state_visualizer.visualizer import RobotStateVisualizer
[docs]
class RobotStateVisualizerMultiproc(RobotStateVisualizer):
INIT_PORT = 5557
SUB_PORT = 5556
[docs]
def listener_thread(self):
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect(f'tcp://localhost:{self.SUB_PORT}')
socket.setsockopt_string(zmq.SUBSCRIBE, '')
while True:
try:
message = socket.recv()
message = pickle.loads(message)
if message['type'] == 'update':
payload = message['data']
with self.mutex:
for key, data in payload.items():
timestamp, value = data
if isinstance(key, tuple):
tab, name = key
self.update_queue_with_mutex(name, timestamp, value, tab=tab)
else:
self.update_queue_with_mutex(key, timestamp, value)
else:
print(f'Unknown message type: {message["type"]}')
except Exception as e:
print(f'Error: {e}')
import traceback
traceback.print_exc()
[docs]
def start(self):
# Create a REP socket to receive initialization message.
context = zmq.Context()
socket = context.socket(zmq.REP)
print(f'Binding to port {self.INIT_PORT}')
socket.bind(f'tcp://*:{self.INIT_PORT}')
print(f'Visualizer server started at port {self.INIT_PORT}.')
message = socket.recv()
message = pickle.loads(message)
self.reset(queues=message['data'])
self.initialized = True
socket.send(b'OK')
socket.close()
print('Visualizer initialized.')
threading.Thread(target=self.listener_thread, daemon=True).start()
super().start()
while True:
time.sleep(1)
[docs]
class RobotStateVisualizerPublisher(object):
PUB_PORT = 5556
[docs]
def __init__(self):
self.context = zmq.Context()
self.socket = self.context.socket(zmq.PUB)
self.socket.bind(f'tcp://*:{self.PUB_PORT}')
self.reset_message_sent = False
[docs]
def publish(self, message):
if not self.reset_message_sent:
raise ValueError('Reset message not sent before update message.')
message = {'type': 'update', 'data': message}
self.socket.send(pickle.dumps(message))
[docs]
def reset(self, queues):
if self.reset_message_sent:
raise ValueError('Reset message already sent.')
message = {'type': 'reset', 'data': queues}
context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect(f'tcp://localhost:{RobotStateVisualizerMultiproc.INIT_PORT}')
socket.send(pickle.dumps(message))
socket.recv()
socket.close()
self.reset_message_sent = True