基于 Python 的公交 JT/T808 车载仿真测试平台搭建
前言
公交调度系统标准链路为「车载终端 TCP 通信上报数据 → 接入网关解析处理 → Kafka 消息队列分发 → 调度业务服务消费计算」,车辆定位、车况、报警信息均依托 JT/T808 协议传输。
企业配套独立 TMP 管理系统,统一维护车载终端设备档案、场站 GPS 点位、全线路基础规划数据,所有车载终端上线接入网关前,必须经过 TMP 系统完成设备合法性校验,未备案终端会被网关直接拦截。
线下实车上路测试成本高,隧道信号丢失、批量车辆故障、大量终端并发等高压力、异常场景很难复现,人工回归效率低下。本文基于 Python 技术栈搭建轻量化车载仿真测试平台,完整模拟真实车载终端上报 JT/T808 协议数据,兼容 TMP 设备校验流程,支持批量虚拟车辆并发、各类线上异常场景注入,覆盖终端准入校验、实时监控、大站准点考核、超速预警、智能排班、调度指令下发全业务验证场景。
一、仿真平台整体分层架构

平台核心 Python 技术栈
1)网络通信:asyncio 异步协程 Socket,单机器稳定支撑 3000~10000 台虚拟车载终端 TCP 长连接;
2)协议封装:自研 JT/T808 编解码工具类,统一完成报文组包、转义、校验、字节序转换、拆粘包处理;
3)TMP 接口模块:封装 HTTP 请求类,调用 TMP 系统接口完成虚拟车辆设备备案、线路、场站数据拉取、接入权限校验;
4)线路轨迹引擎:从 TMP 同步线路站点、场站 GPS 坐标,自动生成双向直线、单向环线、双向环线、Y 型分支完整行驶轨迹;支持人车固定绑定、人车轮班两种运营模式模拟;
5)自动化框架:Pytest + Allure,参数化管理仿真场景,自动生成可视化测试报告;
6)异常注入模块:可控模拟 GPS 间断丢失、持续超速、长期离线、批量车辆故障、网络抖动等线上问题;
7)指令闭环校验:监听网关下发 JT/T808 下行指令,解码后自动回执应答报文,校验调度指令下发一致性、无丢包。
核心代码片段 1:JT/T808 基础校验码、报文转义工具类
class JT808Util:
# JT/T808 报文起始、结束标识
HEAD = 0x7E
# CRC16校验表,简化实现
CRC_TABLE = [0] * 256
@staticmethod
def calc_crc16(data: bytes) -> int:
"""计算JT/T808 CRC16校验码"""
crc = 0xFFFF
for b in data:
crc ^= b
for _ in range(8):
if crc & 0x0001:
crc = (crc >> 1) ^ 0xA001
else:
crc >>= 1
return crc
@staticmethod
def escape_data(raw: bytes) -> bytes:
"""报文转义:0x7E -> 0x7D 0x02; 0x7D -> 0x7D 0x01"""
res = bytearray()
for b in raw:
if b == 0x7E:
res.extend([0x7D, 0x02])
elif b == 0x7D:
res.extend([0x7D, 0x01])
else:
res.append(b)
return bytes(res)
@staticmethod
def unescape_data(raw: bytes) -> bytes:
"""报文反转义"""
res = bytearray()
idx = 0
while idx < len(raw):
if raw[idx] == 0x7D:
idx += 1
if raw[idx] == 0x02:
res.append(0x7E)
elif raw[idx] == 0x01:
res.append(0x7D)
else:
res.append(raw[idx])
idx += 1
return bytes(res)
核心代码片段 2:TMP 系统 HTTP 接口封装(设备备案、权限校验)
import aiohttp
class TMPApiClient:
def __init__(self, base_url: str, token: str):
self.base_url = base_url
self.headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
async def create_virtual_device(self, sim_no: str, line_id: int, station_id: int) -> dict:
"""TMP新增虚拟车载终端备案"""
payload = {
"sim_no": sim_no,
"line_id": line_id,
"station_id": station_id,
"device_status": "normal"
}
async with aiohttp.ClientSession() as session:
async with session.post(f"{self.base_url}/device/add", json=payload, headers=self.headers) as resp:
return await resp.json()
async def check_device_auth(self, sim_no: str) -> bool:
"""校验设备是否备案、允许接入网关"""
async with aiohttp.ClientSession() as session:
async with session.get(f"{self.base_url}/device/auth?simNo={sim_no}", headers=self.headers) as resp:
data = await resp.json()
return data.get("data", {}).get("auth_pass", False)
async def get_line_gps(self, line_id: int) -> list:
"""拉取线路全部站点GPS坐标,用于轨迹生成"""
async with aiohttp.ClientSession() as session:
async with session.get(f"{self.base_url}/line/gps?lineId={line_id}", headers=self.headers) as resp:
res = await resp.json()
return res.get("data", [])
二、虚拟车辆仿真完整执行流程

