用Python解析北斗/GPS模块的NMEA 0183数据:一个Arduino/树莓派物联网项目的完整代码示例
·
用Python解析北斗/GPS模块的NMEA 0183数据:从串口读取到物联网应用实战
当你第一次连接北斗/GPS模块时,串口终端里不断刷新的 $GPGGA 、 $GPRMC 等神秘字符串可能会让人望而生畏。这些遵循NMEA 0183协议的数据流,实际上是位置信息的宝库。本文将带你用Python构建一个完整的解析系统,从硬件连接到数据可视化,最终实现一个可落地的位置追踪应用。
1. 硬件准备与环境搭建
在开始编码前,我们需要确保硬件正确连接。常见的ATGM336H、NEO-6M等模块通常通过UART与开发板通信。以树莓派为例:
import serial
ser = serial.Serial('/dev/ttyS0', 9600, timeout=1) # 根据实际设备修改端口
关键参数说明:
- 波特率:常见的有9600、115200等,需与模块规格匹配
- 校验位:通常为None(无校验)
- 超时设置:避免读取阻塞
注意:Windows系统通常使用COM端口(如COM3),Linux/macOS则为/dev/tty*形式
连接测试代码片段:
try:
while True:
line = ser.readline().decode('ascii', errors='ignore')
if line.startswith('$'):
print(line.strip())
except KeyboardInterrupt:
ser.close()
2. NMEA语句解析核心算法
2.1 校验和验证
每个NMEA语句都包含 *hh 形式的校验和,这是数据完整性的第一道防线:
def verify_checksum(nmea_sentence):
try:
# 提取校验部分
check_part = nmea_sentence.split('*')[1][:2]
# 计算校验值
data = nmea_sentence.split('$')[1].split('*')[0]
calculated_check = 0
for char in data:
calculated_check ^= ord(char)
return f"{calculated_check:02X}" == check_part.upper()
except:
return False
2.2 关键语句解析器实现
以最常用的GPGGA语句为例,我们构建结构化解析器:
import re
def parse_gpgga(gpgga_str):
if not verify_checksum(gpgga_str):
return None
pattern = r'\$GPGGA,(\d+\.\d+),(\d+\.\d+),([NS]),(\d+\.\d+),([EW]),(\d),(\d+),([\d.]+),([\d.-]+),M,([\d.-]+),M,([\d.]*),(\d*)\*[0-9A-Fa-f]{2}'
match = re.fullmatch(pattern, gpgga_str)
if not match:
return None
return {
'time': match.group(1),
'latitude': convert_to_decimal(match.group(2), match.group(3)),
'longitude': convert_to_decimal(match.group(4), match.group(5)),
'fix_quality': int(match.group(6)),
'satellites': int(match.group(7)),
'hdop': float(match.group(8)),
'altitude': float(match.group(9)),
'geoid_height': float(match.group(11))
}
def convert_to_decimal(coord, direction):
# 将度分格式转换为十进制
degrees = float(coord[:2]) if direction in ['N', 'S'] else float(coord[:3])
minutes = float(coord[2 if direction in ['N', 'S'] else 3:])
decimal = degrees + minutes/60
return -decimal if direction in ['S', 'W'] else decimal
语句类型识别表:
| 前缀 | 说明 | 典型语句示例 |
|---|---|---|
| GP | GPS定位数据 | GPGGA, GPRMC |
| BD | 北斗系统数据 | BDGGA, BDRMC |
| GN | 多系统联合定位数据 | GNGGA, GNRMC |
3. 数据存储与可视化系统
3.1 数据库设计
使用SQLite实现轻量级存储:
import sqlite3
from datetime import datetime
def init_db():
conn = sqlite3.connect('gps_data.db')
c = conn.cursor()
c.execute('''CREATE TABLE IF NOT EXISTS positions
(id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp DATETIME,
latitude REAL,
longitude REAL,
altitude REAL,
speed REAL,
satellites INTEGER,
hdop REAL)''')
conn.commit()
conn.close()
3.2 实时可视化
结合Matplotlib实现动态轨迹显示:
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
fig, ax = plt.subplots()
line, = ax.plot([], [], 'b-')
points = ax.scatter([], [], c='r', s=50)
def init():
ax.set_xlim(-180, 180)
ax.set_ylim(-90, 90)
return line,
def update(frame):
# 从数据库获取最新100个点
conn = sqlite3.connect('gps_data.db')
df = pd.read_sql('SELECT longitude, latitude FROM positions ORDER BY timestamp DESC LIMIT 100', conn)
conn.close()
if not df.empty:
line.set_data(df['longitude'], df['latitude'])
points.set_offsets(np.column_stack((df['longitude'], df['latitude'])))
return line, points
ani = FuncAnimation(fig, update, init_func=init, blit=True)
plt.show()
4. 物联网应用实战案例
4.1 车辆轨迹记录系统
完整数据处理流程实现:
class GPSTracker:
def __init__(self, port):
self.ser = serial.Serial(port, 9600)
self.parser = {
'GPGGA': parse_gpgga,
'GPRMC': parse_gprmc
}
init_db()
def run(self):
try:
while True:
line = self.ser.readline().decode('ascii', errors='ignore').strip()
if not line.startswith('$'):
continue
sentence_type = line[1:6]
if sentence_type in self.parser:
data = self.parser[sentence_type](line)
if data:
self.store_data(data)
except KeyboardInterrupt:
self.ser.close()
def store_data(self, data):
conn = sqlite3.connect('gps_data.db')
c = conn.cursor()
c.execute('''INSERT INTO positions
(timestamp, latitude, longitude, altitude, speed, satellites, hdop)
VALUES (?, ?, ?, ?, ?, ?, ?)''',
(datetime.now(), data.get('latitude'), data.get('longitude'),
data.get('altitude'), data.get('speed'),
data.get('satellites'), data.get('hdop')))
conn.commit()
conn.close()
4.2 性能优化技巧
多线程处理架构:
from threading import Thread
from queue import Queue
class SerialReader(Thread):
def __init__(self, port, queue):
super().__init__()
self.ser = serial.Serial(port, 9600)
self.queue = queue
def run(self):
while True:
line = self.ser.readline().decode('ascii', errors='ignore').strip()
if line.startswith('$'):
self.queue.put(line)
class DataProcessor(Thread):
def __init__(self, queue):
super().__init__()
self.queue = queue
def run(self):
while True:
line = self.queue.get()
# 解析处理逻辑...
常用NMEA语句优先级处理表:
| 语句类型 | 更新频率 | 关键数据 | 建议优先级 |
|---|---|---|---|
| GGA | 1Hz | 位置、海拔、卫星数 | 高 |
| RMC | 1Hz | 位置、速度、日期/时间 | 高 |
| GSV | 视情况 | 卫星视图信息 | 低 |
| GSA | 1Hz | DOP值和活动卫星 | 中 |
5. 异常处理与调试技巧
5.1 常见问题排查
串口通信问题检查清单:
- 确认端口号是否正确
- 检查波特率设置是否匹配
- 验证TX/RX线是否接反
- 检查地线连接
- 确认模块供电稳定
5.2 数据校验增强
对于关键应用,建议增加以下校验:
def enhanced_parser(nmea_str):
# 基础校验
if not verify_checksum(nmea_str):
return None
# 字段完整性检查
fields = nmea_str.split(',')
if len(fields) < 5: # 根据语句类型调整
return None
# 数据合理性验证
try:
lat = float(fields[2][:2]) + float(fields[2][2:])/60
if not -90 <= lat <= 90:
return None
except:
return None
# ...其他验证逻辑
在树莓派上部署时,可以使用这个简单的服务脚本:
#!/bin/bash
python3 /path/to/gps_service.py &
python3 /path/to/web_interface.py
实际项目中,我发现模块首次定位可能需要几分钟(冷启动),添加LED状态指示会显著提升用户体验。对于移动应用,增加简单的卡尔曼滤波可以平滑轨迹跳动。
更多推荐



所有评论(0)