Utopi
a

自主车辆在匝道合并区的碰撞风险评估改进方法

风险评估使用的方法和指标以及变量解释

Muhammad Sameer Sheikh和Yinqiao Peng提出的碰撞风险评估模型采用了严格的方法论框架,以评估和缓解自主车辆(AVs)在匝道合并场景中可能遇到的碰撞风险。此方法论基于集成威胁评估模型的决策系统,通过计算最优侧向和纵向加速度制定安全车辆运动。以下是模型中涉及的公式、关键指标以及变量的详细解释:

关键技术和变量解释

  1. 威胁评估模型:评估基于车辆动态和预测轨迹的碰撞风险。这是模型的核心部分,用于实时分析可能发生的冲突,并为之后的决策提供依据。

  2. 决策制定:根据威胁评估,模型为车辆计算安全的侧向和纵向加速度,避免在合并时的潜在碰撞。

公式及变量详解

多项式方程的路径规划:

y=bTx+cy = b^Tx + c

  • yy:车辆的侧向位置,表示车辆与道路中心线的距离。
  • xx:车辆的纵向位置,即车辆沿道路前进的距离。
  • bTb^T:五阶多项式的系数向量,控制车辆路径的形状。
  • cc:常数项向量,用于调整路径方程。

避免碰撞的侧向加速度:

ai=vxγ+vy+Ga_i = v_x \gamma + v_y + G

  • aia_i:侧向加速度,指车辆为避免碰撞而必须达到的侧向运动加速。
  • γ\gamma:偏航率,车辆旋转的速率,即车辆的方向改变速度。
  • vxv_x:车辆的纵向速度,沿道路方向的速度。
  • vyv_y:车辆的侧向速度,向左或向右移动的速度。
  • GG:重力加速度,一般取9.8 m/s^2,影响车辆运动的地球重力。

最小偏航率:

γmin=vxPbP2a+P2b+x2nwc2\gamma_{min}= \frac{v_x}{P_b} \sqrt{P_2^a + P_2^b + x^2n - \frac{w_c}{2}}

  • γmin\gamma_{min}:最小偏航率,指为避免与主车道车辆(MLV)碰撞而需达成的最小车辆转向速率。
  • PaP_aPbP_b:分别表示车辆间的纵向和横向距离。
  • wcw_c:车辆的宽度,确保换道动作中有足够空间避免碰撞。

碰撞风险函数:

F=(xORVxMLVxOV)aS+(yORVyMLVyOV)bSF = \frac{(x_{ORV} - x_{MLV} - x_{OV})}{a_S} + \frac{(y_{ORV} - y_{MLV} - y_{OV})}{b_S}

  • FF:碰撞风险函数,用于评估匝道车辆(ORV)、主车道车辆(MLV)和其他车辆(OV)的相对位置风险。
  • xORV,yORVx_{ORV}, y_{ORV}:匝道车辆的纵向和侧向位置。
  • xMLV,yMLV,xOV,yOVx_{MLV}, y_{MLV}, x_{OV}, y_{OV}:主车道车辆和其他车辆的纵向和侧向位置。
  • aS,bSa_S, b_S:安全椭圆的长轴和短轴,用于定义车辆间的安全距离。

碰撞避免函数:

