一、通讯基础参数(原厂标准)

硬件连接

  • 接口类型:9 针 RS232 串口
  • 接线方式:交叉接线(TXD3、RXD2、GND5)
  • 默认通讯配置
    • 波特率:9600 bps
    • 数据位:8 位
    • 停止位:1 位
    • 校验位:无
    • 流控:Xon/Xoff OFF

指令格式

  • 编码格式:ASCII 字符
  • 分隔符:逗号(,
  • 结束符\r\n(回车换行)
  • 工作模式
    • 本地模式:! 开头
    • 远程模式:@ 开头(电脑控制)

核心指令集

指令 功能描述 示例
SRM,01 切换远程控制模式(电脑接管面板) SRM,01\r\n
RMD 读取实时 A/F、λ、O₂、设备状态 RMD\r\n
SDS,00/01/02 切换测量项(O₂/λ/A/F) SDS,01\r\n
RCC 读取传感器全部标定系数 RCC\r\n
RCF 读取 H/C、O/C 燃料系数 RCF\r\n

二、前期硬件准备

1. 硬件清单

  • RS232 转 USB 模块(推荐 FTDI 芯片)
  • 9 针交叉串口线(DB9 公对公)
  • MEXA-730λ 主机(背面 RS232 接口)
  • 电脑(Windows/Linux/macOS)

2. 设备设置步骤

  1. 通道配置

    • 通道 001:波特率 9600
    • 通道 002:流控 OFF
  2. 燃料系数预设

    • 汽油:H/C = 1.85
    • 柴油:H/C = 1.90
  3. 连接检查

    • 确认交叉线连接正确
    • 设备上电并进入测量模式

三、完整 Python 采集代码(pyserial)

环境准备

pip install pyserial

核心采集代码

import serial
import time
import csv
from datetime import datetime

class Mexa730LambdaCollector:
    """MEXA-730λ 数据采集器"""
    
    def __init__(self, port='COM3', baudrate=9600):
        """
        初始化串口连接
        :param port: 串口号(Windows: COM3, Linux: /dev/ttyUSB0)
        :param baudrate: 波特率
        """
        self.ser = serial.Serial(
            port=port,
            baudrate=baudrate,
            bytesize=8,
            stopbits=1,
            parity='N',
            timeout=0.1,
            xonxoff=False,
            rtscts=False,
            dsrdtr=False
        )
        self.log_file = None
        self.csv_writer = None
        
    def send_command(self, cmd):
        """
        发送指令并返回应答
        :param cmd: 指令字符串(不带结束符)
        :return: 应答字符串
        """
        # 添加结束符并编码
        send_buf = (cmd + "\r\n").encode("ascii")
        self.ser.write(send_buf)
        
        # 读取应答(可调整超时时间)
        time.sleep(0.05)
        resp = self.ser.readline().decode("ascii", errors='ignore').strip()
        return resp
    
    def switch_to_remote(self):
        """切换到远程控制模式"""
        resp = self.send_command("SRM,01")
        print(f"远程模式切换应答: {resp}")
        time.sleep(0.2)
        return resp
    
    def switch_to_local(self):
        """切换回本地控制模式"""
        resp = self.send_command("SRM,00")
        print(f"本地模式切换应答: {resp}")
        return resp
    
    def read_realtime_data(self):
        """
        读取实时测量数据
        :return: 解析后的数据字典
        """
        resp = self.send_command("RMD")
        
        if not resp.startswith("RMD,"):
            return None
        
        data = resp.split(",")
        
        # 解析数据字段
        result = {
            'raw_response': resp,
            'remote_mode': data[1] if len(data) > 1 else '',
            'status_flag': data[2] if len(data) > 2 else '',
            'measurement_value': data[-1] if len(data) > 3 else '',
            'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
        }
        
        # 判断测量项
        mode = result['remote_mode']
        if mode in ["1", "2"]:
            result['measurement_type'] = 'λ'
            result['unit'] = ''
        elif mode == "0":
            result['measurement_type'] = 'O₂'
            result['unit'] = '%'
        else:
            result['measurement_type'] = 'A/F'
            result['unit'] = ''
            
        return result
    
    def read_calibration_coefficients(self):
        """读取传感器标定系数"""
        resp = self.send_command("RCC")
        if resp.startswith("RCC,"):
            coefficients = resp.split(",")[1:]  # 去掉指令头
            return {
                'KA': coefficients[0] if len(coefficients) > 0 else '',
                'KM': coefficients[1] if len(coefficients) > 1 else '',
                'IPN₂': coefficients[2] if len(coefficients) > 2 else '',
                'RH0': coefficients[3] if len(coefficients) > 3 else ''
            }
        return {}
    
    def setup_csv_logging(self, filename=None):
        """设置 CSV 日志记录"""
        if filename is None:
            filename = f"mexa730_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
        
        self.log_file = open(filename, 'w', newline='', encoding='utf-8')
        self.csv_writer = csv.writer(self.log_file)
        # 写入表头
        self.csv_writer.writerow([
            'Timestamp', 'Measurement_Type', 'Value', 'Unit', 
            'Remote_Mode', 'Status_Flag', 'Raw_Response'
        ])
        print(f"数据日志已创建: {filename}")
    
    def log_data(self, data):
        """记录数据到 CSV"""
        if self.csv_writer and data:
            self.csv_writer.writerow([
                data['timestamp'],
                data['measurement_type'],
                data['measurement_value'],
                data['unit'],
                data['remote_mode'],
                data['status_flag'],
                data['raw_response']
            ])
            self.log_file.flush()
    
    def start_collection(self, interval=0.5, duration=None):
        """
        开始数据采集
        :param interval: 采集间隔(秒)
        :param duration: 采集时长(秒),None 表示持续采集
        """
        print("=== MEXA-730λ 数据采集开始 ===")
        
        # 切换到远程模式
        self.switch_to_remote()
        
        # 设置数据记录
        self.setup_csv_logging()
        
        start_time = time.time()
        sample_count = 0
        
        try:
            while True:
                # 检查采集时长
                if duration and (time.time() - start_time) > duration:
                    print(f"采集时长达到 {duration} 秒,停止采集")
                    break
                
                # 读取实时数据
                data = self.read_realtime_data()
                
                if data:
                    # 显示数据
                    print(f"[{data['timestamp']}] {data['measurement_type']}: {data['measurement_value']} {data['unit']}")
                    
                    # 记录到 CSV
                    self.log_data(data)
                    sample_count += 1
                
                # 等待下一个采集周期
                time.sleep(interval)
                
        except KeyboardInterrupt:
            print("\n用户中断采集...")
        finally:
            # 切换回本地模式
            self.switch_to_local()
            
            # 关闭文件
            if self.log_file:
                self.log_file.close()
                print(f"数据采集完成,共采集 {sample_count} 个样本")
                print(f"数据已保存到: {self.log_file.name}")
            
            # 关闭串口
            self.ser.close()
            print("串口连接已关闭")

def main():
    """主函数示例"""
    # 配置参数(根据实际情况修改)
    PORT = "COM3"  # Windows
    # PORT = "/dev/ttyUSB0"  # Linux
    # PORT = "/dev/tty.usbserial"  # macOS
    
    # 创建采集器实例
    collector = Mexa730LambdaCollector(port=PORT)
    
    # 可选:读取标定系数
    print("读取传感器标定系数...")
    coeffs = collector.read_calibration_coefficients()
    if coeffs:
        print(f"标定系数: {coeffs}")
    
    # 开始采集(持续采集,按 Ctrl+C 停止)
    collector.start_collection(interval=0.5)
    
    # 或者指定采集时长(例如采集60秒)
    # collector.start_collection(interval=0.5, duration=60)

if __name__ == "__main__":
    main()

快速测试脚本

# quick_test.py - 快速连接测试
import serial
import time

def quick_test():
    """快速连接测试"""
    try:
        ser = serial.Serial(
            port="COM3",
            baudrate=9600,
            bytesize=8,
            stopbits=1,
            parity='N',
            timeout=1
        )
        
        # 测试指令
        test_cmds = [
            "SRM,01",  # 切换到远程
            "RMD",     # 读取数据
            "RCC",     # 读取标定系数
            "SRM,00"   # 切换回本地
        ]
        
        for cmd in test_cmds:
            print(f"发送: {cmd}")
            ser.write((cmd + "\r\n").encode("ascii"))
            time.sleep(0.2)
            resp = ser.readline().decode("ascii", errors='ignore').strip()
            print(f"接收: {resp}")
            time.sleep(0.5)
            
        ser.close()
        print("测试完成!")
        
    except Exception as e:
        print(f"连接失败: {e}")

if __name__ == "__main__":
    quick_test()

四、关键指令说明

RMD 返回格式解析

RMD , 3 , 0 , 0 , 14.683
  • 字段1RMD - 指令标识
  • 字段23 - 远程测量模式
  • 字段30 - 状态标志(0=正常)
  • 字段40 - 备用字段
  • 字段514.683 - 测量值(λ值)

状态标志说明

  • 0:正常测量
  • 1:NOT READY
  • 2:ALARM(故障)
  • 3:CALIBRATION(校准中)

通道读写指令

设置 H/C 燃料系数

CH:022=1.85\r\n

查询通道值

?CH:022\r\n

响应格式

CH:022=1.85\r\n

五、常见通讯故障排查

1. 无应答

  • 可能原因
    • 串口线类型错误(应用交叉线而非直通线)
    • 波特率不匹配(确认设备设置为9600)
    • 串口号错误(检查设备管理器中的COM口)
  • 解决方案
    # 列出可用串口
    import serial.tools.list_ports
    ports = list(serial.tools.list_ports.comports())
    for port in ports:
        print(port.device, port.description)
    

2. 返回乱码

  • 可能原因
    • 校验位设置错误(应为无校验’N’)
    • 停止位错误(应为1)
    • 数据位错误(应为8)
  • 解决方案
    # 验证串口配置
    ser = serial.Serial(
        port="COM3",
        baudrate=9600,
        bytesize=8,      # 必须为8
        stopbits=1,      # 必须为1
        parity='N',      # 必须为'N'
        timeout=0.1
    )
    

3. 指令返回 ERROR

  • 可能原因
    • 设备处于本地模式(面板控制)
    • 指令格式错误(缺少结束符\r\n
    • 指令参数错误
  • 解决方案
    # 确保先切换到远程模式
    send_cmd(ser, "SRM,01")
    time.sleep(0.2)  # 等待设备响应
    

4. 数据不更新

  • 可能原因
    • 设备处于校准模式
    • 设备处于参数设置模式
    • 传感器未就绪
  • 解决方案
    1. 检查设备前面板显示状态
    2. 切换到测量模式(M模式)
    3. 发送SDS,01切换到λ测量

更多推荐