Python实战:用Dijkstra与蚁群算法实现无人机协同避障路径规划

当两架无人机需要在障碍物环境中协同飞行时,路径规划算法面临的核心挑战在于如何平衡效率与安全性。本文将带您从零开始,用Python实现两种经典算法——Dijkstra和蚁群算法,解决这个典型的数学建模问题。不同于单纯的理论讲解,我们会聚焦于 代码实现细节 可视化分析 ,让抽象的算法变得触手可及。

1. 问题建模与基础准备

1.1 问题场景解析

我们面对的是一个典型的协同路径规划问题:两架无人机分别从A站(距圆心1km)和B站(距圆心3.5km)同时出发,以10m/s的速度相向飞行。核心约束条件包括:

  • 必须避开中央的圆形障碍物(半径500m)
  • 两机飞行轨迹不得相交(连线必须与障碍圆相交)
  • 最小转弯半径限制为30米
import numpy as np
import matplotlib.pyplot as plt

# 环境参数初始化
obstacle_radius = 500  # 障碍圆半径(m)
A_pos = np.array([-1000, 0])  # A站坐标
B_pos = np.array([3500, 0])   # B站坐标
min_turn_radius = 30  # 最小转弯半径(m)
drone_speed = 10  # 无人机速度(m/s)

1.2 离散化环境建模

为了应用图搜索算法,我们需要将连续空间离散化为图结构。这里采用极坐标网格划分方法:

def create_discrete_space(r_max=4000, r_step=200, theta_step=15):
    """创建极坐标离散点阵"""
    radii = np.arange(0, r_max + r_step, r_step)
    angles = np.deg2rad(np.arange(0, 360, theta_step))
    
    points = []
    for r in radii[1:]:  # 忽略r=0的中心点
        for theta in angles:
            x = r * np.cos(theta)
            y = r * np.sin(theta)
            # 过滤障碍圆内的点
            if np.sqrt(x**2 + y**2) > obstacle_radius:
                points.append((x, y))
    return np.array(points)

space_points = create_discrete_space()

1.3 可视化基础环境

在算法实现前,先建立直观的环境认知:

def plot_environment():
    fig, ax = plt.subplots(figsize=(10, 6))
    obstacle = plt.Circle((0, 0), obstacle_radius, color='gray', alpha=0.3)
    ax.add_patch(obstacle)
    ax.plot(A_pos[0], A_pos[1], 'ro', markersize=10, label='A站')
    ax.plot(B_pos[0], B_pos[1], 'bo', markersize=10, label='B站')
    ax.scatter(space_points[:,0], space_points[:,1], s=5, c='green', alpha=0.3)
    ax.set_aspect('equal')
    ax.legend()
    plt.grid(True)
    plt.title('无人机协同避障环境建模')
    plt.show()

plot_environment()

2. Dijkstra算法实现与优化

2.1 经典Dijkstra实现

Dijkstra算法适合解决单源最短路径问题。我们需要构建邻接矩阵并实现优先级队列:

import heapq

def dijkstra(graph, start, end):
    """Dijkstra最短路径算法实现"""
    queue = []
    heapq.heappush(queue, (0, start))
    visited = set()
    prev = {start: None}
    dist = {start: 0}
    
    while queue:
        current_dist, current_node = heapq.heappop(queue)
        if current_node in visited:
            continue
        visited.add(current_node)
        
        if current_node == end:
            break
            
        for neighbor, weight in graph[current_node].items():
            if neighbor in visited:
                continue
            new_dist = current_dist + weight
            if neighbor not in dist or new_dist < dist[neighbor]:
                dist[neighbor] = new_dist
                prev[neighbor] = current_node
                heapq.heappush(queue, (new_dist, neighbor))
    
    # 回溯路径
    path = []
    node = end
    while node is not None:
        path.append(node)
        node = prev.get(node, None)
    return path[::-1], dist.get(end, float('inf'))

2.2 图构建与约束处理

关键是将物理约束转化为图连接规则:

