用Python和NumPy手把手教你模拟一个马尔可夫链(附完整代码和可视化)

马尔可夫链是数据科学和概率建模中的核心工具之一,它能将看似随机的过程转化为可计算的数学模型。想象一下,如果你能预测用户明天的行为、天气变化趋势甚至股票价格波动,这些都可以通过马尔可夫链来建模。本文将带你从零开始,用Python实现一个完整的马尔可夫链模拟,并通过可视化让你直观理解其工作原理。

1. 环境准备与基础概念

在开始编码前,我们需要确保环境配置正确。推荐使用Anaconda创建新的Python环境,或者直接使用pip安装所需库:

pip install numpy matplotlib

马尔可夫链的核心在于 无记忆性 ——未来状态只取决于当前状态。这种特性使其成为建模序列数据的理想工具。例如:

  • 用户行为预测:用户下一步操作(点击/购买/离开)只与当前页面相关
  • 天气系统:明天的天气概率仅由今天天气决定
  • 文本生成:下一个词的出现概率取决于当前词

状态转移矩阵 是马尔可夫链的核心数学表达。假设我们建模天气系统,包含"晴"、"雨"、"阴"三种状态,转移矩阵可能如下:

今天\明天
0.7 0.2 0.1
0.3 0.5 0.2
0.4 0.3 0.3

这个矩阵表示:如果今天是晴天,明天有70%概率仍是晴天,20%概率下雨,10%概率转阴。

2. 构建马尔可夫链模拟器

让我们用NumPy实现一个通用的马尔可夫链模拟器。首先定义核心类结构:

import numpy as np

class MarkovChain:
    def __init__(self, transition_matrix, states):
        """
        初始化马尔可夫链
        :param transition_matrix: 状态转移矩阵
        :param states: 状态列表
        """
        self.transition_matrix = np.array(transition_matrix)
        self.states = states
        self.state_index = {s:i for i,s in enumerate(states)}
        
    def next_state(self, current_state):
        """根据当前状态返回下一个状态"""
        current_idx = self.state_index[current_state]
        next_idx = np.random.choice(
            len(self.states), 
            p=self.transition_matrix[current_idx]
        )
        return self.states[next_idx]
    
    def generate_states(self, current_state, num_steps):
        """生成状态序列"""
        sequence = []
        for _ in range(num_steps):
            sequence.append(current_state)
            current_state = self.next_state(current_state)
        return sequence

这个类提供了两个核心方法:

  • next_state() :根据当前状态返回下一个状态
  • generate_states() :生成指定长度的状态序列

让我们用天气模型测试这个实现:

weather_states = ['晴', '雨', '阴']
weather_matrix = [
    [0.7, 0.2, 0.1],  # 晴
    [0.3, 0.5, 0.2],  # 雨
    [0.4, 0.3, 0.3]   # 阴
]

mc = MarkovChain(weather_matrix, weather_states)
print(mc.generate_states('晴', 10))

运行结果可能如下(每次运行会不同):

['晴', '晴', '晴', '晴', '阴', '晴', '晴', '雨', '雨', '雨']

3. 状态分布与长期行为分析

马尔可夫链最有趣的性质之一是它的长期行为。无论从哪个初始状态开始,经过足够多步转移后,系统会达到 稳态分布 。让我们用代码验证这一点:

def simulate_distribution(mc, initial_state, num_simulations=1000, steps=50):
    """模拟状态分布"""
    counts = {state:0 for state in mc.states}
    for _ in range(num_simulations):
        final_state = mc.generate_states(initial_state, steps)[-1]
        counts[final_state] += 1
    
    # 转换为概率分布
    total = sum(counts.values())
    return {state:count/total for state, count in counts.items()}

steady_dist = simulate_distribution(mc, '晴')
print("稳态分布:", steady_dist)

运行结果可能类似于:

稳态分布: {'晴': 0.524, '雨': 0.286, '阴': 0.19}

