#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# File : configuration_space.py
# Author : Jiayuan Mao
# Email : maojiayuan@gmail.com
# Date : 12/01/2019
#
# This file is part of Project Concepts.
# Distributed under terms of the MIT license.
import numpy as np
from math import ceil
__all__ = [
'ConfigurationSpace',
'ProxyConfigurationSpace',
'BoxConfigurationSpace',
'ConstrainedConfigurationSpace',
'BoxConstrainedConfigurationSpace',
'ProblemSpace',
'CollisionFreeProblemSpace'
]
[docs]
class ConfigurationSpace(object):
[docs]
def sample(self):
raise NotImplementedError()
[docs]
def difference(self, config1, config2):
raise NotImplementedError()
[docs]
def distance(self, config1, config2):
raise NotImplementedError()
[docs]
def validate_config(self, config):
raise NotImplementedError()
[docs]
def gen_path(self, config1, config2):
raise NotImplementedError()
[docs]
class ProxyConfigurationSpace(ConfigurationSpace):
[docs]
def __init__(self, proxy):
self.proxy = proxy
def __getattr__(self, item):
return getattr(self.proxy, item)
[docs]
def sample(self):
return self.proxy.sample()
[docs]
def difference(self, config1, config2):
return self.proxy.difference(config1, config2)
[docs]
def distance(self, config1, config2):
return self.proxy.distance(config1, config2)
[docs]
def validate_config(self, config):
return self.proxy.validate_config(config)
[docs]
def gen_path(self, config1, config2):
raise NotImplementedError()
[docs]
class BoxConfigurationSpace(ConfigurationSpace):
[docs]
def __init__(self, cspace_ranges, cspace_max_stepdiff):
super().__init__()
self.cspace_ranges = cspace_ranges
if isinstance(cspace_max_stepdiff, (int, float)):
cspace_max_stepdiff = (cspace_max_stepdiff,) * len(self.cspace_ranges)
self.cspace_max_stepdiff = np.array(cspace_max_stepdiff, dtype='float32')
[docs]
def sample(self):
return tuple(r.sample() for r in self.cspace_ranges)
[docs]
def difference(self, one: tuple, two: tuple):
a = [self.cspace_ranges[i].difference(one[i], two[i]) for i in range(len(self.cspace_ranges))]
return a
[docs]
def distance(self, one: tuple, two: tuple):
a = [abs(self.cspace_ranges[i].difference(one[i], two[i])) for i in range(len(self.cspace_ranges))]
return sum(a)
[docs]
def validate_config(self, config):
return len(config) == len(self.cspace_ranges) and all(
[self.cspace_ranges[i].contains(config[i]) for i in range(len(self.cspace_ranges))]
)
[docs]
def gen_path_l1(self, config1: tuple, config2: tuple):
# TODO(Jiayuan Mao @ 2023/03/09): bring this back. In panda planning, sometimes the input configurations are invalid...
# assert self.validate_config(config1) and self.validate_config(config2), 'Invalid config pair: {} and {}.'.format(config1, config2)
diffs = [self.cspace_ranges[i].difference(config1[i], config2[i]) for i in range(len(self.cspace_ranges))]
samples = max([int(ceil(abs(diff) / max_diff)) + 1 for diff, max_diff in zip(diffs, self.cspace_max_stepdiff)])
samples = max(2, samples)
linear_interpolation = [diff / (samples - 1.0) for diff in diffs]
path = [config1]
for s in range(1, samples - 1):
sample = tuple(
[
self.cspace_ranges[i].make_in_range(config1[i] + s * linear_interpolation[i])
for i in range(len(self.cspace_ranges))
]
)
path.append(sample)
return True, path + [config2]
[docs]
def gen_path_l2(self, config1: tuple, config2: tuple):
diffs = [self.cspace_ranges[i].difference(config1[i], config2[i]) for i in range(len(self.cspace_ranges))]
diffs_norm = np.linalg.norm(diffs, ord=2)
nr_steps = int(ceil(diffs_norm / self.cspace_max_stepdiff[0])) + 1
nr_steps = max(2, nr_steps)
linear_interpolation = [diff / (nr_steps - 1.0) for diff in diffs]
path = [config1]
for s in range(1, nr_steps - 1):
sample = tuple(
[
self.cspace_ranges[i].make_in_range(config1[i] + s * linear_interpolation[i])
for i in range(len(self.cspace_ranges))
]
)
path.append(sample)
# max_step_size = 0
# for c1, c2 in zip(path, path[1:]):
# max_step_size = max(max_step_size, self.distance(c1, c2))
# print(f'gen_path_l2: max_step_size = {max_step_size}; nr_configs = {len(path)}')
return True, path + [config2]
[docs]
def gen_path(self, config1: tuple, config2: tuple):
return self.gen_path_l1(config1, config2)
[docs]
def gen_interpolated_path(self, config1: tuple, config2: tuple, nr_steps: int):
# use linspace to interpolate between config1 and config2
return np.array([np.linspace(config1[i], config2[i], nr_steps) for i in range(len(config1))]).T
[docs]
class ConstrainedConfigurationSpace(ProxyConfigurationSpace):
[docs]
def __init__(self, cspace, constrain_config_func=None, extra_validate_config_func=None):
super().__init__(cspace)
self.constrain_config_func = constrain_config_func
self.extra_validate_config_func = extra_validate_config_func
[docs]
def sample(self):
return self.constrain_config(self.proxy.sample())
[docs]
def validate_config(self, config):
if self.extra_validate_config_func is not None:
return self.proxy.validate_config(config) and self.extra_validate_config_func(self, config)
else:
return self.proxy.validate_config(config)
[docs]
def constrain_config(self, config):
if self.constrain_config_func is not None:
return self.constrain_config_func(self, config)
raise NotImplementedError()
[docs]
def gen_path(self, config1: tuple, config2: tuple):
success, path = self.proxy.gen_path(config1, config2)
for i, config in enumerate(path):
path[i] = self.constrain_config(config)
return success, path
[docs]
class BoxConstrainedConfigurationSpace(ConstrainedConfigurationSpace):
[docs]
def __init__(
self,
cspace,
constrain_config_func=None,
extra_validate_config_func=None,
cspace_max_stepdiff_proj=3,
gen_path_atol=1e-5,
):
assert isinstance(cspace, BoxConfigurationSpace)
super().__init__(cspace, constrain_config_func, extra_validate_config_func)
self._cspace_max_stepdiff_proj = cspace_max_stepdiff_proj
self._gen_path_atol = gen_path_atol
@property
def cspace_ranges(self):
return self.proxy.cspace_ranges
@property
def cspace_max_stepdiff(self):
return self.proxy.cspace_max_stepdiff
@property
def cspace_max_stepdiff_proj(self):
if isinstance(self._cspace_max_stepdiff_proj, (int, float)):
return tuple(x * self._cspace_max_stepdiff_proj for x in self.cspace_max_stepdiff)
return self._cspace_max_stepdiff_proj
[docs]
def gen_path(self, config1, config2):
assert self.validate_config(config1) and self.validate_config(config2)
config = config1
path = [config]
success = False
for i in range(1000):
config = config + np.fmin(self.cspace_max_stepdiff, np.array(config2) - np.array(config))
config = self.constrain_config(config)
diffs = [self.cspace_ranges[i].difference(path[-1][i], config[i]) for i in range(len(self.cspace_ranges))]
valid = all(abs(diff) < constrain for diff, constrain in zip(diffs, self.cspace_max_stepdiff_proj))
if valid:
path.append(config)
if np.allclose(config, config2):
success = True
break
if self.distance(config, config2) >= self.distance(path[-1], config2) - self._gen_path_atol:
break
else:
break
return success, path
[docs]
class ProblemSpace(object):
[docs]
def __init__(self, cspace):
self.cspace = cspace
[docs]
def sample(self, nr_trials=None):
if nr_trials is None:
nr_trials = self.sample.default_nr_trials
for i in range(nr_trials):
sample = self.cspace.sample()
if self.validate_config(sample):
return sample
raise ValueError('Unable to produce a sample.')
sample.default_nr_trials = 100
[docs]
def difference(self, config1, config2):
return self.cspace.difference(config1, config2)
[docs]
def distance(self, config1, config2):
return self.cspace.distance(config1, config2)
[docs]
def validate_config(self, config):
return self.cspace.validate_config(config)
[docs]
def distance_path(self, path):
distance = 0
for i in range(len(path) - 1):
distance += self.cspace.distance(path[i], path[i + 1])
return distance
[docs]
def validate_path(self, config1, config2):
success, _, _ = self.try_extend_path(config1, config2)
return success
[docs]
def try_extend_path(self, start, end):
success, path = self.cspace.gen_path(start, end)
if path is None:
return False, start, path
safe_path = [path[0]]
for i in range(1, len(path)):
configuration = path[i]
# jacinle.log_function.print(f'Validate {i}/{len(path)}: {configuration} -> {self.validate_config(configuration)}')
if not self.validate_config(configuration):
return False, safe_path[-1], safe_path
safe_path.append(configuration)
return success, safe_path[-1], safe_path
[docs]
class CollisionFreeProblemSpace(ProblemSpace):
[docs]
def __init__(self, cspace, collide_fn=None):
super().__init__(cspace)
self.collide_fn = collide_fn
[docs]
def validate_config(self, configuration):
return self.cspace.validate_config(configuration) and not self.collide(configuration)
[docs]
def collide(self, configuration):
if self.collide_fn is not None:
return self.collide_fn(configuration)
raise NotImplementedError()