核心代码片段 3:单台虚拟车辆异步 TCP 协程(上报定位 + 监听下行指令)
import asyncio
from JT808Util import JT808Util
class VirtualVehicle:
def __init__(self, sim_no: str, line_points: list, tmp_client: TMPApiClient, gateway_host: str, gateway_port: int):
self.sim_no = sim_no
self.line_points = line_points
self.tmp_client = tmp_client
self.gateway_host = gateway_host
self.gateway_port = gateway_port
self.reader: asyncio.StreamReader = None
self.writer: asyncio.StreamWriter = None
self.point_idx = 0
self.inject_error = False # 异常注入开关
async def connect_gateway(self):
"""建立TCP长连接,发送JT/T808注册包"""
self.reader, self.writer = await asyncio.open_connection(self.gateway_host, self.gateway_port)
reg_packet = self.build_register_packet()
self.writer.write(reg_packet)
await self.writer.drain()
def build_register_packet(self) -> bytes:
"""构造JT/T808 0x0100终端注册报文"""
msg_body = self.sim_no.encode("gbk")
crc = JT808Util.crc16(msg_body)
raw = msg_body + crc.to_bytes(2, byteorder="little")
escape_body = JT808Util.escape_data(raw)
full = bytes([JT808Util.HEAD]) + escape_body + bytes([JT808Util.HEAD])
return full
def build_gps_packet(self, lon: float, lat: float, speed: int) -> bytes:
"""构造0x0200位置上报报文"""
body = f"{lon},{lat},{speed}".encode("utf8")
crc = JT808Util.crc16(body)
raw = body + crc.to_bytes(2, byteorder="little")
escape_body = JT808Util.escape_data(raw)
return bytes([JT808Util.HEAD]) + escape_body + bytes([JT808Util.HEAD])
async def send_gps_loop(self):
"""循环上报GPS轨迹,支持异常注入"""
while True:
if self.inject_error:
# 异常:停止上报模拟GPS丢失
await asyncio.sleep(5)
continue
point = self.line_points[self.point_idx]
lon, lat = point["lon"], point["lat"]
speed = 40 if self.point_idx % 10 != 0 else 65 # 随机超速
gps_pkt = self.build_gps_packet(lon, lat, speed)
self.writer.write(gps_pkt)
await self.writer.drain()
self.point_idx = (self.point_idx + 1) % len(self.line_points)
await asyncio.sleep(3)
async def listen_down_command(self):
"""监听网关下行JT/T808调度指令,自动回执应答"""
while True:
data = await self.reader.read(1024)
if not data:
break
unescape = JT808Util.unescape_data(data.strip(bytes([0x7E])))
# 构造通用应答 0x0001
resp = self.build_ack_packet()
self.writer.write(resp)
await self.writer.drain()
async def run(self):
"""车辆协程主入口"""
await self.connect_gateway()
task1 = asyncio.create_task(self.send_gps_loop())
task2 = asyncio.create_task(self.listen_down_command())
await asyncio.gather(task1, task2)
核心代码片段 4:并发车辆协程管理器(批量创建虚拟车)
class VehiclePool:
def __init__(self, gateway_host: str, gateway_port: int, tmp_client: TMPApiClient):
self.gateway_host = gateway_host
self.gateway_port = gateway_port
self.tmp_client = tmp_client
self.vehicles = []
async def batch_create_vehicle(self, count: int, line_id: int):
"""批量创建N台虚拟车辆协程"""
line_points = await self.tmp_client.get_line_gps(line_id)
tasks = []
for i in range(count):
sim = f"138{i:08d}"
v = VirtualVehicle(sim, line_points, self.tmp_client, self.gateway_host, self.gateway_port)
self.vehicles.append(v)
tasks.append(asyncio.create_task(v.run()))
await asyncio.gather(*tasks)
def inject_global_error(self, enable: bool):
"""全局开启所有车辆异常注入(GPS丢失/超速)"""
for v in self.vehicles:
v.inject_error = enable
核心代码片段 5:Pytest 自动化测试用例示例
import pytest
from VehiclePool import VehiclePool
from TMPApiClient import TMPApiClient
# 全局测试配置
GATEWAY_IP = "127.0.0.1"
GATEWAY_PORT = 8088
TMP_URL = "http://127.0.0.1:9000/api"
TMP_TOKEN = "test-token-xxx"
@pytest.fixture(scope="module")
async def tmp_client():
client = TMPApiClient(TMP_URL, TMP_TOKEN)
yield client
@pytest.mark.allure.feature("终端准入校验")
@pytest.mark.asyncio
async def test_unauth_device_block(tmp_client):
"""测试未在TMP备案设备被网关拦截"""
pool = VehiclePool(GATEWAY_IP, GATEWAY_PORT, tmp_client)
# 传入未备案SIM卡
sim = "13899999999"
auth = await tmp_client.check_device_auth(sim)
assert auth is False, "未备案设备校验应返回不通过"
@pytest.mark.allure.feature("早高峰并发压测")
@pytest.mark.asyncio
async def test_morning_peak_5000_car(tmp_client):
"""模拟5000台车辆早高峰并发上报"""
pool = VehiclePool(GATEWAY_IP, GATEWAY_PORT, tmp_client)
# 批量创建5000台绑定1号线车辆
await pool.batch_create_vehicle(count=5000, line_id=1)
# 持续上报3分钟,消费Kafka校验准点、预警数据
await asyncio.sleep(180)
三、平台核心仿真业务能力(贴合公交数字化真实场景)
1. 联动 TMP 系统完成终端准入全流程校验
仿真平台对接 TMP 系统全套管理接口,覆盖车载终端接入前置校验全场景:
- 批量创建虚拟车载设备档案,录入 TMP 系统备案;
- 绑定车辆归属线路、场站 GPS 点位,同步线路规划基础数据;
- 负向场景校验:未录入 TMP 的设备、已注销终端、归属线路不匹配车辆,验证网关直接拦截注册报文,无法正常上报数据;
- 支持切换 TMP 内线路、场站信息,快速验证线路规划变更后车辆轨迹上报、准点计算逻辑是否同步生效。
2. 全类型公交线路轨迹仿真
所有行驶坐标、场站点位数据均从 TMP 系统同步,支持行业全部主流线路拓扑自动生成车辆行驶坐标:双向直线、单向直线、单向环线、双向环线、Y 字分支线路;
兼容两种人车运营模式:人车一对一固定绑定、人车轮班轮换,可模拟中途司机换车、车辆轮换运营场景,覆盖智能排班全量校验逻辑。
3. 万级终端并发压测,复现早晚高峰流量瓶颈
基于 asyncio 协程实现轻量并发,无多进程资源开销,单机可模拟上万台车辆同时 TCP 长连接上报定位数据;
可复现早高峰海量时序数据涌入场景,用于验证接入网关粘包拆包处理能力、Kafka 消息削峰能力、实时缓存读写性能、准点分片计算接口延迟问题;同时批量校验万级设备在 TMP 中的备案状态,验证网关批量准入处理性能。
4. 线上高频异常场景一键注入,解决线下难以复现痛点
- 隧道、高架路段 GPS 间歇性断连,停止上报定位报文,校验系统离线容错机制,过滤无效离线预警弹窗;
- 持续超速行驶数据上报,校验大屏超速预警触发逻辑、违规记录入库准确性;
- 批量车辆故障下线,自动停止终端上报,验证系统自动启用备用车辆、排班重排逻辑;
- 道路拥堵批量晚点,车辆到站时间持续偏移,校验大站准点晚点预警、动态增发区间车流程;
- 网络抖动、TCP 报文丢包、下行指令下发失败,验证 Kafka 消息持久化、失败重试机制。
5. 调度指令全闭环校验
调度平台下发临时调班、增发区间车、限速提醒等指令后,网关会封装为 JT/T808 下行指令推送至仿真终端;
Python 脚本自动解析指令内容、返回标准应答报文,同时消费 Kafka 业务数据校验:指令状态更新、车载排班同步、后续班次调整数据完全一致,覆盖批量下发指令消息可靠性专项测试。
四、落地测试价值与平台性能优化方案
1.替代线下实车上路测试,隧道断信号、批量车辆故障、万级并发、TMP 设备准入等低频 / 高危场景一键复现,版本迭代回归效率提升 80%;
2.系统迭代后自动执行全量终端仿真自动化用例,全覆盖 TMP 设备校验、实时监控、大站准点判断、超速预警、动态调度、智能排班核心模块;
3.Python 协程池动态扩缩虚拟车辆数量,从 TMP 拉取的线路、场站坐标本地缓存复用,减少重复接口调用与坐标计算,避免单机 CPU 满载;
4.仿真全量 TCP 报文、TMP 接口调用日志、Kafka 消费日志持久化入库,线上设备接入失败、数据失真、预警错乱类缺陷可完整回放复现,快速区分是 TMP 档案配置、网关解析、还是消息队列链路问题。
五、典型业务测试场景落地示例
1.模拟早高峰 5000 台已在 TMP 备案车辆并发上报协议数据,校验监控大屏无卡顿、大站准点分片计算无延迟;
2.创建一批未录入 TMP 系统的虚拟终端,验证网关直接拦截注册请求,无法进入调度业务链路;
3.混合环线 + Y 型分支线路批量车辆离线,校验离线统计报表、司机绩效报表数据统计准确;
4.批量下发区间车增发调度指令,校验 Kafka 消息无丢失、全部虚拟车载终端同步更新排班计划;
5.修改 TMP 系统内线路场站坐标,重新启动仿真车辆,校验轨迹上报、到站预估时长同步更新;
6.模拟隧道 GPS 间断丢失,校验系统容错逻辑,不会频繁弹出离线预警干扰调度员正常操作;
7.人车轮班模式下多司机轮换同一车辆,校验排班人车匹配、单车运营里程台账统计无错乱。
总结
本文基于 Python asyncio 协程、JT/T808 协议、TMP 第三方管理系统,搭建可支撑万级虚拟车辆并发的公交车载仿真测试平台,同时提供协议编解码、TMP 接口、虚拟车辆 TCP 协程、并发管理、自动化用例完整可运行代码片段。 平台解决线下实车测试成本高、异常高压场景难以复现的痛点,覆盖设备准入、轨迹上报、并发压测、故障注入、调度指令闭环全量业务验证,自动化测试结合 Allure 生成可视化报告,可直接集成项目 CI 流程,用于调度系统版本迭代回归、性能专项测试、异常逻辑校验。
更多推荐
所有评论(0)