def build_graph(points, max_turn_angle=45):
    """构建考虑转弯半径约束的图结构"""
    graph = {i: {} for i in range(len(points))}
    n = len(points)
    
    for i in range(n):
        for j in range(i+1, n):
            p1, p2 = points[i], points[j]
            distance = np.linalg.norm(p2 - p1)
            
            # 转弯约束检查
            if i > 0:  # 不是起点
                prev_point = points[i-1] if i > 0 else p1
                vec1 = p1 - prev_point
                vec2 = p2 - p1
                angle = np.degrees(np.arccos(
                    np.dot(vec1, vec2)/(np.linalg.norm(vec1)*np.linalg.norm(vec2))))
                if angle > max_turn_angle:
                    continue
            
            # 障碍物碰撞检查
            if not is_collision_free(p1, p2):
                continue
                
            graph[i][j] = distance
            graph[j][i] = distance
            
    return graph

def is_collision_free(p1, p2, num_check=10):
    """线段障碍物碰撞检测"""
    for t in np.linspace(0, 1, num_check):
        point = p1 + t*(p2 - p1)
        if np.linalg.norm(point) <= obstacle_radius:
            return False
    return True

2.3 多机协同路径规划

为满足两机连线必须与障碍圆相交的条件,需要特殊处理:

def cooperative_path_planning(graph, points, start_A, start_B):
    """协同路径规划主函数"""
    # 第一轮独立规划
    path_A, _ = dijkstra(graph, start_A, start_B)
    path_B, _ = dijkstra(graph, start_B, start_A)
    
    # 协同约束检查与调整
    for i in range(min(len(path_A), len(path_B))):
        pos_A = points[path_A[i]]
        pos_B = points[path_B[i]]
        if not is_line_intersecting_circle(pos_A, pos_B):
            # 调整策略:优先保证约束满足
            path_A = adjust_path(path_A, points, path_B, i)
            break
            
    return path_A, path_B

def is_line_intersecting_circle(p1, p2):
    """判断两机连线是否与障碍圆相交"""
    a = np.linalg.norm(p2 - p1)**2
    b = 2 * np.dot(p1, p2 - p1)
    c = np.linalg.norm(p1)**2 - obstacle_radius**2
    discriminant = b**2 - 4*a*c
    return discriminant >= 0

3. 蚁群算法实现与调优

3.1 蚁群算法框架搭建

蚁群算法通过模拟蚂蚁觅食行为解决组合优化问题:

class AntColony:
    def __init__(self, points, n_ants=20, n_iterations=100, 
                 alpha=1, beta=3, evaporation=0.5, Q=100):
        self.points = points
        self.n_ants = n_ants
        self.n_iterations = n_iterations
        self.alpha = alpha  # 信息素重要程度
        self.beta = beta    # 启发式信息重要程度
        self.evaporation = evaporation  # 信息素挥发率
        self.Q = Q          # 信息素常数
        self.pheromone = np.ones((len(points), len(points)))
        self.heuristic = 1 / (np.sqrt(np.sum((points[:, None] - points) ** 2, axis=2)) + 1e-6)
        
    def run(self):
        best_path = None
        best_length = float('inf')
        
        for _ in range(self.n_iterations):
            paths = self.generate_paths()
            self.update_pheromone(paths)
            
            current_best_path = min(paths, key=lambda x: x[1])
            if current_best_path[1] < best_length:
                best_path, best_length = current_best_path
                
        return best_path, best_length
    
    def generate_paths(self):
        paths = []
        for _ in range(self.n_ants):
            path, length = self.construct_path()
            paths.append((path, length))
        return paths
    
    def construct_path(self):
        visited = set()
        path = []
        current = np.random.randint(len(self.points))
        path.append(current)
        visited.add(current)
        
        while len(visited) < len(self.points):
            next_node = self.select_next(current, visited)
            path.append(next_node)
            visited.add(next_node)
            current = next_node
            
        total_length = self.calculate_path_length(path)
        return path, total_length
    
    def select_next(self, current, visited):
        unvisited = [i for i in range(len(self.points)) if i not in visited]
        probabilities = self.calculate_probabilities(current, unvisited)
        return np.random.choice(unvisited, p=probabilities)
    
    def calculate_probabilities(self, current, unvisited):
        pheromone = self.pheromone[current, unvisited] ** self.alpha
        heuristic = self.heuristic[current, unvisited] ** self.beta
        total = np.sum(pheromone * heuristic)
        return (pheromone * heuristic) / total
    
    def update_pheromone(self, paths):
        self.pheromone *= self.evaporation
        for path, length in paths:
            contribution = self.Q / length
            for i in range(len(path)-1):
                self.pheromone[path[i], path[i+1]] += contribution
                self.pheromone[path[i+1], path[i]] += contribution

