从轨迹抖动到安全指标:手把手拆解一个自动驾驶决策模块的代码实现(附Python伪代码)
·
从轨迹抖动到安全指标:手把手拆解一个自动驾驶决策模块的代码实现
自动驾驶技术正在重塑未来交通的图景,而决策模块作为系统的"大脑",直接决定了车辆如何在复杂环境中做出安全、舒适的行驶选择。本文将带您深入一个简化版的纵向决策模块实现,通过Python伪代码演示轨迹处理、避障逻辑与安全指标计算的全过程。无论您是希望理解自动驾驶底层逻辑的开发者,还是正在探索智能交通领域的学生,都能从这段代码之旅中获得实操性启发。
1. 自动驾驶决策模块的核心架构
自动驾驶决策模块通常由环境感知、行为预测、轨迹规划和车辆控制等子系统构成。纵向决策专注于车辆前进方向的速度与距离控制,需要实时处理以下关键数据流:
- 轨迹信息 :未来5-10秒的预测路径,包含位置、速度和方向
- 障碍物映射 :通过传感器识别周围车辆、行人等动态物体
- 道路拓扑 :车道线、交通标志等静态环境特征
- 安全参数 :与障碍物保持的最小距离阈值
class DecisionModule:
def __init__(self):
self.trajectory = [] # 存储规划轨迹
self.obstacles = [] # 障碍物列表
self.safety_params = {
'hard_padding': 1.5, # 安全距离(m)
'soft_padding': 3.0 # 舒适距离(m)
}
2. 轨迹处理与抖动抑制
轨迹抖动(Trajectory Flicker)是指规划路径在连续时间步中出现不合理的突变现象。这种现象可能由传感器噪声、算法不稳定或环境突变引起。我们的代码需要实现轨迹平滑和合理性校验:
def smooth_trajectory(self, raw_trajectory):
"""
使用滑动窗口平均滤波处理原始轨迹
:param raw_trajectory: 原始轨迹点列表
:return: 平滑后的轨迹
"""
window_size = 5
smoothed = []
for i in range(len(raw_trajectory)):
start = max(0, i - window_size//2)
end = min(len(raw_trajectory), i + window_size//2 + 1)
window = raw_trajectory[start:end]
avg_x = sum(p[0] for p in window) / len(window)
avg_y = sum(p[1] for p in window) / len(window)
smoothed.append((avg_x, avg_y))
# 曲率连续性检查
for i in range(1, len(smoothed)-1):
pinch = self.calculate_pinch(smoothed[i-1], smoothed[i], smoothed[i+1])
if abs(pinch) > 0.15: # 曲率变化阈值
return self.adjust_trajectory(smoothed, i)
return smoothed
轨迹质量评估指标:
| 指标名称 | 计算公式 | 允许阈值 | 物理意义 |
|---|---|---|---|
| 曲率(Pinch) | Δk/Δs | <0.15 | 轨迹弯曲程度变化率 |
| 急变度(Juke) | Δ(Pinch)/Δs | <0.05 | 曲率变化的加速度 |
| 横向加速度 | v²×k | <2.5 m/s² | 乘客舒适度影响 |
3. 避障决策与Nudge策略实现
当检测到前方障碍物时,系统需要在不换道的情况下进行道内避障(Nudge)。这涉及到几何padding的生成和碰撞检测:
def generate_padding(self, obstacle):
"""
为障碍物生成安全区域padding
:param obstacle: 包含位置、速度、尺寸的障碍物对象
:return: 安全区域多边形
"""
# 根据相对速度动态调整padding大小
rel_speed = self.ego_speed - obstacle.speed
dynamic_factor = max(1.0, min(2.0, 1.0 + rel_speed * 0.1))
hard_padding = self.safety_params['hard_padding'] * dynamic_factor
soft_padding = self.safety_params['soft_padding'] * dynamic_factor
return [
(obstacle.x - hard_padding, obstacle.y - hard_padding),
(obstacle.x + hard_padding, obstacle.y - hard_padding),
(obstacle.x + hard_padding, obstacle.y + hard_padding),
(obstacle.x - hard_padding, obstacle.y + hard_padding)
]
def nudge_decision(self, obstacle):
"""
执行道内避障决策
:param obstacle: 需要避开的障碍物
:return: 横向偏移量
"""
padding = self.generate_padding(obstacle)
if self.check_collision(self.trajectory, padding):
# 计算最优偏移方向
left_space = self.lane_width/2 - obstacle.y
right_space = self.lane_width/2 + obstacle.y
nudge = left_space if left_space > right_space else -right_space
return nudge * 0.8 # 保留20%余量
return 0.0
避障策略优先级对照表:
| 策略类型 | 触发条件 | 横向偏移 | 速度调整 | 适用场景 |
|---|---|---|---|---|
| 硬制动(Hard Stop) | 碰撞时间<2s | 无 | 最大减速度 | 紧急情况 |
| 道内避障(Nudge) | 碰撞时间2-5s | 有限偏移 | 适度减速 | 静态障碍物 |
| 换道避障(Lane Change) | 前方持续阻塞 | 全车道切换 | 保持速度 | 低速跟车 |
| 借道避障(Cross Nudge) | 对向车道空闲 | 部分侵入邻道 | 适度减速 | 临时避让 |
4. 安全指标计算与MPI监控
Miles Per Intervention(MPI)是衡量自动驾驶系统可靠性的黄金指标。我们需要在代码中实现完整的干预记录和指标计算模块:
class SafetyMetrics:
def __init__(self):
self.total_miles = 0.0
self.interventions = {
'critical': 0,
'system_fault': 0,
'malfunction': 0,
'unsupported': 0,
'experience': 0,
'remote': 0
}
def record_intervention(self, intervention_type):
if intervention_type in self.interventions:
self.interventions[intervention_type] += 1
def calculate_metrics(self):
metrics = {}
total_interventions = sum(self.interventions.values())
metrics['MPI'] = self.total_miles / total_interventions if total_interventions > 0 else float('inf')
metrics['MPCI'] = (self.total_miles / self.interventions['critical']
if self.interventions['critical'] > 0 else float('inf'))
# 其他指标计算类似...
return metrics
def update_mileage(self, delta_miles):
"""
更新自动驾驶里程
:param delta_miles: 自上次更新后的行驶里程(英里)
"""
self.total_miles += delta_miles
关键安全指标解释:
- MPI (Miles Per Intervention) :反映系统整体可靠性,数值越高越好
- MPCI (Miles Per Critical Intervention) :专门针对可能引发事故的关键场景
- MPFI (Miles Per System Fault Intervention) :衡量硬件/软件系统稳定性
- MPEI (Miles Per Experience Intervention) :评估算法处理非常规场景的能力
5. 决策状态机与主控制循环
自动驾驶决策本质上是基于状态机的模式切换。下面展示简化版的纵向决策状态机实现:
class LongitudinalFSM:
STATES = ['CRUISE', 'FOLLOW', 'STOP', 'YIELD', 'OVERTAKE']
def __init__(self):
self.current_state = 'CRUISE'
self.transition_table = {
'CRUISE': self._check_cruise_transitions,
'FOLLOW': self._check_follow_transitions,
# 其他状态转换检查...
}
def update_state(self, perception_data):
transition_check = self.transition_table[self.current_state]
new_state = transition_check(perception_data)
if new_state != self.current_state:
print(f"状态转换: {self.current_state} → {new_state}")
self.current_state = new_state
def _check_cruise_transitions(self, data):
if data.front_vehicle and data.time_to_collision < 5.0:
return 'FOLLOW'
if data.traffic_light == 'RED' and data.distance_to_stopline < 50.0:
return 'STOP'
return 'CRUISE'
# 其他状态转换条件检查...
主控制循环将上述所有组件整合在一起,形成完整的决策流程:
def main_control_loop():
# 初始化所有模块
perception = PerceptionModule()
decision = DecisionModule()
metrics = SafetyMetrics()
fsm = LongitudinalFSM()
while True:
# 1. 获取环境感知数据
env_data = perception.get_environment_data()
# 2. 更新安全指标里程
metrics.update_mileage(env_data.delta_miles)
# 3. 处理轨迹并检查抖动
smoothed_traj = decision.smooth_trajectory(env_data.raw_trajectory)
# 4. 执行避障决策
for obstacle in env_data.obstacles:
nudge = decision.nudge_decision(obstacle)
if nudge != 0.0:
smoothed_traj = decision.apply_nudge(smoothed_traj, nudge)
# 5. 更新状态机
fsm.update_state(env_data)
# 6. 生成控制命令
control_cmd = generate_control_command(
fsm.current_state,
smoothed_traj,
env_data
)
# 7. 发送控制指令
send_to_actuators(control_cmd)
在实际项目中测试这段代码时,发现轨迹平滑算法的窗口大小需要根据车速动态调整——高速时需要更大的窗口来保证稳定性,而低速时则需要小窗口来保持灵活性。这提醒我们,自动驾驶算法中的参数很少是固定不变的,都需要根据实际场景动态优化。
更多推荐


所有评论(0)