#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# File : interpolation_utils.py
# Author : Jiayuan Mao
# Email : maojiayuan@gmail.com
# Date : 05/26/2023
#
# This file is part of Project Concepts.
# Distributed under terms of the MIT license.
from typing import Optional, Union, Sequence, Tuple
import numpy as np
from scipy.interpolate import CubicSpline as _CubicSpline
from scipy.optimize import minimize
from scipy.interpolate import interp1d
from concepts.math.rotationlib_wxyz import slerp as slerp_wxyz, quat_diff as quat_diff_wxyz
from concepts.math.rotationlib_xyzw import slerp as slerp_xyzw, quat_diff as quat_diff_xyzw
__all__ = [
'SplineInterface', 'CubicSpline', 'LinearSpline', 'SlerpSpline', 'PoseSpline',
'gen_cubic_spline', 'gen_linear_spline', 'project_to_cubic_spline', 'get_next_target_cubic_spline', 'project_to_linear_spline', 'get_next_target_linear_spline'
]
[docs]
class SplineInterface(object):
[docs]
def get_min_x(self) -> float:
raise NotImplementedError()
[docs]
def get_max_x(self) -> float:
raise NotImplementedError()
[docs]
def __call__(self, x: float) -> Union[np.ndarray, Tuple[np.ndarray, ...]]:
raise NotImplementedError()
[docs]
def derivative(self, n: int = 1, x: Optional[float] = None) -> Union['SplineInterface', Tuple['SplineInterface', ...]]:
raise NotImplementedError('Derivative is not implemented.')
[docs]
def project_to(self, y: Union[np.ndarray, Tuple[np.ndarray, ...]], minimum_x: Optional[float] = None) -> float:
raise NotImplementedError('Project is not implemented.')
[docs]
def get_next(self, y: Union[np.ndarray, Tuple[np.ndarray, ...]], step_size: float, minimum_x: Optional[float] = None) -> Tuple[float, Union[np.ndarray, Tuple[np.ndarray, ...]]]:
"""Get the next target point on a spline interpolation.
Args:
y: the current point.
step_size: the step size.
minimum_x: the minimum x value to be considered.
Returns:
the next x-value and the next target point.
"""
x = self.project_to(y, minimum_x=minimum_x)
x = min(max(x + step_size, self.get_min_x()), self.get_max_x())
return x, self(x)
[docs]
class CubicSpline(_CubicSpline, SplineInterface):
[docs]
def __init__(self, xs: np.ndarray, ys: np.ndarray):
self.xs = np.asarray(xs, dtype=np.float32)
self.ys = np.asarray(ys, dtype=np.float32)
super(CubicSpline, self).__init__(self.xs, self.ys, axis=0)
[docs]
@classmethod
def from_points(cls, ys: np.ndarray) -> 'CubicSpline':
xs = np.arange(len(ys))
return cls(xs, ys)
[docs]
def get_min_x(self) -> float:
return float(self.xs[0])
[docs]
def get_max_x(self) -> float:
return float(self.xs[-1])
[docs]
def project_to(self, y: np.ndarray, minimum_x: Optional[float] = None) -> float:
"""Project a point to a cubic spline interpolation.
Args:
y: the point to be projected.
minimum_x: the minimum x value to be considered.
Returns:
the time of the projected point.
"""
y = np.asarray(y, dtype=np.float32)
dslp = self.derivative()
def f(x):
return ((self(float(x)) - y) ** 2).sum()
def df(x):
return 2 * ((self(float(x)) - y) * dslp(float(x))).sum()
x0 = np.argmin(np.abs(self.ys - y).sum(axis=-1))
min_x = max(self.get_min_x(), x0 - 1)
if minimum_x is not None:
min_x = max(min_x, minimum_x)
res = minimize(f, x0, jac=df, method='L-BFGS-B', bounds=[min_x, max(min_x + 0.1, min(self.get_max_x(), x0 + 1))])
return float(res.x[0])
[docs]
class LinearSpline(SplineInterface):
"""Linear spline interpolation.
This class is a wrapper of scipy.interpolate.interp1d that mimics the CubicSpline interface.
"""
[docs]
def __init__(self, xs: np.ndarray, ys: np.ndarray):
assert len(xs) == len(ys)
self.xs = np.asarray(xs, dtype=np.float32)
self.ys = np.asarray(ys, dtype=np.float32)
assert len(self.xs.shape) == 1, 'xs should be a 1D array.'
self.interpolator = interp1d(xs, ys, axis=0, kind='linear', fill_value='extrapolate')
[docs]
@classmethod
def from_points(cls, ys: np.ndarray) -> 'LinearSpline':
xs = np.arange(len(ys))
return cls(xs, ys)
[docs]
def get_min_x(self) -> float:
return float(self.xs[0])
[docs]
def get_max_x(self) -> float:
return float(self.xs[-1])
[docs]
def __call__(self, x: float) -> np.ndarray:
return self.interpolator(x)
[docs]
def project_to(self, y: np.ndarray, minimum_x: Optional[float] = None) -> float:
"""Project a point to a linear spline interpolation.
Args:
y: the point to be projected.
minimum_x: the minimum x value to be considered.
Returns:
the time of the projected point.
"""
y = np.asarray(y, dtype=np.float32)
def f(x):
y_prime = self(x)
return ((y - y_prime) ** 2).sum()
x0 = np.argmin(np.abs(self.ys - y).sum(axis=-1))
min_x = max(self.get_min_x(), x0 - 1)
if minimum_x is not None:
min_x = max(min_x, minimum_x)
res = minimize(f, x0, bounds=[(min_x, max(min_x + 0.1, min(self.get_max_x(), x0 + 1)))])
return float(res.x[0])
[docs]
class SlerpSpline(SplineInterface):
"""Slerp spline interpolation.
This class is a wrapper of scipy.interpolate.interp1d that mimics the CubicSpline interface.
"""
[docs]
def __init__(self, xs: np.ndarray, ys: np.ndarray, quat_format: str = 'xyzw'):
assert len(xs) == len(ys)
self.xs = np.asarray(xs, dtype=np.float32)
self.ys = np.asarray(ys, dtype=np.float32)
self.quat_format = quat_format
assert len(self.xs.shape) == 1, 'xs should be a 1D array.'
if self.quat_format == 'xyzw':
self.slerp = slerp_xyzw
self.diff = quat_diff_xyzw
elif self.quat_format == 'wxyz':
self.slerp = slerp_wxyz
self.diff = quat_diff_wxyz
else:
raise ValueError('Unknown format: {}'.format(self.quat_format))
fake_ys = np.arange(len(ys))
self.interpolator = interp1d(xs, fake_ys, axis=0, kind='linear', fill_value='extrapolate')
[docs]
@classmethod
def from_points(cls, ys: np.ndarray, quat_format: str = 'xyzw') -> 'SlerpSpline':
xs = np.arange(len(ys))
return cls(xs, ys, quat_format=quat_format)
[docs]
def get_min_x(self) -> float:
return float(self.xs[0])
[docs]
def get_max_x(self) -> float:
return float(self.xs[-1])
[docs]
def __call__(self, x: float) -> np.ndarray:
y = self.interpolator(x)
y0 = int(np.floor(y))
y1 = int(np.ceil(y))
if y0 == y1:
return self.ys[y0]
if y1 <= 0:
y0, y1 = 0, 1
if y0 >= len(self.ys) - 1:
y0, y1 = len(self.ys) - 2, len(self.ys) - 1
t = (y - y0)
return self.slerp(self.ys[y0], self.ys[y1], t)
[docs]
def project_to(self, y: np.ndarray, minimum_x: Optional[float] = None) -> float:
"""Project a point to a slerp spline interpolation.
Args:
y: the point to be projected.
minimum_x: the minimum x value to be considered.
Returns:
the time of the projected point.
"""
y = np.asarray(y, dtype=np.float32)
def f(x):
y_prime = self(x)
diff = self.diff(y, y_prime)
return abs(diff)
x0 = np.argmin(np.abs(self.ys - y).sum(axis=-1))
min_x = max(self.get_min_x(), x0 - 1)
if minimum_x is not None:
min_x = max(min_x, minimum_x)
res = minimize(f, x0, bounds=[(min_x, max(min_x + 0.1, min(self.get_max_x(), x0 + 1)))])
return float(res.x[0])
[docs]
class PoseSpline(SplineInterface):
"""Pose spline interpolation.
This class is a wrapper of scipy.interpolate.interp1d that mimics the CubicSpline interface.
"""
[docs]
def __init__(self, xs: np.ndarray, y_pos: np.ndarray, y_quat: np.ndarray, quat_format: str = 'xyzw', fixed_quat: Optional[bool] = None):
"""Initialize a pose spline interpolation.
Args:
xs: the time values.
y_pos: the position values.
y_quat: the quaternion values.
quat_format: the quaternion format ('xyzw' or 'wxyz').
fixed_quat: whether the quaternion values are the same for all time steps.
"""
assert len(xs) == len(y_pos) == len(y_quat)
self.xs = np.asarray(xs, dtype=np.float32)
self.y_pos = np.asarray(y_pos, dtype=np.float32)
self.y_quat = np.asarray(y_quat, dtype=np.float32)
self.pos_spline = LinearSpline(self.xs, self.y_pos)
self.quat_spline = SlerpSpline(self.xs, self.y_quat, quat_format=quat_format)
self.mixed_spline = LinearSpline(self.xs, np.concatenate([self.y_pos, self.y_quat], axis=-1))
self.fixed_quat = fixed_quat
if self.fixed_quat is None:
self.fixed_quat = np.allclose(self.y_quat, self.y_quat[0:1])
[docs]
@classmethod
def from_points(cls, y_pos: np.ndarray, y_quat: np.ndarray, quat_format: str = 'xyzw') -> 'PoseSpline':
xs = np.arange(len(y_pos))
return cls(xs, y_pos, y_quat, quat_format=quat_format)
[docs]
@classmethod
def from_pose_sequence(cls, pose_sequence: Sequence[Tuple[np.ndarray, np.ndarray]], quat_format: str = 'xyzw') -> 'PoseSpline':
y_pos = np.asarray([pose[0] for pose in pose_sequence])
y_quat = np.asarray([pose[1] for pose in pose_sequence])
return cls.from_points(y_pos, y_quat, quat_format=quat_format)
[docs]
def get_min_x(self) -> float:
return float(self.xs[0])
[docs]
def get_max_x(self) -> float:
return float(self.xs[-1])
[docs]
def __call__(self, x: float) -> Tuple[np.ndarray, np.ndarray]:
if self.fixed_quat:
return self.pos_spline(x), self.y_quat[0]
return self.pos_spline(x), self.quat_spline(x)
[docs]
def project_to(self, y: Tuple[np.ndarray, np.ndarray], minimum_x: Optional[float] = None) -> float:
"""Project a point to a pose spline interpolation.
Args:
y: the point to be projected.
minimum_x: the minimum x value to be considered.
Returns:
the time of the projected point.
"""
if self.fixed_quat:
pos = y[0]
x = self.pos_spline.project_to(pos, minimum_x=minimum_x)
else:
pos, quat = y
x = self.mixed_spline.project_to(np.concatenate([pos, quat], axis=-1), minimum_x=minimum_x)
return x
"""Below are the obsolete functions that do not use the object-oriented interface."""
[docs]
def gen_cubic_spline(ys: np.ndarray) -> CubicSpline:
"""Generate a cubic spline interpolation from an array of points."""
return CubicSpline.from_points(ys)
[docs]
def gen_linear_spline(ys) -> LinearSpline:
"""Generate a linear spline interpolation from an array of points."""
return LinearSpline.from_points(ys)
[docs]
def project_to_cubic_spline(spl: CubicSpline, y: np.ndarray, ys: np.ndarray) -> float:
"""Project a point to a cubic spline interpolation.
Args:
spl: the cubic spline interpolation.
y: the point to be projected.
ys: the array of points that the cubic spline interpolation is generated from.
Returns:
the time of the projected point.
"""
return spl.project_to(y, minimum_x=0)
[docs]
def get_next_target_cubic_spline(spl: CubicSpline, y: np.ndarray, step_size: float, ys: np.ndarray) -> Tuple[float, np.ndarray]:
"""Get the next target point on a cubic spline interpolation.
Args:
spl: the cubic spline interpolation.
y: the current point.
step_size: the step size.
ys: the array of points that the cubic spline interpolation is generated from.
Returns:
the next target point.
"""
return spl.get_next(y, step_size, minimum_x=0)
[docs]
def project_to_linear_spline(spl: LinearSpline, y: np.ndarray, minimum_x: Optional[float] = None) -> float:
"""Project a point to a linear spline interpolation.
Args:
spl: the linear spline interpolation.
y: the point to be projected.
minimum_x: the minimum x value to be considered.
Returns:
the time of the projected point.
"""
return spl.project_to(y, minimum_x=minimum_x)
[docs]
def get_next_target_linear_spline(spl: LinearSpline, y: np.ndarray, step_size: float, minimum_x: Optional[float] = None) -> Tuple[float, np.ndarray]:
"""Get the next target point on a linear spline interpolation.
Args:
spl: the linear spline interpolation.
y: the current point.
step_size: the step size.
minimum_x: the minimum x value to be considered.
Returns:
the next target point (time, point).
"""
return spl.get_next(y, step_size, minimum_x=minimum_x)