用Python+NetworkX复现经典交通分配:手把手教你从零搭建Frank-Wolfe算法求解UE模型
用Python+NetworkX复现经典交通分配:手把手教你从零搭建Frank-Wolfe算法求解UE模型
交通网络分析是城市规划与智能交通系统的核心课题。当我们在导航软件中看到实时路况预测,或参与城市道路规划方案评估时,背后都离不开交通分配模型的支持。用户均衡(User Equilibrium)作为Wardrop第一原理的数学表达,描述了出行者在路径选择中追求个人最优的群体行为模式。本文将带您用Python的NetworkX库,从零实现Frank-Wolfe算法求解UE模型的全过程。
1. 理论基础与工具准备
1.1 Wardrop均衡原理的工程意义
1952年提出的Wardrop第一原理指出:在均衡状态下,所有被使用的路径具有相同且最小的行驶时间。这一原理看似简单,却揭示了交通流自组织行为的本质特征。例如:
- 早晚高峰时,驾驶员不断切换路线导致各替代路径时间趋于一致
- 导航App的实时推荐实际上在推动系统向均衡状态演化
- 新开通道路初期可能无法分流,直到部分驾驶员发现时间优势
1.2 Beckmann变换与BPR函数
Beckmann在1956年将Wardrop原理转化为数学规划问题:
minimize Z(x) = ∑∫₀ˣᵃ tₐ(w)dw
subject to:
∑ₖ fₖʳˢ = qʳˢ ∀(r,s)
fₖʳˢ ≥ 0
其中路阻函数常采用BPR(Bureau of Public Roads)公式:
def BPR(FFT, flow, capacity, alpha=0.15, beta=4.0):
return FFT * (1 + alpha * (flow / capacity) ** beta)
参数典型值:
| 参数 | 物理意义 | 典型值 |
|---|---|---|
| FFT | 自由流时间 | 路段属性 |
| α | 拥堵敏感度 | 0.15 |
| β | 非线性程度 | 4.0 |
1.3 开发环境配置
推荐使用Python 3.8+环境,主要依赖库:
pip install networkx pandas numpy matplotlib scipy
数据文件结构示例:
/SiouxFalls
├── Link.csv # 路段属性
├── Node.csv # 节点坐标
└── ODPairs.csv # OD需求矩阵
2. 网络构建与数据预处理
2.1 拓扑结构可视化
利用NetworkX构建有向图并可视化:
def build_network(link_path, node_path):
G = nx.from_pandas_edgelist(
pd.read_csv(link_path),
source='O', target='D',
edge_attr=['FFT', 'Capacity'],
create_using=nx.DiGraph()
)
# 设置节点位置属性
pos = {row['id']:(row['pos_x'], row['pos_y'])
for _,row in pd.read_csv(node_path).iterrows()}
nx.set_node_attributes(G, pos, 'pos')
return G
常见问题处理:
- 节点ID不连续时需重建索引
- 缺失值建议用相邻路段均值填充
- 通行能力为0的路段应移除
2.2 流量初始化技巧
全有全无分配法(All-or-Nothing)实现要点:
def all_none_initialize(G, od_df):
for _, (o, d, demand) in od_df.iterrows():
path = nx.shortest_path(G, o, d, weight='FFT') # 零流最短路
for u, v in zip(path[:-1], path[1:]):
G[u][v]['flow_real'] += demand # 流量累积
# 更新初始阻抗
for u, v, data in G.edges(data=True):
data['weight'] = BPR(data['FFT'], data['flow_real'], data['Capacity'])
注意:NetworkX的shortest_path默认使用Dijkstra算法,对于大型网络可考虑A*算法加速
3. Frank-Wolfe算法实现细节
3.1 算法流程分解
Frank-Wolfe算法迭代过程可分为四个关键步骤:
- 辅助流量分配 :在当前阻抗下执行全有全无分配
- 方向确定 :计算辅助流量与当前流量的差值向量
- 步长搜索 :沿下降方向进行一维线性搜索
- 流量更新 :合并当前流量与辅助流量
3.2 核心代码实现
辅助流量生成 :
def all_none_temp(G, od_df):
nx.set_edge_attributes(G, 0, 'flow_temp') # 重置临时流量
for o, d, demand in od_df.values:
path = nx.shortest_path(G, o, d, weight='weight')
for u, v in zip(path[:-1], path[1:]):
G[u][v]['flow_temp'] += demand
最优步长搜索 (使用SciPy优化器):
def get_best_step(G):
res = minimize_scalar(
lambda a: sum(
data['FFT'] * (x := data['flow_real'] + a*data['descent']) *
(1 + 0.15/(4+1)*(x/data['Capacity'])**4)
for *_, data in G.edges(data=True)
),
bounds=(0, 1),
method='bounded'
)
return res.x
3.3 收敛性优化技巧
实践中我们采用相对误差作为停止准则:
def compute_error(G):
flows = np.array(list(nx.get_edge_attributes(G, 'flow_real').values()))
new_flows = np.array(list(nx.get_edge_attributes(G, 'flow_temp').values()))
return np.linalg.norm(new_flows - flows) / (np.sum(flows) + 1e-6)
常见收敛问题解决方案:
- 震荡现象:引入Nesterov动量加速
- 早熟收敛:自适应调整步长下限
- 数值不稳定:对BPR函数进行平滑处理
4. 完整实现与结果分析
4.1 主程序架构
def main(max_iter=100, tol=1e-4):
G = build_network("Link.csv", "Node.csv")
od_df = pd.read_csv("ODPairs.csv")
# 初始化
all_none_initialize(G, od_df)
errors = []
# 迭代过程
for epoch in range(max_iter):
all_none_temp(G, od_df)
get_descent(G)
update_flow_real(G)
err = compute_error(G)
errors.append(err)
if err < tol:
break
# 结果输出
plot_convergence(errors)
save_results(G)
4.2 结果可视化方法
收敛曲线绘制:
def plot_convergence(errors):
plt.figure(figsize=(10, 5))
plt.semilogy(errors, marker='o', linestyle='--')
plt.xlabel('Iteration')
plt.ylabel('Relative Gap')
plt.grid(True)
流量热力图生成:
def plot_heatmap(G):
pos = nx.get_node_attributes(G, 'pos')
flows = [d['flow_real'] for *_, d in G.edges(data=True)]
nx.draw_networkx_edges(
G, pos,
width=[f/max(flows)*5 for f in flows],
edge_color=flows,
edge_cmap=plt.cm.Reds
)
4.3 性能优化建议
对于大规模网络(>1000节点)可考虑:
-
并行计算 :将OD对分配到不同进程处理
from multiprocessing import Pool def parallel_all_none(args): G, od_chunk = args local_G = G.copy() all_none_temp(local_G, od_chunk) return nx.get_edge_attributes(local_G, 'flow_temp') -
稀疏矩阵存储 :使用CSR格式存储大规模OD矩阵
-
增量更新 :仅对受影响子图重新计算最短路
5. 工程实践中的扩展应用
5.1 多类用户均衡
考虑不同车辆类型(如货车/客车)的阻抗差异:
class MultiClassUE:
def __init__(self, vehicle_classes):
self.classes = vehicle_classes # 各车型占比及BPR参数
def combined_BPR(self, flow):
return sum(
p * BPR(FFT, flow, cap, a, b)
for p, (FFT, cap, a, b) in zip(
self.classes['proportion'],
self.classes['params']
)
)
5.2 动态交通分配
引入时间维度后的实现要点:
- 将路网扩展为时空网络
- 阻抗函数加入排队延迟项
- 使用动态规划求解最优出发时间
5.3 与微观仿真结合
宏观均衡结果可作为微观仿真的输入:
def generate_od_matrix(G, interval=15):
return {
(o, d): G.nodes[o]['population'] * G.nodes[d]['attraction']
for o, d in product(G.nodes, repeat=2)
}
实际项目中,我们常先用UE模型进行快速方案评估,再通过微观仿真验证细节。这种组合方法在深圳某交通枢纽改造中,将方案评估周期从2周缩短到3天。
更多推荐
所有评论(0)