这个分布告诉我们,长期来看系统处于"晴"状态的概率约为52.4%,"雨"状态28.6%,"阴"状态19%。我们可以用线性代数验证这个结果:

matrix = np.array(weather_matrix)
eigenvalues, eigenvectors = np.linalg.eig(matrix.T)
steady = eigenvectors[:, np.isclose(eigenvalues, 1)][:, 0]
steady = steady / steady.sum()
print("理论稳态分布:", steady.real)

4. 可视化马尔可夫链行为

可视化能帮助我们更直观理解马尔可夫链的行为。我们将创建两种可视化:状态序列图和分布演化图。

首先实现状态序列可视化:

import matplotlib.pyplot as plt

def plot_sequence(sequence, states):
    plt.figure(figsize=(10, 3))
    y = [states.index(s) for s in sequence]
    plt.plot(y, 'o-')
    plt.yticks(range(len(states)), states)
    plt.xlabel('时间步')
    plt.title('马尔可夫链状态序列')
    plt.grid(True)
    plt.show()

seq = mc.generate_states('晴', 20)
plot_sequence(seq, weather_states)

接下来实现分布演化图,展示状态概率如何随时间变化:

def plot_distribution_evolution(mc, initial_dist, steps):
    """绘制状态分布随时间变化"""
    current_dist = np.array(initial_dist)
    history = [current_dist.copy()]
    
    for _ in range(steps):
        current_dist = current_dist @ mc.transition_matrix
        history.append(current_dist.copy())
    
    plt.figure(figsize=(10, 5))
    for i, state in enumerate(mc.states):
        plt.plot([dist[i] for dist in history], label=state)
    
    plt.xlabel('时间步')
    plt.ylabel('概率')
    plt.title('状态分布随时间变化')
    plt.legend()
    plt.grid(True)
    plt.show()

initial_dist = [1, 0, 0]  # 初始为晴天
plot_distribution_evolution(mc, initial_dist, 20)

5. 高级应用:用户行为建模

让我们看一个更实际的例子——建模用户网站行为。假设用户有三种状态:

  1. 首页 :用户刚进入网站
  2. 产品页 :浏览产品详情
  3. 结账 :进行购买操作

转移矩阵可能如下:

user_states = ['首页', '产品页', '结账']
user_matrix = [
    [0.2, 0.7, 0.1],  # 首页
    [0.3, 0.5, 0.2],  # 产品页
    [0.8, 0.1, 0.1]   # 结账
]

user_mc = MarkovChain(user_matrix, user_states)

我们可以计算用户最终完成购买的概率:

def absorption_probability(mc, target_state, max_steps=100):
    """计算吸收概率"""
    counts = 0
    for _ in range(1000):
        seq = mc.generate_states('首页', max_steps)
        if target_state in seq:
            counts += 1
    return counts / 1000

purchase_prob = absorption_probability(user_mc, '结账')
print(f"用户最终购买的概率: {purchase_prob:.1%}")

更高级的分析可以计算平均访问时长:

def average_steps_to_absorption(mc, target_state, max_trials=1000):
    total_steps = 0
    for _ in range(max_trials):
        steps = 0
        current_state = '首页'
        while current_state != target_state and steps < 100:
            current_state = user_mc.next_state(current_state)
            steps += 1
        total_steps += steps
    return total_steps / max_trials

avg_steps = average_steps_to_absorption(user_mc, '结账')
print(f"平均需要 {avg_steps:.1f} 步到达结账页面")

6. 性能优化与大规模应用

当状态空间很大时(如推荐系统中的数百万商品),我们需要优化实现。以下是几种优化策略:

稀疏矩阵优化 :大多数转移矩阵是稀疏的(多数为0),可以使用scipy的稀疏矩阵:

from scipy.sparse import csr_matrix

