科研实战:用Matlab和Python实现相位传递熵的完整指南

在神经科学和复杂系统研究中,时序数据分析一直是核心挑战之一。面对脑电信号、神经元放电序列这类复杂数据,传统统计方法往往难以捕捉其动态交互特性。相位传递熵(Phase Transfer Entropy, PTE)作为一种基于信息论的工具,能够有效量化不同脑区或神经单元间的信息流动方向与强度,为研究者提供了一种全新的分析视角。

1. 理解熵与传递熵的基础概念

1.1 从香农熵到信息传递

熵的概念最早由克劳德·香农引入信息论,用于量化系统的不确定性。对于一个离散随机变量X,其香农熵定义为:

import numpy as np

def shannon_entropy(prob_dist):
    """计算离散概率分布的香农熵"""
    return -np.sum(prob_dist * np.log2(prob_dist))

注意:输入prob_dist应为概率分布数组,所有元素之和为1

在神经信号分析中,我们更关注的是不同信号间的信息流动。传递熵(Transfer Entropy, TE)扩展了这一概念,用于测量从源信号到目标信号的信息传递量。其核心思想是:如果知道源信号的过去能减少预测目标信号未来的不确定性,则认为存在信息传递。

1.2 关键概念对比

概念 数学表达 神经科学意义
香农熵 H(X) = -Σp(x)logp(x) 单个神经信号的不确定性
联合熵 H(X,Y) = -Σp(x,y)logp(x,y) 两个信号联合状态的不确定性
条件熵 H(Y X) = H(X,Y) - H(X)
传递熵 TE_{X→Y} = H(Y_f Y_p) - H(Y_f

2. 相位传递熵的原理与优势

2.1 从时域到相位空间

传统传递熵直接作用于原始信号,而相位传递熵先通过希尔伯特变换提取信号的瞬时相位。这种方法具有三大优势:

  1. 抗幅度干扰 :聚焦相位关系,降低信号幅度波动的影响
  2. 生理意义明确 :神经振荡相位与信息编码密切相关
  3. 计算稳定性 :相位数据范围固定(通常0-2π),便于统计分析

希尔伯特变换的Matlab实现:

% 假设data为单通道时间序列
analytic_signal = hilbert(data);
phase_data = angle(analytic_signal); % 获取相位(-π到π)
phase_data = phase_data + pi; % 转换为0-2π范围

2.2 相位传递熵的计算流程

  1. 信号预处理 :滤波、去噪(必要时)
  2. 相位提取 :希尔伯特变换获取瞬时相位
  3. 直方图离散化 :将连续相位值分组到bin中
  4. 概率估计 :计算各种联合概率分布
  5. 熵值计算 :根据公式计算PTE值

3. Matlab实战:完整PTE计算实现

3.1 核心参数设置

两个关键参数直接影响PTE计算结果:

  1. binsize :相位直方图的分组宽度

    • 太小会导致数据稀疏
    • 太大会丢失细节信息
    • 推荐使用Scott规则: binsize = 3.49*std(phase)*N^(-1/3)
  2. delay :时间延迟参数

    • 反映信息传递的时间尺度
    • 可通过平均相位变化周期估计:
% 估计delay参数
counter = 0;
for i=2:length(phase_data)-1
    if (phase_data(i-1)-pi)*(phase_data(i+1)-pi)<0
        counter = counter+1;
    end
end
delay = round((length(phase_data)-2)/counter);

3.2 完整Matlab实现代码

function [PTE, dPTE] = calculate_PTE(data, binsize, delay)
    % 输入:
    % data - N×M矩阵,N样本数,M通道数
    % binsize - 直方图分组宽度(rad)
    % delay - 时间延迟(样本点)
    
    % 希尔伯特变换获取相位
    complex_data = hilbert(data);
    phase_data = angle(complex_data) + pi; % 0-2π
    
    [L, N] = size(phase_data);
    PTE = zeros(N,N);
    bins_w = 0:binsize:2*pi;
    Nbins = length(bins_w);
    
    for i=1:N
        for j=1:N
            if i~=j
                % 初始化概率矩阵
                Py = zeros(Nbins,1);
                Pypr_y = zeros(Nbins,Nbins);
                Py_x = zeros(Nbins,Nbins);
                Pypr_y_x = zeros(Nbins,Nbins,Nbins);
                
                % 离散化相位数据
                rn_ypr = ceil(phase_data(1+delay:end,i)/binsize);
                rn_y = ceil(phase_data(1:end-delay,i)/binsize);
                rn_x = ceil(phase_data(1:end-delay,j)/binsize);
                
                % 统计直方图
                for k=1:L-delay
                    Py(rn_y(k)) = Py(rn_y(k))+1;
                    Pypr_y(rn_ypr(k),rn_y(k)) = Pypr_y(rn_ypr(k),rn_y(k))+1;
                    Py_x(rn_y(k),rn_x(k)) = Py_x(rn_y(k),rn_x(k))+1;
                    Pypr_y_x(rn_ypr(k),rn_y(k),rn_x(k)) = Pypr_y_x(rn_ypr(k),rn_y(k),rn_x(k))+1;
                end
                
                % 计算概率
                Py = Py/(L-delay);
                Pypr_y = Pypr_y/(L-delay);
                Py_x = Py_x/(L-delay);
                Pypr_y_x = Pypr_y_x/(L-delay);
                
                % 计算各类熵值
                Hy = -nansum(Py.*log2(Py));
                Hypr_y = -nansum(nansum(Pypr_y.*log2(Pypr_y)));
                Hy_x = -nansum(nansum(Py_x.*log2(Py_x)));
                Hypr_y_x = -nansum(nansum(nansum(Pypr_y_x.*log2(Pypr_y_x))));
                
                % 计算PTE
                PTE(i,j) = Hypr_y + Hy_x - Hy - Hypr_y_x;
            end
        end
    end
    
    % 计算dPTE(定向PTE)
    tmp = triu(PTE) + tril(PTE)';
    dPTE = [triu(PTE./tmp,1) + tril(PTE./tmp',-1)];
end

4. Python实现与性能优化

4.1 基础Python实现

import numpy as np
from scipy.signal import hilbert

def phase_transfer_entropy(data, binsize, delay):
    """
    计算多通道信号的相位传递熵
    
    参数:
        data: numpy数组,形状为(N,M),N时间点,M通道
        binsize: 直方图分组宽度(弧度)
        delay: 时间延迟(样本点)
    
    返回:
        PTE: M×M的PTE矩阵
        dPTE: 定向PTE矩阵
    """
    # 希尔伯特变换获取相位
    analytic_signal = hilbert(data)
    phase_data = np.angle(analytic_signal) + np.pi  # 转换为0-2π
    
    L, N = phase_data.shape
    PTE = np.zeros((N, N))
    bins = np.arange(0, 2*np.pi + binsize, binsize)
    Nbins = len(bins) - 1  # 实际bin数量
    
    for i in range(N):
        for j in range(N):
            if i != j:
                # 离散化相位数据
                ypr = phase_data[delay:, i]
                y = phase_data[:-delay, i]
                x = phase_data[:-delay, j]
                
                rn_ypr = np.digitize(ypr, bins) - 1
                rn_y = np.digitize(y, bins) - 1
                rn_x = np.digitize(x, bins) - 1
                
                # 计算联合概率
                Py, _ = np.histogram(y, bins=bins, density=True)
                Pypr_y, _, _ = np.histogram2d(ypr, y, bins=[bins, bins], density=True)
                Py_x, _, _ = np.histogram2d(y, x, bins=[bins, bins], density=True)
                Pypr_y_x, _ = np.histogramdd(np.column_stack([ypr, y, x]), 
                                           bins=[bins, bins, bins],
                                           density=True)
                
                # 计算各类熵值
                Hy = -np.nansum(Py * np.log2(Py))
                Hypr_y = -np.nansum(Pypr_y * np.log2(Pypr_y))
                Hy_x = -np.nansum(Py_x * np.log2(Py_x))
                Hypr_y_x = -np.nansum(Pypr_y_x * np.log2(Pypr_y_x))
                
                # 计算PTE
                PTE[i,j] = Hypr_y + Hy_x - Hy - Hypr_y_x
    
    # 计算dPTE
    tmp = np.triu(PTE) + np.tril(PTE).T
    dPTE = np.triu(PTE/tmp, 1) + np.tril(PTE/tmp.T, -1)
    
    return PTE, dPTE

4.2 性能优化技巧

  1. 向量化计算 :使用numpy的向量化操作替代循环
  2. 并行处理 :利用multiprocessing模块加速多通道计算
  3. 内存优化 :对于大数据集,分块处理避免内存溢出
  4. GPU加速 :使用cupy库将计算转移到GPU

优化后的概率计算示例:

# 使用numpy的bincount加速直方图统计
def fast_histogram2d(x, y, bins):
    """快速二维直方图统计"""
    x_idx = np.digitize(x, bins) - 1
    y_idx = np.digitize(y, bins) - 1
    count = np.bincount(x_idx * len(bins) + y_idx, 
                       minlength=len(bins)**2)
    return count.reshape(len(bins), len(bins)) / len(x)

5. 实际应用中的关键问题与解决方案

5.1 参数选择经验法则

参数 推荐值 调整建议
binsize 3.49*std(phase)*N^(-1/3) 在0.1-0.5弧度间调整
delay 平均相位变化周期 通常在10-100ms(根据采样率换算)
信号长度 ≥1000样本点 短信号可考虑分段分析

5.2 常见报错与排查

  1. NaN或Inf结果

    • 检查是否有空bin导致log(0)
    • 解决方案:添加微小常数平滑概率分布
  2. 不对称的PTE矩阵

    • 理论上PTE应不对称
    • 如果完全对称,检查是否混淆了i,j索引
  3. 不合理的dPTE值

    • 确保dPTE计算正确: dPTE = PTE/(PTE + PTE.T)
  4. 计算时间过长

    • 减少bin数量
    • 使用优化后的算法
    • 考虑降采样(保持关键频段)

5.3 结果解释与可视化

典型的PTE分析流程包括:

  1. 构建连接矩阵 :将dPTE值排列为N×N矩阵
  2. 阈值处理 :使用显著性检验或置换检验确定有效连接
  3. 网络分析 :计算节点度、聚类系数等图论指标
  4. 可视化 :使用脑地形图或网络图展示信息流模式

Matlab可视化示例:

% 假设dPTE是30×30的矩阵(对应30通道EEG)
figure;
imagesc(dPTE);
colorbar;
title('Directed Phase Transfer Entropy Matrix');
xlabel('Source Channel');
ylabel('Target Channel');

% 网络可视化
[xx, yy] = meshgrid(1:30);
g = digraph(xx(dPTE>0.6), yy(dPTE>0.6), dPTE(dPTE>0.6));
plot(g, 'Layout', 'force', 'EdgeLabel', g.Edges.Weight);

Python中使用matplotlib和networkx:

import matplotlib.pyplot as plt
import networkx as nx

def visualize_pte(dPTE, threshold=0.6):
    """可视化dPTE矩阵和网络"""
    plt.figure(figsize=(12,5))
    
    plt.subplot(121)
    plt.imshow(dPTE, cmap='hot')
    plt.colorbar()
    plt.title('dPTE Matrix')
    
    plt.subplot(122)
    G = nx.DiGraph()
    rows, cols = np.where(dPTE > threshold)
    edges = [(i, j, dPTE[i,j]) for i,j in zip(rows, cols)]
    G.add_weighted_edges_from(edges)
    
    pos = nx.spring_layout(G)
    nx.draw(G, pos, with_labels=True, 
           edge_color=[d['weight'] for _,_,d in G.edges(data=True)],
           edge_cmap=plt.cm.Blues, width=2)
    plt.title('Information Flow Network')
    
    plt.tight_layout()
    plt.show()

6. 进阶技巧与最新发展

6.1 多变量PTE扩展

传统PTE只考虑两两信号间的信息流,而实际神经系统中常存在多节点交互。多变量PTE(Multivariate PTE)可以同时考虑多个源信号的影响:

% 多变量PTE核心计算公式
TE_multi = H(Y_f|Y_p,X1_p,X2_p) - H(Y_f|Y_p,X1_p,X2_p,X3_p)

6.2 时变PTE分析

通过滑动窗口技术,可以研究信息流动的时变特性:

def time_varying_PTE(data, window_size, step):
    """计算时变PTE"""
    n_samples, n_channels = data.shape
    n_windows = (n_samples - window_size) // step + 1
    pte_series = np.zeros((n_windows, n_channels, n_channels))
    
    for i in range(n_windows):
        window_data = data[i*step : i*step + window_size]
        pte_series[i], _ = phase_transfer_entropy(window_data, binsize, delay)
    
    return pte_series

6.3 频域PTE分析

结合小波变换或带通滤波,可以研究特定频段的信息流动:

  1. 对信号进行带通滤波(如theta:4-8Hz, alpha:8-13Hz)
  2. 分别计算各频段的PTE
  3. 比较不同频段的信息流模式

6.4 基于PTE的有效连接分析

PTE结果可用于构建有向脑功能网络,进而计算一系列图论指标:

  • 节点出度 :一个脑区向其他脑区发送信息的强度总和
  • 节点入度 :接收信息的强度总和
  • 网络效率 :信息传递的整体效率
  • 模块化 :网络中的功能模块划分
def network_analysis(dPTE, threshold=0.5):
    """基于dPTE矩阵的网络分析"""
    adj_matrix = (dPTE > threshold).astype(int)
    G = nx.from_numpy_array(adj_matrix, create_using=nx.DiGraph)
    
    results = {
        'out_degree': dict(G.out_degree(weight='weight')),
        'in_degree': dict(G.in_degree(weight='weight')),
        'global_efficiency': nx.global_efficiency(G),
        'local_efficiency': nx.local_efficiency(G),
        'modularity': nx_comm.modularity(G, nx_comm.label_propagation_communities(G))
    }
    
    return results

更多推荐