Horiba MEXA-730λ RS232 通讯协议解析 + Python 实时采集代码
·
一、通讯基础参数(原厂标准)
硬件连接
- 接口类型:9 针 RS232 串口
- 接线方式:交叉接线(TXD3、RXD2、GND5)
- 默认通讯配置:
- 波特率:9600 bps
- 数据位:8 位
- 停止位:1 位
- 校验位:无
- 流控:Xon/Xoff OFF
指令格式
- 编码格式:ASCII 字符
- 分隔符:逗号(
,) - 结束符:
\r\n(回车换行) - 工作模式:
- 本地模式:
!开头 - 远程模式:
@开头(电脑控制)
- 本地模式:
核心指令集
| 指令 | 功能描述 | 示例 |
|---|---|---|
SRM,01 |
切换远程控制模式(电脑接管面板) | SRM,01\r\n |
RMD |
读取实时 A/F、λ、O₂、设备状态 | RMD\r\n |
SDS,00/01/02 |
切换测量项(O₂/λ/A/F) | SDS,01\r\n |
RCC |
读取传感器全部标定系数 | RCC\r\n |
RCF |
读取 H/C、O/C 燃料系数 | RCF\r\n |
二、前期硬件准备
1. 硬件清单
- RS232 转 USB 模块(推荐 FTDI 芯片)
- 9 针交叉串口线(DB9 公对公)
- MEXA-730λ 主机(背面 RS232 接口)
- 电脑(Windows/Linux/macOS)
2. 设备设置步骤
-
通道配置:
- 通道 001:波特率 9600
- 通道 002:流控 OFF
-
燃料系数预设:
- 汽油:H/C = 1.85
- 柴油:H/C = 1.90
-
连接检查:
- 确认交叉线连接正确
- 设备上电并进入测量模式
三、完整 Python 采集代码(pyserial)
环境准备
pip install pyserial
核心采集代码
import serial
import time
import csv
from datetime import datetime
class Mexa730LambdaCollector:
"""MEXA-730λ 数据采集器"""
def __init__(self, port='COM3', baudrate=9600):
"""
初始化串口连接
:param port: 串口号(Windows: COM3, Linux: /dev/ttyUSB0)
:param baudrate: 波特率
"""
self.ser = serial.Serial(
port=port,
baudrate=baudrate,
bytesize=8,
stopbits=1,
parity='N',
timeout=0.1,
xonxoff=False,
rtscts=False,
dsrdtr=False
)
self.log_file = None
self.csv_writer = None
def send_command(self, cmd):
"""
发送指令并返回应答
:param cmd: 指令字符串(不带结束符)
:return: 应答字符串
"""
# 添加结束符并编码
send_buf = (cmd + "\r\n").encode("ascii")
self.ser.write(send_buf)
# 读取应答(可调整超时时间)
time.sleep(0.05)
resp = self.ser.readline().decode("ascii", errors='ignore').strip()
return resp
def switch_to_remote(self):
"""切换到远程控制模式"""
resp = self.send_command("SRM,01")
print(f"远程模式切换应答: {resp}")
time.sleep(0.2)
return resp
def switch_to_local(self):
"""切换回本地控制模式"""
resp = self.send_command("SRM,00")
print(f"本地模式切换应答: {resp}")
return resp
def read_realtime_data(self):
"""
读取实时测量数据
:return: 解析后的数据字典
"""
resp = self.send_command("RMD")
if not resp.startswith("RMD,"):
return None
data = resp.split(",")
# 解析数据字段
result = {
'raw_response': resp,
'remote_mode': data[1] if len(data) > 1 else '',
'status_flag': data[2] if len(data) > 2 else '',
'measurement_value': data[-1] if len(data) > 3 else '',
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
}
# 判断测量项
mode = result['remote_mode']
if mode in ["1", "2"]:
result['measurement_type'] = 'λ'
result['unit'] = ''
elif mode == "0":
result['measurement_type'] = 'O₂'
result['unit'] = '%'
else:
result['measurement_type'] = 'A/F'
result['unit'] = ''
return result
def read_calibration_coefficients(self):
"""读取传感器标定系数"""
resp = self.send_command("RCC")
if resp.startswith("RCC,"):
coefficients = resp.split(",")[1:] # 去掉指令头
return {
'KA': coefficients[0] if len(coefficients) > 0 else '',
'KM': coefficients[1] if len(coefficients) > 1 else '',
'IPN₂': coefficients[2] if len(coefficients) > 2 else '',
'RH0': coefficients[3] if len(coefficients) > 3 else ''
}
return {}
def setup_csv_logging(self, filename=None):
"""设置 CSV 日志记录"""
if filename is None:
filename = f"mexa730_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
self.log_file = open(filename, 'w', newline='', encoding='utf-8')
self.csv_writer = csv.writer(self.log_file)
# 写入表头
self.csv_writer.writerow([
'Timestamp', 'Measurement_Type', 'Value', 'Unit',
'Remote_Mode', 'Status_Flag', 'Raw_Response'
])
print(f"数据日志已创建: {filename}")
def log_data(self, data):
"""记录数据到 CSV"""
if self.csv_writer and data:
self.csv_writer.writerow([
data['timestamp'],
data['measurement_type'],
data['measurement_value'],
data['unit'],
data['remote_mode'],
data['status_flag'],
data['raw_response']
])
self.log_file.flush()
def start_collection(self, interval=0.5, duration=None):
"""
开始数据采集
:param interval: 采集间隔(秒)
:param duration: 采集时长(秒),None 表示持续采集
"""
print("=== MEXA-730λ 数据采集开始 ===")
# 切换到远程模式
self.switch_to_remote()
# 设置数据记录
self.setup_csv_logging()
start_time = time.time()
sample_count = 0
try:
while True:
# 检查采集时长
if duration and (time.time() - start_time) > duration:
print(f"采集时长达到 {duration} 秒,停止采集")
break
# 读取实时数据
data = self.read_realtime_data()
if data:
# 显示数据
print(f"[{data['timestamp']}] {data['measurement_type']}: {data['measurement_value']} {data['unit']}")
# 记录到 CSV
self.log_data(data)
sample_count += 1
# 等待下一个采集周期
time.sleep(interval)
except KeyboardInterrupt:
print("\n用户中断采集...")
finally:
# 切换回本地模式
self.switch_to_local()
# 关闭文件
if self.log_file:
self.log_file.close()
print(f"数据采集完成,共采集 {sample_count} 个样本")
print(f"数据已保存到: {self.log_file.name}")
# 关闭串口
self.ser.close()
print("串口连接已关闭")
def main():
"""主函数示例"""
# 配置参数(根据实际情况修改)
PORT = "COM3" # Windows
# PORT = "/dev/ttyUSB0" # Linux
# PORT = "/dev/tty.usbserial" # macOS
# 创建采集器实例
collector = Mexa730LambdaCollector(port=PORT)
# 可选:读取标定系数
print("读取传感器标定系数...")
coeffs = collector.read_calibration_coefficients()
if coeffs:
print(f"标定系数: {coeffs}")
# 开始采集(持续采集,按 Ctrl+C 停止)
collector.start_collection(interval=0.5)
# 或者指定采集时长(例如采集60秒)
# collector.start_collection(interval=0.5, duration=60)
if __name__ == "__main__":
main()
快速测试脚本
# quick_test.py - 快速连接测试
import serial
import time
def quick_test():
"""快速连接测试"""
try:
ser = serial.Serial(
port="COM3",
baudrate=9600,
bytesize=8,
stopbits=1,
parity='N',
timeout=1
)
# 测试指令
test_cmds = [
"SRM,01", # 切换到远程
"RMD", # 读取数据
"RCC", # 读取标定系数
"SRM,00" # 切换回本地
]
for cmd in test_cmds:
print(f"发送: {cmd}")
ser.write((cmd + "\r\n").encode("ascii"))
time.sleep(0.2)
resp = ser.readline().decode("ascii", errors='ignore').strip()
print(f"接收: {resp}")
time.sleep(0.5)
ser.close()
print("测试完成!")
except Exception as e:
print(f"连接失败: {e}")
if __name__ == "__main__":
quick_test()
四、关键指令说明
RMD 返回格式解析
RMD , 3 , 0 , 0 , 14.683
- 字段1:
RMD- 指令标识 - 字段2:
3- 远程测量模式 - 字段3:
0- 状态标志(0=正常) - 字段4:
0- 备用字段 - 字段5:
14.683- 测量值(λ值)
状态标志说明
0:正常测量1:NOT READY2:ALARM(故障)3:CALIBRATION(校准中)
通道读写指令
设置 H/C 燃料系数:
CH:022=1.85\r\n
查询通道值:
?CH:022\r\n
响应格式:
CH:022=1.85\r\n
五、常见通讯故障排查
1. 无应答
- 可能原因:
- 串口线类型错误(应用交叉线而非直通线)
- 波特率不匹配(确认设备设置为9600)
- 串口号错误(检查设备管理器中的COM口)
- 解决方案:
# 列出可用串口 import serial.tools.list_ports ports = list(serial.tools.list_ports.comports()) for port in ports: print(port.device, port.description)
2. 返回乱码
- 可能原因:
- 校验位设置错误(应为无校验’N’)
- 停止位错误(应为1)
- 数据位错误(应为8)
- 解决方案:
# 验证串口配置 ser = serial.Serial( port="COM3", baudrate=9600, bytesize=8, # 必须为8 stopbits=1, # 必须为1 parity='N', # 必须为'N' timeout=0.1 )
3. 指令返回 ERROR
- 可能原因:
- 设备处于本地模式(面板控制)
- 指令格式错误(缺少结束符
\r\n) - 指令参数错误
- 解决方案:
# 确保先切换到远程模式 send_cmd(ser, "SRM,01") time.sleep(0.2) # 等待设备响应
4. 数据不更新
- 可能原因:
- 设备处于校准模式
- 设备处于参数设置模式
- 传感器未就绪
- 解决方案:
- 检查设备前面板显示状态
- 切换到测量模式(M模式)
- 发送
SDS,01切换到λ测量
更多推荐
所有评论(0)