从轨迹抖动到安全指标:手把手拆解一个自动驾驶决策模块的代码实现

自动驾驶技术正在重塑未来交通的图景,而决策模块作为系统的"大脑",直接决定了车辆如何在复杂环境中做出安全、舒适的行驶选择。本文将带您深入一个简化版的纵向决策模块实现,通过Python伪代码演示轨迹处理、避障逻辑与安全指标计算的全过程。无论您是希望理解自动驾驶底层逻辑的开发者,还是正在探索智能交通领域的学生,都能从这段代码之旅中获得实操性启发。

1. 自动驾驶决策模块的核心架构

自动驾驶决策模块通常由环境感知、行为预测、轨迹规划和车辆控制等子系统构成。纵向决策专注于车辆前进方向的速度与距离控制,需要实时处理以下关键数据流:

  • 轨迹信息 :未来5-10秒的预测路径,包含位置、速度和方向
  • 障碍物映射 :通过传感器识别周围车辆、行人等动态物体
  • 道路拓扑 :车道线、交通标志等静态环境特征
  • 安全参数 :与障碍物保持的最小距离阈值
class DecisionModule:
    def __init__(self):
        self.trajectory = []  # 存储规划轨迹
        self.obstacles = []   # 障碍物列表
        self.safety_params = {
            'hard_padding': 1.5,  # 安全距离(m)
            'soft_padding': 3.0   # 舒适距离(m)
        }

2. 轨迹处理与抖动抑制

轨迹抖动(Trajectory Flicker)是指规划路径在连续时间步中出现不合理的突变现象。这种现象可能由传感器噪声、算法不稳定或环境突变引起。我们的代码需要实现轨迹平滑和合理性校验:

