电力工程师必看:手把手教你用Python解析COMTRADE文件(含ASCII与二进制格式实战)

在电力系统故障分析和继电保护领域,COMTRADE格式已成为波形数据交换的事实标准。作为电力工程师,我们经常需要处理来自故障录波装置、数字继电器或PMU设备的COMTRADE文件,但市面上现成的解析工具往往功能有限,无法满足深度分析需求。本文将带您从零开始,用Python构建一个功能完整的COMTRADE解析器,涵盖ASCII和二进制两种格式的实战解析技巧。

1. COMTRADE文件结构深度解析

COMTRADE记录由配置文件(.CFG)和数据文件(.DAT)两个核心文件组成,可能伴随可选的.HDR头文件和.INF信息文件。理解文件结构是正确解析的前提:

  • 配置文件(.CFG) :采用ASCII格式,定义了数据文件的解析规则。关键信息包括:

    厂站名称,设备ID,标准版本年号
    通道总数,模拟通道数(A),数字通道数(D)
    模拟通道配置(单位、转换系数、量程等)
    数字通道配置
    系统频率(Hz)
    采样率组数及各组采样点数
    起始/结束时间戳
    数据文件格式(ASCII/BINARY)
    
  • 数据文件(.DAT) :根据.CFG定义的格式存储实际采样值。二进制格式相比ASCII节省约60%存储空间,但解析复杂度更高。两种格式的采样值都需要通过转换公式计算实际物理量:

    # 模拟量转换公式
    physical_value = fCoefA * raw_value + fCoefB
    

典型问题:约75%的现场数据存在配置文件声明的量程(fMin/fMax)与实际数据不符的情况,需要动态统计极值。

2. 配置文件解析实战

我们首先用Python的configparser库处理.CFG文件。虽然该文件不是标准INI格式,但可以通过预处理转换为可解析结构:

def parse_cfg(file_path):
    with open(file_path, 'r') as f:
        lines = [line.strip() for line in f if line.strip()]
    
    # 提取基础信息
    station_info = lines[0].split(',')
    channel_counts = lines[1].split(',')
    
    # 解析模拟通道配置
    analog_channels = []
    a_count = int(channel_counts[1].replace('A',''))
    for line in lines[2:2+a_count]:
        parts = line.split(',')
        channel = {
            'id': parts[0].strip(),
            'name': parts[1].strip(),
            'unit': parts[4].strip(),
            'coef_a': float(parts[5]),
            'coef_b': float(parts[6]),
            'primary': float(parts[10]),
            'secondary': float(parts[11])
        }
        analog_channels.append(channel)
    
    # 继续解析数字通道、采样率等信息...
    return config_data

注意:实际解析时需要处理各种边缘情况,如注释行、空格分隔符、缺失字段等。建议先对原始文件进行规范化预处理。

3. ASCII格式数据文件解析

ASCII格式.DAT文件每行包含一个采样点的所有通道值,用逗号分隔。使用pandas可高效处理:

def parse_ascii_dat(cfg, dat_path):
    # 确定列名:序号、时间戳、模拟通道、数字通道
    cols = ['sample', 'timestamp'] + [f'A{i}' for i in range(1, cfg['analog_count']+1)]
    cols += [f'D{i}' for i in range(1, cfg['digital_count']+1)]
    
    # 读取数据
    df = pd.read_csv(dat_path, header=None, names=cols)
    
    # 转换模拟量实际值
    for i, ch in enumerate(cfg['analog_channels'], start=1):
        df[f'A{i}_actual'] = ch['coef_a'] * df[f'A{i}'] + ch['coef_b']
    
    return df

常见陷阱处理:

  • 时间戳单位可能是微秒或秒,需根据配置文件确认
  • 数字通道可能被打包为16位一组,需要按位解析
  • 缺失值可能用特定数字(如99999)表示

4. 二进制格式解析技巧

二进制格式使用固定字节长度存储数据,解析时需要struct模块处理字节流:

def parse_binary_dat(cfg, dat_path):
    analog_counts = cfg['analog_count']
    digital_counts = cfg['digital_count']
    digital_groups = (digital_counts + 15) // 16  # 每16位一组
    
    samples = []
    with open(dat_path, 'rb') as f:
        while True:
            # 读取采样序号(4字节)和时间戳(4字节)
            header = f.read(8)
            if not header: break
            
            sample = {
                'number': struct.unpack('>i', header[:4])[0],
                'timestamp': struct.unpack('>i', header[4:8])[0]
            }
            
            # 读取模拟量(每个2字节)
            analog_data = f.read(2 * analog_counts)
            analogs = struct.unpack(f'>{analog_counts}h', analog_data)
            for i, val in enumerate(analogs, start=1):
                sample[f'A{i}'] = val
            
            # 读取数字量(每16位2字节)
            digital_data = f.read(2 * digital_groups)
            digital_packed = struct.unpack(f'>{digital_groups}H', digital_data)
            bits = []
            for word in digital_packed:
                bits.extend([(word >> i) & 1 for i in range(16)])
            for i in range(digital_counts):
                sample[f'D{i+1}'] = bits[i]
            
            samples.append(sample)
    
    df = pd.DataFrame(samples)
    # 应用模拟量转换公式...
    return df

二进制解析关键点:

  • 使用 struct.unpack 处理字节序(>表示大端)
  • 模拟量通常用16位有符号整数存储
  • 数字量按位打包,需要拆解
  • 处理文件结尾时要检查读取字节数

5. 高级处理与可视化

获得DataFrame后,可以进行深度分析:

动态量程修正

# 自动计算实际量程
for i in range(1, analog_counts+1):
    actual_values = df[f'A{i}_actual']
    cfg['analog_channels'][i-1]['actual_min'] = actual_values.min()
    cfg['analog_channels'][i-1]['actual_max'] = actual_values.max()

波形可视化

import matplotlib.pyplot as plt

def plot_channels(df, channels, start=0, end=-1):
    plt.figure(figsize=(12, 6))
    for ch in channels:
        plt.plot(df['timestamp'][start:end], 
                df[f'{ch}_actual'][start:end],
                label=f'{ch} ({cfg["analog_channels"][int(ch[1:])-1]["unit"]})')
    plt.xlabel('Time (μs)')
    plt.ylabel('Value')
    plt.legend()
    plt.grid()

故障特征提取 示例:

# 检测过电流事件
threshold = 1.2 * nominal_current
fault_samples = df[df['A1_actual'] > threshold]
if not fault_samples.empty:
    print(f"检测到过电流事件,持续时间: {fault_samples['timestamp'].iloc[-1] - fault_samples['timestamp'].iloc[0]}μs")

6. 工程实践建议

在实际项目中处理COMTRADE文件时,有几个经验教训值得分享:

  1. 性能优化 :对于大型二进制文件(>1GB),建议:

    • 使用numpy.memmap直接映射文件
    • 并行处理不同采样段
    • 缓存解析结果
  2. 异常处理

    try:
        raw = struct.unpack('>h', data)[0]
        return raw if raw != -32768 else float('nan')  # 处理缺失值
    except struct.error:
        logging.warning("数据截断,可能文件损坏")
        return float('nan')
    
  3. 元数据保存 :将解析后的配置信息和统计指标保存为JSON,便于后续分析:

    {
      "station": "Substation_01",
      "sampling_rates": [
        {"rate": 4000, "samples": 10240},
        {"rate": 10, "samples": 600}
      ],
      "channels": [
        {
          "name": "IA",
          "unit": "A",
          "nominal_range": [-100, 100],
          "actual_range": [-87.3, 92.1]
        }
      ]
    }
    

更多推荐