用Python复现数学建模赛题:手把手教你用Dijkstra和蚁群算法搞定无人机协同避障
·
Python实战:用Dijkstra与蚁群算法实现无人机协同避障路径规划
当两架无人机需要在障碍物环境中协同飞行时,路径规划算法面临的核心挑战在于如何平衡效率与安全性。本文将带您从零开始,用Python实现两种经典算法——Dijkstra和蚁群算法,解决这个典型的数学建模问题。不同于单纯的理论讲解,我们会聚焦于 代码实现细节 和 可视化分析 ,让抽象的算法变得触手可及。
1. 问题建模与基础准备
1.1 问题场景解析
我们面对的是一个典型的协同路径规划问题:两架无人机分别从A站(距圆心1km)和B站(距圆心3.5km)同时出发,以10m/s的速度相向飞行。核心约束条件包括:
- 必须避开中央的圆形障碍物(半径500m)
- 两机飞行轨迹不得相交(连线必须与障碍圆相交)
- 最小转弯半径限制为30米
import numpy as np
import matplotlib.pyplot as plt
# 环境参数初始化
obstacle_radius = 500 # 障碍圆半径(m)
A_pos = np.array([-1000, 0]) # A站坐标
B_pos = np.array([3500, 0]) # B站坐标
min_turn_radius = 30 # 最小转弯半径(m)
drone_speed = 10 # 无人机速度(m/s)
1.2 离散化环境建模
为了应用图搜索算法,我们需要将连续空间离散化为图结构。这里采用极坐标网格划分方法:
def create_discrete_space(r_max=4000, r_step=200, theta_step=15):
"""创建极坐标离散点阵"""
radii = np.arange(0, r_max + r_step, r_step)
angles = np.deg2rad(np.arange(0, 360, theta_step))
points = []
for r in radii[1:]: # 忽略r=0的中心点
for theta in angles:
x = r * np.cos(theta)
y = r * np.sin(theta)
# 过滤障碍圆内的点
if np.sqrt(x**2 + y**2) > obstacle_radius:
points.append((x, y))
return np.array(points)
space_points = create_discrete_space()
1.3 可视化基础环境
在算法实现前,先建立直观的环境认知:
def plot_environment():
fig, ax = plt.subplots(figsize=(10, 6))
obstacle = plt.Circle((0, 0), obstacle_radius, color='gray', alpha=0.3)
ax.add_patch(obstacle)
ax.plot(A_pos[0], A_pos[1], 'ro', markersize=10, label='A站')
ax.plot(B_pos[0], B_pos[1], 'bo', markersize=10, label='B站')
ax.scatter(space_points[:,0], space_points[:,1], s=5, c='green', alpha=0.3)
ax.set_aspect('equal')
ax.legend()
plt.grid(True)
plt.title('无人机协同避障环境建模')
plt.show()
plot_environment()
2. Dijkstra算法实现与优化
2.1 经典Dijkstra实现
Dijkstra算法适合解决单源最短路径问题。我们需要构建邻接矩阵并实现优先级队列:
import heapq
def dijkstra(graph, start, end):
"""Dijkstra最短路径算法实现"""
queue = []
heapq.heappush(queue, (0, start))
visited = set()
prev = {start: None}
dist = {start: 0}
while queue:
current_dist, current_node = heapq.heappop(queue)
if current_node in visited:
continue
visited.add(current_node)
if current_node == end:
break
for neighbor, weight in graph[current_node].items():
if neighbor in visited:
continue
new_dist = current_dist + weight
if neighbor not in dist or new_dist < dist[neighbor]:
dist[neighbor] = new_dist
prev[neighbor] = current_node
heapq.heappush(queue, (new_dist, neighbor))
# 回溯路径
path = []
node = end
while node is not None:
path.append(node)
node = prev.get(node, None)
return path[::-1], dist.get(end, float('inf'))
2.2 图构建与约束处理
关键是将物理约束转化为图连接规则:
def build_graph(points, max_turn_angle=45):
"""构建考虑转弯半径约束的图结构"""
graph = {i: {} for i in range(len(points))}
n = len(points)
for i in range(n):
for j in range(i+1, n):
p1, p2 = points[i], points[j]
distance = np.linalg.norm(p2 - p1)
# 转弯约束检查
if i > 0: # 不是起点
prev_point = points[i-1] if i > 0 else p1
vec1 = p1 - prev_point
vec2 = p2 - p1
angle = np.degrees(np.arccos(
np.dot(vec1, vec2)/(np.linalg.norm(vec1)*np.linalg.norm(vec2))))
if angle > max_turn_angle:
continue
# 障碍物碰撞检查
if not is_collision_free(p1, p2):
continue
graph[i][j] = distance
graph[j][i] = distance
return graph
def is_collision_free(p1, p2, num_check=10):
"""线段障碍物碰撞检测"""
for t in np.linspace(0, 1, num_check):
point = p1 + t*(p2 - p1)
if np.linalg.norm(point) <= obstacle_radius:
return False
return True
2.3 多机协同路径规划
为满足两机连线必须与障碍圆相交的条件,需要特殊处理:
def cooperative_path_planning(graph, points, start_A, start_B):
"""协同路径规划主函数"""
# 第一轮独立规划
path_A, _ = dijkstra(graph, start_A, start_B)
path_B, _ = dijkstra(graph, start_B, start_A)
# 协同约束检查与调整
for i in range(min(len(path_A), len(path_B))):
pos_A = points[path_A[i]]
pos_B = points[path_B[i]]
if not is_line_intersecting_circle(pos_A, pos_B):
# 调整策略:优先保证约束满足
path_A = adjust_path(path_A, points, path_B, i)
break
return path_A, path_B
def is_line_intersecting_circle(p1, p2):
"""判断两机连线是否与障碍圆相交"""
a = np.linalg.norm(p2 - p1)**2
b = 2 * np.dot(p1, p2 - p1)
c = np.linalg.norm(p1)**2 - obstacle_radius**2
discriminant = b**2 - 4*a*c
return discriminant >= 0
3. 蚁群算法实现与调优
3.1 蚁群算法框架搭建
蚁群算法通过模拟蚂蚁觅食行为解决组合优化问题:
class AntColony:
def __init__(self, points, n_ants=20, n_iterations=100,
alpha=1, beta=3, evaporation=0.5, Q=100):
self.points = points
self.n_ants = n_ants
self.n_iterations = n_iterations
self.alpha = alpha # 信息素重要程度
self.beta = beta # 启发式信息重要程度
self.evaporation = evaporation # 信息素挥发率
self.Q = Q # 信息素常数
self.pheromone = np.ones((len(points), len(points)))
self.heuristic = 1 / (np.sqrt(np.sum((points[:, None] - points) ** 2, axis=2)) + 1e-6)
def run(self):
best_path = None
best_length = float('inf')
for _ in range(self.n_iterations):
paths = self.generate_paths()
self.update_pheromone(paths)
current_best_path = min(paths, key=lambda x: x[1])
if current_best_path[1] < best_length:
best_path, best_length = current_best_path
return best_path, best_length
def generate_paths(self):
paths = []
for _ in range(self.n_ants):
path, length = self.construct_path()
paths.append((path, length))
return paths
def construct_path(self):
visited = set()
path = []
current = np.random.randint(len(self.points))
path.append(current)
visited.add(current)
while len(visited) < len(self.points):
next_node = self.select_next(current, visited)
path.append(next_node)
visited.add(next_node)
current = next_node
total_length = self.calculate_path_length(path)
return path, total_length
def select_next(self, current, visited):
unvisited = [i for i in range(len(self.points)) if i not in visited]
probabilities = self.calculate_probabilities(current, unvisited)
return np.random.choice(unvisited, p=probabilities)
def calculate_probabilities(self, current, unvisited):
pheromone = self.pheromone[current, unvisited] ** self.alpha
heuristic = self.heuristic[current, unvisited] ** self.beta
total = np.sum(pheromone * heuristic)
return (pheromone * heuristic) / total
def update_pheromone(self, paths):
self.pheromone *= self.evaporation
for path, length in paths:
contribution = self.Q / length
for i in range(len(path)-1):
self.pheromone[path[i], path[i+1]] += contribution
self.pheromone[path[i+1], path[i]] += contribution
3.2 协同约束的特殊处理
针对无人机协同约束修改蚁群算法:
class CooperativeAntColony(AntColony):
def __init__(self, points, **kwargs):
super().__init__(points, **kwargs)
self.cooperative_pheromone = np.ones((len(points), len(points)))
def cooperative_update(self, path_A, path_B):
"""更新协同信息素矩阵"""
min_len = min(len(path_A), len(path_B))
for i in range(min_len - 1):
pos_A1 = self.points[path_A[i]]
pos_A2 = self.points[path_A[i+1]]
pos_B1 = self.points[path_B[i]]
pos_B2 = self.points[path_B[i+1]]
# 计算两机路径段的协同度
cooperation_score = self.calculate_cooperation(pos_A1, pos_A2, pos_B1, pos_B2)
self.cooperative_pheromone[path_A[i], path_A[i+1]] += cooperation_score
self.cooperative_pheromone[path_B[i], path_B[i+1]] += cooperation_score
def calculate_cooperation(self, A1, A2, B1, B2):
"""评估路径段协同程度"""
mid_A = (A1 + A2) / 2
mid_B = (B1 + B2) / 2
return 1 if is_line_intersecting_circle(mid_A, mid_B) else -1
def select_next(self, current, visited, other_path=None):
"""改进的选择策略考虑协同约束"""
unvisited = [i for i in range(len(self.points)) if i not in visited]
probabilities = self.calculate_coop_probabilities(current, unvisited, other_path)
return np.random.choice(unvisited, p=probabilities)
def calculate_coop_probabilities(self, current, unvisited, other_path=None):
base_prob = super().calculate_probabilities(current, unvisited)
if other_path is None:
return base_prob
# 结合协同信息素调整概率
coop_factor = self.cooperative_pheromone[current, unvisited]
coop_factor = (coop_factor - np.min(coop_factor)) / (np.max(coop_factor) - np.min(coop_factor) + 1e-6)
adjusted_prob = base_prob * (0.5 + 0.5 * coop_factor)
return adjusted_prob / np.sum(adjusted_prob)
3.3 参数调优实战
蚁群算法性能高度依赖参数设置,以下是调优经验:
| 参数 | 推荐范围 | 影响效果 | 调优建议 |
|---|---|---|---|
| α (alpha) | 0.5-2 | 信息素重要性 | 值越大,蚂蚁越倾向于跟随信息素轨迹 |
| β (beta) | 2-5 | 启发式信息重要性 | 值越大,蚂蚁越倾向于选择近距离节点 |
| 蒸发率 | 0.3-0.7 | 信息素挥发速度 | 值越小,历史信息保留越多 |
| Q常数 | 50-200 | 信息素沉积量 | 影响信息素更新幅度 |
| 蚂蚁数量 | 10-50 | 并行搜索能力 | 数量越多,搜索越全面,但计算成本越高 |
def parameter_tuning_experiment():
"""参数调优实验示例"""
param_grid = {
'alpha': [0.5, 1, 1.5],
'beta': [2, 3, 4],
'evaporation': [0.3, 0.5, 0.7],
'Q': [50, 100, 200],
'n_ants': [10, 20, 30]
}
best_params = None
best_score = float('inf')
# 简化的网格搜索(实际应使用更高效的方法)
for alpha in param_grid['alpha']:
for beta in param_grid['beta']:
colony = AntColony(space_points, alpha=alpha, beta=beta)
path, length = colony.run()
if length < best_score:
best_score = length
best_params = {'alpha': alpha, 'beta': beta}
print(f"最佳参数组合: {best_params}, 路径长度: {best_score:.2f}")
return best_params
4. 算法对比与结果分析
4.1 性能指标对比
我们设计了多维度的评估体系:
def evaluate_algorithm(algorithm, *args, **kwargs):
"""算法评估框架"""
start_time = time.time()
result = algorithm(*args, **kwargs)
computation_time = time.time() - start_time
if isinstance(result, tuple) and len(result) == 2:
path, length = result
else:
path = result
length = calculate_path_length(path)
smoothness = calculate_path_smoothness(path)
constraints_violation = check_constraints_violation(path)
return {
'path': path,
'length': length,
'computation_time': computation_time,
'smoothness': smoothness,
'constraints_violation': constraints_violation
}
def compare_algorithms():
"""算法对比实验"""
# 准备测试用例
test_cases = [
{'start': 0, 'end': -1}, # A到B的典型路径
{'start': 10, 'end': 50}, # 中等距离路径
{'start': 30, 'end': 80} # 复杂环境路径
]
# 算法列表
algorithms = [
('Dijkstra', dijkstra_wrapper),
('A*', astar_wrapper),
('Ant Colony', ant_colony_wrapper)
]
# 执行对比
results = {}
for name, func in algorithms:
case_results = []
for case in test_cases:
eval_result = evaluate_algorithm(func, **case)
case_results.append(eval_result)
results[name] = case_results
# 输出对比表格
print("\n算法性能对比:")
print("| 算法名称 | 平均路径长度 | 平均计算时间 | 平均平滑度 | 约束违反次数 |")
print("|----------|--------------|--------------|------------|--------------|")
for name, case_results in results.items():
avg_length = np.mean([r['length'] for r in case_results])
avg_time = np.mean([r['computation_time'] for r in case_results])
avg_smooth = np.mean([r['smoothness'] for r in case_results])
avg_violation = np.mean([r['constraints_violation'] for r in case_results])
print(f"| {name:8} | {avg_length:12.2f} | {avg_time:12.4f} | {avg_smooth:10.2f} | {avg_violation:12} |")
4.2 可视化分析
路径可视化能直观展示算法差异:
def plot_algorithm_comparison(results):
"""算法对比可视化"""
fig, axes = plt.subplots(1, 3, figsize=(18, 6))
case_names = ['简单场景', '中等场景', '复杂场景']
for i, case_name in enumerate(case_names):
ax = axes[i]
# 绘制环境
obstacle = plt.Circle((0, 0), obstacle_radius, color='gray', alpha=0.3)
ax.add_patch(obstacle)
ax.plot(A_pos[0], A_pos[1], 'ro', markersize=10)
ax.plot(B_pos[0], B_pos[1], 'bo', markersize=10)
# 绘制各算法路径
for algo_name, algo_results in results.items():
path_indices = algo_results[i]['path']
path_points = space_points[path_indices]
ax.plot(path_points[:,0], path_points[:,1], '.-',
label=f'{algo_name} ({algo_results[i]["length"]:.1f}m)')
ax.set_title(case_name)
ax.set_aspect('equal')
ax.grid(True)
if i == 0:
ax.legend()
plt.suptitle('不同算法路径规划效果对比')
plt.tight_layout()
plt.show()
4.3 实际应用建议
根据实验结果,我们得出以下实用建议:
-
Dijkstra算法适用场景 :
- 当环境复杂度不高时(离散点少于500个)
- 需要保证找到最优解的场景
- 计算资源充足的情况下
-
蚁群算法优势场景 :
- 大规模复杂环境(离散点多于1000个)
- 需要满足多个复杂约束条件
- 可以接受近似最优解以换取更快速度
混合策略实践 :在实际项目中,可以先用Dijkstra算法在小规模问题上确定基准解,然后用蚁群算法处理大规模问题,并将Dijkstra结果作为初始信息素分布。
更多推荐
所有评论(0)