def smooth_trajectory(self, raw_trajectory):
    """
    使用滑动窗口平均滤波处理原始轨迹
    :param raw_trajectory: 原始轨迹点列表
    :return: 平滑后的轨迹
    """
    window_size = 5
    smoothed = []
    for i in range(len(raw_trajectory)):
        start = max(0, i - window_size//2)
        end = min(len(raw_trajectory), i + window_size//2 + 1)
        window = raw_trajectory[start:end]
        avg_x = sum(p[0] for p in window) / len(window)
        avg_y = sum(p[1] for p in window) / len(window)
        smoothed.append((avg_x, avg_y))
    
    # 曲率连续性检查
    for i in range(1, len(smoothed)-1):
        pinch = self.calculate_pinch(smoothed[i-1], smoothed[i], smoothed[i+1])
        if abs(pinch) > 0.15:  # 曲率变化阈值
            return self.adjust_trajectory(smoothed, i)
    
    return smoothed

轨迹质量评估指标:

指标名称 计算公式 允许阈值 物理意义
曲率(Pinch) Δk/Δs <0.15 轨迹弯曲程度变化率
急变度(Juke) Δ(Pinch)/Δs <0.05 曲率变化的加速度
横向加速度 v²×k <2.5 m/s² 乘客舒适度影响

3. 避障决策与Nudge策略实现

当检测到前方障碍物时,系统需要在不换道的情况下进行道内避障(Nudge)。这涉及到几何padding的生成和碰撞检测:

def generate_padding(self, obstacle):
    """
    为障碍物生成安全区域padding
    :param obstacle: 包含位置、速度、尺寸的障碍物对象
    :return: 安全区域多边形
    """
    # 根据相对速度动态调整padding大小
    rel_speed = self.ego_speed - obstacle.speed
    dynamic_factor = max(1.0, min(2.0, 1.0 + rel_speed * 0.1))
    
    hard_padding = self.safety_params['hard_padding'] * dynamic_factor
    soft_padding = self.safety_params['soft_padding'] * dynamic_factor
    
    return [
        (obstacle.x - hard_padding, obstacle.y - hard_padding),
        (obstacle.x + hard_padding, obstacle.y - hard_padding),
        (obstacle.x + hard_padding, obstacle.y + hard_padding),
        (obstacle.x - hard_padding, obstacle.y + hard_padding)
    ]

def nudge_decision(self, obstacle):
    """
    执行道内避障决策
    :param obstacle: 需要避开的障碍物
    :return: 横向偏移量
    """
    padding = self.generate_padding(obstacle)
    if self.check_collision(self.trajectory, padding):
        # 计算最优偏移方向
        left_space = self.lane_width/2 - obstacle.y
        right_space = self.lane_width/2 + obstacle.y
        nudge = left_space if left_space > right_space else -right_space
        return nudge * 0.8  # 保留20%余量
    return 0.0

避障策略优先级对照表:

策略类型 触发条件 横向偏移 速度调整 适用场景
硬制动(Hard Stop) 碰撞时间<2s 最大减速度 紧急情况
道内避障(Nudge) 碰撞时间2-5s 有限偏移 适度减速 静态障碍物
换道避障(Lane Change) 前方持续阻塞 全车道切换 保持速度 低速跟车
借道避障(Cross Nudge) 对向车道空闲 部分侵入邻道 适度减速 临时避让

4. 安全指标计算与MPI监控

Miles Per Intervention(MPI)是衡量自动驾驶系统可靠性的黄金指标。我们需要在代码中实现完整的干预记录和指标计算模块:

class SafetyMetrics:
    def __init__(self):
        self.total_miles = 0.0
        self.interventions = {
            'critical': 0,
            'system_fault': 0,
            'malfunction': 0,
            'unsupported': 0,
            'experience': 0,
            'remote': 0
        }
    
    def record_intervention(self, intervention_type):
        if intervention_type in self.interventions:
            self.interventions[intervention_type] += 1
    
    def calculate_metrics(self):
        metrics = {}
        total_interventions = sum(self.interventions.values())
        
        metrics['MPI'] = self.total_miles / total_interventions if total_interventions > 0 else float('inf')
        metrics['MPCI'] = (self.total_miles / self.interventions['critical'] 
                          if self.interventions['critical'] > 0 else float('inf'))
        # 其他指标计算类似...
        
        return metrics

    def update_mileage(self, delta_miles):
        """
        更新自动驾驶里程
        :param delta_miles: 自上次更新后的行驶里程(英里)
        """
        self.total_miles += delta_miles

关键安全指标解释:

  • MPI (Miles Per Intervention) :反映系统整体可靠性,数值越高越好
  • MPCI (Miles Per Critical Intervention) :专门针对可能引发事故的关键场景
  • MPFI (Miles Per System Fault Intervention) :衡量硬件/软件系统稳定性
  • MPEI (Miles Per Experience Intervention) :评估算法处理非常规场景的能力

5. 决策状态机与主控制循环

自动驾驶决策本质上是基于状态机的模式切换。下面展示简化版的纵向决策状态机实现:

class LongitudinalFSM:
    STATES = ['CRUISE', 'FOLLOW', 'STOP', 'YIELD', 'OVERTAKE']
    
    def __init__(self):
        self.current_state = 'CRUISE'
        self.transition_table = {
            'CRUISE': self._check_cruise_transitions,
            'FOLLOW': self._check_follow_transitions,
            # 其他状态转换检查...
        }
    
    def update_state(self, perception_data):
        transition_check = self.transition_table[self.current_state]
        new_state = transition_check(perception_data)
        if new_state != self.current_state:
            print(f"状态转换: {self.current_state} → {new_state}")
            self.current_state = new_state
    
    def _check_cruise_transitions(self, data):
        if data.front_vehicle and data.time_to_collision < 5.0:
            return 'FOLLOW'
        if data.traffic_light == 'RED' and data.distance_to_stopline < 50.0:
            return 'STOP'
        return 'CRUISE'
    
    # 其他状态转换条件检查...

主控制循环将上述所有组件整合在一起,形成完整的决策流程:

def main_control_loop():
    # 初始化所有模块
    perception = PerceptionModule()
    decision = DecisionModule()
    metrics = SafetyMetrics()
    fsm = LongitudinalFSM()
    
    while True:
        # 1. 获取环境感知数据
        env_data = perception.get_environment_data()
        
        # 2. 更新安全指标里程
        metrics.update_mileage(env_data.delta_miles)
        
        # 3. 处理轨迹并检查抖动
        smoothed_traj = decision.smooth_trajectory(env_data.raw_trajectory)
        
        # 4. 执行避障决策
        for obstacle in env_data.obstacles:
            nudge = decision.nudge_decision(obstacle)
            if nudge != 0.0:
                smoothed_traj = decision.apply_nudge(smoothed_traj, nudge)
        
        # 5. 更新状态机
        fsm.update_state(env_data)
        
        # 6. 生成控制命令
        control_cmd = generate_control_command(
            fsm.current_state, 
            smoothed_traj,
            env_data
        )
        
        # 7. 发送控制指令
        send_to_actuators(control_cmd)

在实际项目中测试这段代码时,发现轨迹平滑算法的窗口大小需要根据车速动态调整——高速时需要更大的窗口来保证稳定性,而低速时则需要小窗口来保持灵活性。这提醒我们,自动驾驶算法中的参数很少是固定不变的,都需要根据实际场景动态优化。

更多推荐