0 & \text{如果 } F \leq 1 \\ \text{存在碰撞风险} & \text{如果 } F(P_a, P_b) > 1 \\ \text{可以避免碰撞} & \text{否则} \end{cases}$$ - $\Phi$:碰撞威胁评估参数,基于车辆当前状态和动态变化确定安全性。 通过这一系列的方法论和计算,该碰撞风险评估模型为自主车辆提供了一种在匝道合并区域内安全行驶并避免碰撞的有效策略,进一步增强了交通安全。 递增代价(Progressive Increasing Cost,简称PIC)是在 "RiskBench: A Scenario-based Benchmark for Risk Identification" 论文中提出的一个新指标,用于评估风险识别算法在预测风险方面的能力。PIC特别设计来度量模型在不同时间点预测风险的准确性,尤其是当风险和自动驾驶车辆越来越接近时,准确预测风险的重要性会逐渐增加。 ## OrnsteinUhlenbeckNoise 类 OrnsteinUhlenbeckNoise 类用于生成时间连续的噪声序列,主要用于强化学习中的动作探索(exploration)机制,特别适用于连续动作空间。这种噪声模型有助于增加时间序列的相关性,使得探索过程更接近实际物理系统中的噪声行为。 ```python class OrnsteinUhlenbeckNoise: def init(self, mu): self.theta, self.dt, self.sigma = 0.1, 0.01, 0.1 self.mu = mu self.x_prev = np.zeros_like(self.mu) def __call__(self): x = ( self.x_prev + self.theta * (self.mu - self.x_prev) * self.dt + self.sigma * np.sqrt(self.dt) * np.random.normal(size=self.mu.shape) ) self.x_prev = x return x ``` 在SMBPO(有安全约束的模型预先训练及探索方法)中,提供的代码片段展示了 `OrnsteinUhlenbeckNoise` 类和 `_create_buffer` 方法的作用。我们来详细解释这两个部分在代码中的功能: ### OrnsteinUhlenbeckNoise 类 `OrnsteinUhlenbeckNoise` 类用于生成时间连续的噪声序列,主要用于强化学习中的动作探索(exploration)机制,特别适用于连续动作空间。这种噪声模型有助于增加时间序列的相关性,使得探索过程更接近实际物理系统中的噪声行为。 1. **初始化 `__init__` 方法**: - `theta`、`dt`、`sigma`:分别是噪声的参数,控制回归速度、时间步长以及噪声的波动强度。 - `mu`:噪声的均值(通常为零矢量)。 - `x_prev`:用于存储上一时间步的噪声值,初始化为与均值形状相同的零矢量。 2. **\_\_call\_\_ 方法**: - 实现了 Ornstein-Uhlenbeck 过程的递归公式: $$ x_t = x_{t-1} + \theta (\mu - x_{t-1}) \Delta t + \sigma \sqrt{\Delta t} \mathcal{N}(0, 1) $$ - 每次调用该方法都会生成一个新的噪声值,并更新 `x_prev` 为当前噪声值 `x`,保证连续时间相关性。 **奥恩斯坦-乌伦贝克过程**: 强化学习中Ornstein-Uhlenbeck噪声是鸡肋吗?https://zhuanlan.zhihu.com/p/96720878 相比于独立噪声,OU噪声适合于惯性系统,尤其是**时间离散化粒度较小**的情况,此外,它可以保护实际系统,如机械臂。之所以大家觉得OU噪声没用,是因为强化学习benchmark都是仿真任务,而且往往都不用考虑时间离散化的问题。另外,即使遇到时间离散粒度小的问题,一般直接增大离散化粒度之后独立噪声也可以起到很好的效果。 **两种情况:(1)惯性系统(环境)(2)保护实际机械臂/机器人** ## SampleBuffer `SampleBuffer` 的类用于在深度强化学习或其他机器学习场景下管理经验回放缓冲区。经验回放缓冲区用来存储智能体与环境交互过程中的经验样本,便于后续训练使用。下面是对这段代码详细的解释: ### 类定义及初始化 ```python class SampleBuffer(Module): COMPONENT_NAMES = ('states', 'actions', 'next_states', 'rewards', 'dones') def __init__(self, state_dim, action_dim, capacity, discrete_actions=False, device=device): super().__init__() self.state_dim = state_dim self.action_dim = action_dim self.capacity = capacity self.discrete_actions = discrete_actions self.device = device self._bufs = {} self.register_buffer('_pointer', torch.tensor(0, dtype=torch.long)) ... ``` - `SampleBuffer` 继承自 `torch_util` 中的 `Module`。 - `COMPONENT_NAMES` 列表定义了经验样本的各个组成部分:状态、动作、下一个状态、奖励和终止标志。 - `__init__` 方法初始化缓冲区的维度、容量、是否采取离散动作以及设备。 ### 缓冲区的创建 ```python if discrete_actions: assert action_dim == 1 action_dtype = torch.int action_shape = [] else: action_dtype = torch.float action_shape = [action_dim] components = ( ('states', torch.float, [state_dim]), ('actions', action_dtype, action_shape), ('next_states', torch.float, [state_dim]), ('rewards', torch.float, []), ('dones', torch.bool, []) ) for name, dtype, shape in components: self._create_buffer(name, dtype, shape) ``` - 根据 `discrete_actions` 确定动作数据类型和形状。 - `components` 定义了各个组成部分的名称、数据类型和形状。 - 调用 `_create_buffer` 方法创建缓冲区。 ### 创建缓冲区及相关方法 ```python def _create_buffer(self, name, dtype, shape): assert name not in self._bufs _name = f'_{name}' buffer_shape = [self.capacity, *shape] buffer = torch.empty(*buffer_shape, dtype=dtype, device=self.device) self.register_buffer(_name, buffer) self._bufs[name] = buffer ``` - `_create_buffer` 方法根据提供的名称、数据类型和形状创建一个缓冲区。 ### 数据存取 ```python def append(self, **kwargs): assert set(kwargs.keys()) == set(self.COMPONENT_NAMES) i = self._pointer % self.capacity for name in self.COMPONENT_NAMES: self._bufs[name][i] = kwargs[name] self._pointer += 1 def extend(self, **kwargs): assert set(kwargs.keys()) == set(self.COMPONENT_NAMES) batch_size = len(list(kwargs.values())[0]) assert batch_size <= self.capacity, 'We do not support extending by more than buffer capacity' i = self._pointer % self.capacity end = i + batch_size if end <= self.capacity: for name in self.COMPONENT_NAMES: self._bufs[name][i:end] = kwargs[name] else: fit = self.capacity - i overflow = end - self.capacity for name in self.COMPONENT_NAMES: buf, arg = self._bufs[name], kwargs[name] buf[-fit:] = arg[:fit] buf[:overflow] = arg[-overflow:] self._pointer += batch_size ``` - `append` 方法添加单一经验样本到缓冲区。 - `extend` 方法批量添加经验样本。 ### 数据检索 ```python def _get1(self, name): buf = self._bufs[name] if self._pointer <= self.capacity: return buf[:self._pointer] else: i = self._pointer % self.capacity return torch.cat([buf[i:], buf[:i]]) def get(self, *names, device=device, as_dict=False): if len(names) == 0: names = self.COMPONENT_NAMES bufs = [self._get1(name).to(device) for name in names] if as_dict: return dict(zip(names, bufs)) else: return bufs if len(bufs) > 1 else bufs[0] ``` - `_get1` 方法检索缓冲区中的一个部分,支持环形缓冲器。 - `get` 方法根据指定的名称获取多个或所有组件的数据,可选返回字典格式。 ### 样本提取 ```python def sample(self, batch_size, replace=True, device=device, include_indices=False): indices = torch.randint(len(self), [batch_size], device=device) if replace else \ random_indices(len(self), size=batch_size, replace=False) bufs = [self._bufs[name][indices].to(device) for name in self.COMPONENT_NAMES] return (bufs, indices) if include_indices else bufs ``` - `sample` 方法根据指定的批量大小随机抽样,支持替换抽样或不替换抽样,可选返回索引。 ### 类方法 ```python @classmethod def from_state_dict(cls, state_dict, device=device): assert set(state_dict.keys()) == {*(f'_{name}' for name in cls.COMPONENT_NAMES), '_pointer'} states, actions = state_dict['_states'], state_dict['_actions'] l = len(states) for name in cls.COMPONENT_NAMES: tensor = state_dict[f'_{name}'] assert torch.is_tensor(tensor) assert len(tensor) == l buffer = cls(state_dim=states.shape[1], action_dim=actions.shape[1], capacity=l, discrete_actions=(not actions.dtype.is_floating_point), device=device) buffer.load_state_dict(state_dict) return buffer ``` - `from_state_dict` 方法通过状态字典创建 `SampleBuffer` 实例用于序列化或反序列化。 总体来说,这段代码实现了一个高效的环形经验回放缓冲区,支持随机采样、单条或批量插入,并且能够在 PyTorch 的设备间切换。