class SparseMarkovChain:
    def __init__(self, transition_matrix, states):
        self.transition_matrix = csr_matrix(transition_matrix)
        self.states = states
        self.state_index = {s:i for i,s in enumerate(states)}
    
    def next_state(self, current_state):
        current_idx = self.state_index[current_state]
        probs = self.transition_matrix[current_idx].toarray()[0]
        next_idx = np.random.choice(len(self.states), p=probs)
        return self.states[next_idx]

并行模拟 :使用多进程加速大量模拟:

from multiprocessing import Pool

def parallel_simulation(args):
    mc, initial_state, steps = args
    return mc.generate_states(initial_state, steps)

def run_parallel_simulations(mc, initial_state, steps, num_simulations=1000):
    with Pool() as pool:
        args = [(mc, initial_state, steps)] * num_simulations
        results = pool.map(parallel_simulation, args)
    return results

GPU加速 :对于超大规模状态空间,可以使用CuPy将计算转移到GPU:

import cupy as cp

class GPUMarkovChain:
    def __init__(self, transition_matrix, states):
        self.transition_matrix = cp.array(transition_matrix)
        self.states = states
        self.state_index = {s:i for i,s in enumerate(states)}
    
    def next_state_batch(self, current_states):
        """批量处理多个链的状态转移"""
        current_indices = cp.array([self.state_index[s] for s in current_states])
        probs = self.transition_matrix[current_indices]
        cum_probs = cp.cumsum(probs, axis=1)
        rand = cp.random.rand(len(current_states), 1)
        next_indices = cp.argmax(cum_probs > rand, axis=1)
        return [self.states[int(i)] for i in next_indices]

7. 实际应用案例

让我们看一个电商推荐系统的实际案例。假设我们想预测用户接下来可能感兴趣的商品类别:

# 定义商品类别和转移概率
categories = ['电子', '服装', '家居', '食品', '图书']
transition_data = [
    # 电子  服装  家居  食品  图书
    [0.4, 0.2, 0.2, 0.1, 0.1],  # 电子
    [0.3, 0.3, 0.2, 0.1, 0.1],  # 服装
    [0.2, 0.2, 0.4, 0.1, 0.1],  # 家居
    [0.1, 0.1, 0.1, 0.6, 0.1],  # 食品
    [0.2, 0.1, 0.1, 0.1, 0.5]   # 图书
]

category_mc = MarkovChain(transition_data, categories)

def recommend_next_category(current_category, top_n=3):
    """推荐最可能的下n个类别"""
    current_idx = category_mc.state_index[current_category]
    probs = category_mc.transition_matrix[current_idx]
    top_indices = np.argsort(probs)[-top_n:][::-1]
    return [(categories[i], probs[i]) for i in top_indices]

print("从'电子'出发的推荐:", recommend_next_category('电子'))

结果可能显示:

从'电子'出发的推荐: [('电子', 0.4), ('服装', 0.2), ('家居', 0.2)]

我们可以进一步扩展这个模型,加入用户个性化信息:

class PersonalizedMarkovChain:
    def __init__(self, base_matrix, personalization_matrix, states):
        """
        :param base_matrix: 基础转移矩阵
        :param personalization_matrix: 个性化调整矩阵
        """
        self.base_matrix = np.array(base_matrix)
        self.personalization = np.array(personalization_matrix)
        self.states = states
        self.state_index = {s:i for i,s in enumerate(states)}
    
    def get_personalized_matrix(self, user_profile):
        """根据用户画像获取个性化转移矩阵"""
        # 这里简化处理,实际中可能使用更复杂的融合算法
        return self.base_matrix * (1 + self.personalization * user_profile)
    
    def next_state(self, current_state, user_profile):
        matrix = self.get_personalized_matrix(user_profile)
        current_idx = self.state_index[current_state]
        row = matrix[current_idx]
        row = row / row.sum()  # 重新归一化
        next_idx = np.random.choice(len(self.states), p=row)
        return self.states[next_idx]

更多推荐