3.2 协同约束的特殊处理

针对无人机协同约束修改蚁群算法:

class CooperativeAntColony(AntColony):
    def __init__(self, points, **kwargs):
        super().__init__(points, **kwargs)
        self.cooperative_pheromone = np.ones((len(points), len(points)))
        
    def cooperative_update(self, path_A, path_B):
        """更新协同信息素矩阵"""
        min_len = min(len(path_A), len(path_B))
        for i in range(min_len - 1):
            pos_A1 = self.points[path_A[i]]
            pos_A2 = self.points[path_A[i+1]]
            pos_B1 = self.points[path_B[i]]
            pos_B2 = self.points[path_B[i+1]]
            
            # 计算两机路径段的协同度
            cooperation_score = self.calculate_cooperation(pos_A1, pos_A2, pos_B1, pos_B2)
            self.cooperative_pheromone[path_A[i], path_A[i+1]] += cooperation_score
            self.cooperative_pheromone[path_B[i], path_B[i+1]] += cooperation_score
            
    def calculate_cooperation(self, A1, A2, B1, B2):
        """评估路径段协同程度"""
        mid_A = (A1 + A2) / 2
        mid_B = (B1 + B2) / 2
        return 1 if is_line_intersecting_circle(mid_A, mid_B) else -1
    
    def select_next(self, current, visited, other_path=None):
        """改进的选择策略考虑协同约束"""
        unvisited = [i for i in range(len(self.points)) if i not in visited]
        probabilities = self.calculate_coop_probabilities(current, unvisited, other_path)
        return np.random.choice(unvisited, p=probabilities)
    
    def calculate_coop_probabilities(self, current, unvisited, other_path=None):
        base_prob = super().calculate_probabilities(current, unvisited)
        if other_path is None:
            return base_prob
            
        # 结合协同信息素调整概率
        coop_factor = self.cooperative_pheromone[current, unvisited]
        coop_factor = (coop_factor - np.min(coop_factor)) / (np.max(coop_factor) - np.min(coop_factor) + 1e-6)
        adjusted_prob = base_prob * (0.5 + 0.5 * coop_factor)
        return adjusted_prob / np.sum(adjusted_prob)

3.3 参数调优实战

蚁群算法性能高度依赖参数设置,以下是调优经验:

参数 推荐范围 影响效果 调优建议
α (alpha) 0.5-2 信息素重要性 值越大,蚂蚁越倾向于跟随信息素轨迹
β (beta) 2-5 启发式信息重要性 值越大,蚂蚁越倾向于选择近距离节点
蒸发率 0.3-0.7 信息素挥发速度 值越小,历史信息保留越多
Q常数 50-200 信息素沉积量 影响信息素更新幅度
蚂蚁数量 10-50 并行搜索能力 数量越多,搜索越全面,但计算成本越高
def parameter_tuning_experiment():
    """参数调优实验示例"""
    param_grid = {
        'alpha': [0.5, 1, 1.5],
        'beta': [2, 3, 4],
        'evaporation': [0.3, 0.5, 0.7],
        'Q': [50, 100, 200],
        'n_ants': [10, 20, 30]
    }
    
    best_params = None
    best_score = float('inf')
    
    # 简化的网格搜索(实际应使用更高效的方法)
    for alpha in param_grid['alpha']:
        for beta in param_grid['beta']:
            colony = AntColony(space_points, alpha=alpha, beta=beta)
            path, length = colony.run()
            if length < best_score:
                best_score = length
                best_params = {'alpha': alpha, 'beta': beta}
    
    print(f"最佳参数组合: {best_params}, 路径长度: {best_score:.2f}")
    return best_params

