用Python和MNE库搞定SEED脑电数据集:从.mat文件到可训练数据的完整流程
·
用Python和MNE库解析SEED脑电数据集:工程化处理实战指南
当你第一次打开SEED数据集文件夹,看到密密麻麻的.mat文件时,是否感到无从下手?作为脑机接口领域最常用的公开数据集之一,SEED提供了丰富的情感识别脑电数据,但如何将这些数据高效转化为Python中可操作的对象,是每个研究者都会遇到的挑战。本文将带你从工程化角度,一步步拆解数据处理全流程。
1. 理解SEED数据集结构
SEED数据集由上海交通大学BCMI实验室发布,包含15名被试者在观看情感视频时的脑电记录。每个被试者的数据存储在独立的.mat文件中,文件命名遵循"姓名_日期.mat"的格式。
.mat文件内部结构解析:
- 每个文件包含15个试次(trial)的数据,对应15段不同情感刺激下的脑电记录
- 数据以字典形式存储,键名格式为"姓名_eegN"(N为1-15)
- 每个试次数据为62×T的numpy数组,62代表电极通道数,T为时间点数
- 采样率已降为200Hz,并经过0-75Hz带通滤波预处理
import scipy.io as sio
# 示例:查看单个.mat文件结构
data = sio.loadmat('dujingcheng_20131027.mat')
print(data.keys()) # 输出所有键名
print(data['djc_eeg1'].shape) # 查看第一个试次的数据维度
2. 构建高效数据加载管道
批量处理.mat文件时,需要考虑内存管理和处理效率。以下是优化后的加载方案:
2.1 单文件处理函数
import mne
import numpy as np
from pathlib import Path
# 标准电极名称列表(按SEED数据实际顺序排列)
CH_NAMES = [
'FP1', 'FPZ', 'FP2', 'AF3', 'AF4', 'F7', 'F5', 'F3', 'F1', 'FZ',
'F2', 'F4', 'F6', 'F8', 'FT7', 'FC5', 'FC3', 'FC1', 'FCZ', 'FC2',
'FC4', 'FC6', 'FT8', 'T7', 'C5', 'C3', 'C1', 'CZ', 'C2', 'C4',
'C6', 'T8', 'TP7', 'CP5', 'CP3', 'CP1', 'CPZ', 'CP2', 'CP4', 'CP6',
'TP8', 'P7', 'P5', 'P3', 'P1', 'PZ', 'P2', 'P4', 'P6', 'P8',
'PO7', 'PO5', 'PO3', 'POZ', 'PO4', 'PO6', 'PO8', 'CB1', 'O1', 'OZ',
'O2', 'CB2'
]
def load_single_mat(file_path, crop_start=5.0):
"""
加载单个.mat文件并转换为MNE Raw对象列表
参数:
file_path: .mat文件路径
crop_start: 裁剪起始时间(秒),去除初始不稳定信号
返回:
raw_list: 包含15个Raw对象的列表
"""
data = sio.loadmat(file_path)
raw_list = []
# 提取所有eeg数据键(排除MATLAB元数据键)
eeg_keys = [k for k in data.keys() if k.startswith('djc_')]
for key in eeg_keys:
# 创建MNE Info对象
info = mne.create_info(
ch_names=CH_NAMES,
sfreq=200, # SEED采样率为200Hz
ch_types='eeg'
)
# 转换为RawArray并裁剪前5秒
raw = mne.io.RawArray(data[key], info)
raw.crop(tmin=crop_start)
raw_list.append(raw)
return raw_list
2.2 批量处理与内存优化
处理大量数据时,建议使用生成器而非一次性加载所有数据:
def mat_file_generator(data_dir, max_files=None):
"""
生成器函数,逐个yield.mat文件数据
参数:
data_dir: 包含.mat文件的目录
max_files: 最大处理文件数(None表示处理所有)
返回:
每次yield一个(raw_list, subject_id)元组
"""
data_dir = Path(data_dir)
mat_files = list(data_dir.glob('*.mat'))
if max_files is not None:
mat_files = mat_files[:max_files]
for mat_file in mat_files:
subject_id = mat_file.stem
try:
raw_list = load_single_mat(mat_file)
yield raw_list, subject_id
except Exception as e:
print(f"Error processing {mat_file}: {str(e)}")
continue
3. 数据质量检查与可视化
在正式分析前,进行基本数据质量检查至关重要:
3.1 基础统计信息
def check_data_quality(raw):
"""输出数据基本统计信息"""
print(f"数据时长: {raw.times[-1]:.2f}秒")
print(f"采样点数: {len(raw.times)}")
print(f"通道数量: {len(raw.ch_names)}")
# 各通道标准差(反映信号强度)
ch_std = np.std(raw.get_data(), axis=1)
print(f"通道标准差范围: {ch_std.min():.2f} - {ch_std.max():.2f} μV")
# 全局均值
global_mean = np.mean(raw.get_data())
print(f"全局均值: {global_mean:.2f} μV")
3.2 快速可视化
MNE提供了丰富的可视化工具:
def quick_visualization(raw, n_channels=10):
"""绘制数据概览图"""
# 绘制原始数据(随机选择部分通道)
pick_ch = np.random.choice(raw.ch_names, size=min(n_channels, len(raw.ch_names)), replace=False)
raw.plot(title='Raw EEG Data', duration=10, n_channels=n_channels, scalings='auto')
# 绘制功率谱密度
raw.plot_psd(fmax=75) # SEED已滤波到0-75Hz
4. 工程化实践技巧
4.1 并行处理加速
使用Python的concurrent.futures加速批量处理:
from concurrent.futures import ThreadPoolExecutor
def parallel_process_mat(data_dir, max_workers=4):
"""并行处理.mat文件"""
mat_files = list(Path(data_dir).glob('*.mat'))
results = []
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = []
for mat_file in mat_files:
futures.append(executor.submit(load_single_mat, mat_file))
for future in futures:
try:
results.extend(future.result())
except Exception as e:
print(f"处理失败: {str(e)}")
return results
4.2 数据缓存机制
使用joblib实现处理结果缓存,避免重复计算:
from joblib import Memory
# 设置缓存目录
memory = Memory('./cache_dir', verbose=0)
@memory.cache
def cached_load_mat(file_path):
"""带缓存的数据加载函数"""
return load_single_mat(file_path)
4.3 异常处理与日志记录
完善的异常处理能提高流程稳定性:
import logging
logging.basicConfig(filename='eeg_processing.log', level=logging.INFO)
def safe_load_mat(file_path):
"""带错误处理的数据加载"""
try:
start_time = time.time()
raw_list = load_single_mat(file_path)
duration = time.time() - start_time
logging.info(f"成功加载 {file_path}, 耗时 {duration:.2f}秒")
return raw_list
except Exception as e:
logging.error(f"加载 {file_path} 失败: {str(e)}")
return None
5. 数据标准化与特征提取
5.1 数据标准化方法
不同被试者间数据标准化策略:
| 方法 | 说明 | 适用场景 |
|---|---|---|
| Z-score | 各通道单独标准化 | 保留个体差异 |
| 全局标准化 | 所有通道统一标准化 | 强调通道间关系 |
| 分位数标准化 | 基于数据分布的分位数 | 处理异常值 |
from sklearn.preprocessing import StandardScaler
def standardize_raw(raw, method='zscore'):
"""标准化Raw对象数据"""
data = raw.get_data()
if method == 'zscore':
# 各通道单独标准化
scaler = StandardScaler()
scaled_data = scaler.fit_transform(data.T).T
elif method == 'global':
# 全局标准化
scaled_data = (data - np.mean(data)) / np.std(data)
else:
raise ValueError(f"未知标准化方法: {method}")
# 创建新的Raw对象
new_raw = mne.io.RawArray(scaled_data, raw.info)
return new_raw
5.2 时频特征提取
使用MNE计算时频表征:
def compute_tfr(raw, freqs=np.logspace(*np.log10([1, 30]), num=20)):
"""计算时频表征"""
from mne.time_frequency import tfr_multitaper
picks = mne.pick_types(raw.info, eeg=True)
tfr = tfr_multitaper(
raw,
freqs=freqs,
n_cycles=freqs/2,
picks=picks,
time_bandwidth=4.0,
return_itc=False
)
return tfr
6. 构建完整处理流程
将上述组件组合成端到端处理流程:
def full_processing_pipeline(data_dir, output_dir, max_files=None):
"""完整数据处理流程"""
output_dir = Path(output_dir)
output_dir.mkdir(exist_ok=True)
# 1. 批量加载数据
for raw_list, subject_id in mat_file_generator(data_dir, max_files):
processed_data = []
# 2. 处理每个试次
for i, raw in enumerate(raw_list):
# 数据质量检查
check_data_quality(raw)
# 标准化处理
standardized = standardize_raw(raw)
# 时频分析
tfr = compute_tfr(standardized)
# 保存处理结果
save_path = output_dir / f"{subject_id}_trial{i+1}.h5"
tfr.save(save_path, overwrite=True)
processed_data.append(tfr)
print(f"完成处理 {subject_id},共 {len(processed_data)} 个试次")
在实际项目中,这种模块化设计让每个处理步骤都可以单独测试和调整。比如发现某个被试者的数据质量较差时,可以针对性地调整标准化参数或增加额外的质量控制步骤。
更多推荐

所有评论(0)