4. 算法对比与结果分析

4.1 性能指标对比

我们设计了多维度的评估体系:

def evaluate_algorithm(algorithm, *args, **kwargs):
    """算法评估框架"""
    start_time = time.time()
    result = algorithm(*args, **kwargs)
    computation_time = time.time() - start_time
    
    if isinstance(result, tuple) and len(result) == 2:
        path, length = result
    else:
        path = result
        length = calculate_path_length(path)
    
    smoothness = calculate_path_smoothness(path)
    constraints_violation = check_constraints_violation(path)
    
    return {
        'path': path,
        'length': length,
        'computation_time': computation_time,
        'smoothness': smoothness,
        'constraints_violation': constraints_violation
    }

def compare_algorithms():
    """算法对比实验"""
    # 准备测试用例
    test_cases = [
        {'start': 0, 'end': -1},  # A到B的典型路径
        {'start': 10, 'end': 50},  # 中等距离路径
        {'start': 30, 'end': 80}   # 复杂环境路径
    ]
    
    # 算法列表
    algorithms = [
        ('Dijkstra', dijkstra_wrapper),
        ('A*', astar_wrapper),
        ('Ant Colony', ant_colony_wrapper)
    ]
    
    # 执行对比
    results = {}
    for name, func in algorithms:
        case_results = []
        for case in test_cases:
            eval_result = evaluate_algorithm(func, **case)
            case_results.append(eval_result)
        results[name] = case_results
    
    # 输出对比表格
    print("\n算法性能对比:")
    print("| 算法名称 | 平均路径长度 | 平均计算时间 | 平均平滑度 | 约束违反次数 |")
    print("|----------|--------------|--------------|------------|--------------|")
    for name, case_results in results.items():
        avg_length = np.mean([r['length'] for r in case_results])
        avg_time = np.mean([r['computation_time'] for r in case_results])
        avg_smooth = np.mean([r['smoothness'] for r in case_results])
        avg_violation = np.mean([r['constraints_violation'] for r in case_results])
        print(f"| {name:8} | {avg_length:12.2f} | {avg_time:12.4f} | {avg_smooth:10.2f} | {avg_violation:12} |")

4.2 可视化分析

路径可视化能直观展示算法差异:

def plot_algorithm_comparison(results):
    """算法对比可视化"""
    fig, axes = plt.subplots(1, 3, figsize=(18, 6))
    case_names = ['简单场景', '中等场景', '复杂场景']
    
    for i, case_name in enumerate(case_names):
        ax = axes[i]
        # 绘制环境
        obstacle = plt.Circle((0, 0), obstacle_radius, color='gray', alpha=0.3)
        ax.add_patch(obstacle)
        ax.plot(A_pos[0], A_pos[1], 'ro', markersize=10)
        ax.plot(B_pos[0], B_pos[1], 'bo', markersize=10)
        
        # 绘制各算法路径
        for algo_name, algo_results in results.items():
            path_indices = algo_results[i]['path']
            path_points = space_points[path_indices]
            ax.plot(path_points[:,0], path_points[:,1], '.-', 
                   label=f'{algo_name} ({algo_results[i]["length"]:.1f}m)')
        
        ax.set_title(case_name)
        ax.set_aspect('equal')
        ax.grid(True)
        if i == 0:
            ax.legend()
    
    plt.suptitle('不同算法路径规划效果对比')
    plt.tight_layout()
    plt.show()

4.3 实际应用建议

根据实验结果,我们得出以下实用建议:

  • Dijkstra算法适用场景

    • 当环境复杂度不高时(离散点少于500个)
    • 需要保证找到最优解的场景
    • 计算资源充足的情况下
  • 蚁群算法优势场景

    • 大规模复杂环境(离散点多于1000个)
    • 需要满足多个复杂约束条件
    • 可以接受近似最优解以换取更快速度

混合策略实践 :在实际项目中,可以先用Dijkstra算法在小规模问题上确定基准解,然后用蚁群算法处理大规模问题,并将Dijkstra结果作为初始信息素分布。

更多推荐