学习深度学习关键是动手

  • 深度学习是人工智能最热的领域
  • 核心是神经网络
  • 神经网络是一门语言
  • 应该像学习Python/C++一样学习深度学习

参考视频:跟李沐学AI
参考书目:动手学深度学习 PyTorch版


循环神经网络

序列模型

  • 序列数据
    • 实际中很多数据是有时序结构的
    • 电影的评价随时间变化而变化
      • 拿奖后评分上升,直到奖项被忘记
      • 看了很多好电影后,人们的期望变高
      • 季节性:贺岁片、暑期档
      • 导演、演员的负面报道导致评分变低

统计工具

  • 在时间 t t t 观察到 x t x_t xt,那么得到 T T T不独立的随机变量 ( x 1 , … , x T ) ∼ p ( x ) (x_1, \dots, x_T) \sim p(\mathbf{x}) (x1,,xT)p(x)
  • 使用条件概率展开 p ( a , b ) = p ( a ) p ( b ∣ a ) = p ( b ) p ( a ∣ b ) p(a, b) = p(a)p(b \mid a) = p(b)p(a \mid b) p(a,b)=p(a)p(ba)=p(b)p(ab)

序列模型

  • 联合概率的链式法则:
    p ( x ) = p ( x 1 ) ⋅ p ( x 2 ∣ x 1 ) ⋅ p ( x 3 ∣ x 1 , x 2 ) ⋅ … p ( x T ∣ x 1 , … x T − 1 ) p(\mathbf{x}) = p(x_1) \cdot p(x_2 \mid x_1) \cdot p(x_3 \mid x_1, x_2) \cdot \dots p(x_T \mid x_1, \dots x_{T-1}) p(x)=p(x1)p(x2x1)p(x3x1,x2)p(xTx1,xT1)
  • 对条件概率建模:
    p ( x t ∣ x 1 , … x t − 1 ) = p ( x t ∣ f ( x 1 , … x t − 1 ) ) p(x_t \mid x_1, \dots x_{t-1}) = p(x_t \mid f(x_1, \dots x_{t-1})) p(xtx1,xt1)=p(xtf(x1,xt1))
    (对见过的数据建模,自回归模型)

方案A,马尔科夫假设: 假设当前数据只跟 τ \tau τ个过去数据点相关
方案B,潜变量模型:
引入潜变量 h t h_t ht来表示过去信息 h t = f ( x 1 , … , x t − 1 ) h_t = f(x_1, \dots, x_{t-1}) ht=f(x1,,xt1)

这样 x t = p ( x t ∣ h t ) x_t = p(x_t \mid h_t) xt=p(xtht)
潜变量模型
代码实现(尝试使用马尔科夫假设):

%matplotlib inline
import torch
from torch import nn
from d2l import torch as d2l

T = 1000
time = torch.arange(1, T + 1, dtype=torch.float32)
# 生成带噪声的正弦时间序列:
x = torch.sin(0.01 * time) + torch.normal(0, 0.2, (T,))
d2l.plot(time, [x], 'time', 'x', xlim=[1, 1000], figsize=(6, 3))

将数据映射为数据对 y t = x t y_t = x_t yt=xt x t = [ x t − τ , … , x t − 1 ] \mathbf{x}_t = [x_{t-\tau}, \ldots, x_{t-1}] xt=[xtτ,,xt1]

tau = 4
features = torch.zeros((T - tau, tau))

# 填充特征矩阵:每一列对应不同延迟的时间序列数据
for i in range(tau):  # i从0到3(共4次循环)
    # 第i列填充x中从索引i开始、长度为(T-tau)的数据
    # 例如:
    # i=0时,取x[0:996] → 对应"当前时间点前4步"的数据
    # i=1时,取x[1:997] → 对应"当前时间点前3步"的数据
    # ...以此类推,最终每行features[i] = [x[i], x[i+1], x[i+2], x[i+3]]
    features[:, i] = x[i: T - tau + i]
# 定义标签:取原始序列中从第tau个时间点(索引4)开始到末尾的数据
labels = x[tau:].reshape((-1, 1))
# 从features和labels中取前600个样本作为训练数据
batch_size, n_train = 16, 600
train_iter = d2l.load_array((features[:n_train], labels[:n_train]),
                            batch_size, is_train=True)

使用一个简单的多层感知机:

def init_weights(m):
    if type(m) == nn.Linear:
        nn.init.xavier_uniform_(m.weight)

def get_net():
    net = nn.Sequential(nn.Linear(4, 10),
                        nn.ReLU(),
                        nn.Linear(10, 1))
    net.apply(init_weights)
    return net

loss = nn.MSELoss(reduction='none')
def train(net, train_iter, loss, epochs, lr):
    trainer = torch.optim.Adam(net.parameters(), lr)
    for epoch in range(epochs):
        for X, y in train_iter:
            trainer.zero_grad()
            l = loss(net(X), y)
            l.sum().backward()
            trainer.step()
        print(f'epoch {epoch + 1}, '
              f'loss: {d2l.evaluate_loss(net, train_iter, loss):f}')

net = get_net()
train(net, train_iter, loss, 5, 0.01)

onestep_preds = net(features)
d2l.plot([time, time[tau:]],
         [x.detach().numpy(), onestep_preds.detach().numpy()], 'time',
         'x', legend=['data', '1-step preds'], xlim=[1, 1000],
         figsize=(6, 3))

进行多步预测(也就是使用自己的数据而不是原始数据来进行多部预测):

multistep_preds = torch.zeros(T)
multistep_preds[: n_train + tau] = x[: n_train + tau]
for i in range(n_train + tau, T):  # i从604循环到999(共396个时间点)
    # 用“过去tau=4个时间点的结果”作为输入,预测第i个时间点的值
    multistep_preds[i] = net(
        multistep_preds[i - tau:i].reshape((1, -1))  # 取[i-4, i-3, i-2, i-1]这4个点,转成模型需要的形状(1,4)
    )

d2l.plot([time, time[tau:], time[n_train + tau:]],
         [x.detach().numpy(), onestep_preds.detach().numpy(),
          multistep_preds[n_train + tau:].detach().numpy()], 'time',
         'x', legend=['data', '1-step preds', 'multistep preds'],
         xlim=[1, 1000], figsize=(6, 3))

![](https://i-blog.csdnimg.cn/direct/5adae25e35ab4a9c9f52453147838dbf.png

(由于误差累积,多部预测的效果不是很好)
更仔细地看一下 k k k步预测

max_steps = 64

# 创建特征矩阵features,用于存储"历史数据+未来多步预测结果"
# 形状为 (N, tau + max_steps),其中:
#   - N = T - tau - max_steps + 1:有效样本数(能完整容纳"tau个历史+max_steps个预测"的窗口数量)
#   - tau + max_steps:列数=历史窗口长度+预测步数(前tau列存历史,后max_steps列存预测)
features = torch.zeros((T - tau - max_steps + 1, tau + max_steps))

# 填充前tau列:存入真实的历史观测数据(滑动窗口方式)
# 目的:为每个样本准备"过去tau个时间点的真实数据",作为预测的初始输入
for i in range(tau):
    # 对第i列,从原始序列x中截取长度为N的数据填充
    # 例如i=0时取x[0:N],i=1时取x[1:N+1]...形成连续的tau个历史数据窗口
    features[:, i] = x[i: i + T - tau - max_steps + 1]

# 填充后max_steps列:用模型net递归预测未来多步,并存储预测结果
# 核心逻辑:每一步预测都依赖"最近的tau个数据"(可能是历史真实值或之前的预测结果)
for i in range(tau, tau + max_steps):
    # 输入:取当前列前tau个数据(features[:, i-tau:i])
    # 例如i=tau时,输入是前tau列的历史数据;i=tau+1时,输入是第1列到第tau列+第tau列的预测结果
    # 输出:模型预测的第i-tau步结果,填入第i列
    features[:, i] = net(features[:, i - tau:i]).reshape(-1)

steps = (1, 4, 16, 64)
d2l.plot([time[tau + i - 1: T - max_steps + i] for i in steps],
         [features[:, (tau + i - 1)].detach().numpy() for i in steps], 'time', 'x',
         legend=[f'{i}-step preds' for i in steps], xlim=[5, 1000],
         figsize=(6, 3))

在这里插入图片描述

文本预处理

文本预处理的核心思想是如何把词变成我们要训练的东西
代码实现:

import collections
import re
from d2l import torch as d2l

# 简单且暴力的预处理hh
def read_time_machine():
    """将时间机器数据集加载到文本行的列表中"""
    with open(d2l.download('time_machine'), 'r') as f:
        lines = f.readlines()  
    # 文本清洗与标准化:
    # re.sub('[^A-Za-z]+', ' ', line):将非字母的字符(如数字、标点)替换为空格;
    # [^A-Za-z]+ 的含义是:匹配 “一个或多个连续的非字母字符”
    # strip():去除每行首尾的空白;lower():将所有字母转为小写;
    return [re.sub('[^A-Za-z]+', ' ', line).strip().lower() for line in lines]

def tokenize(lines, token='word'):
    """将文本行拆分为单词或字符词元"""
    if token == 'word':
        # 按空格分割每行,得到“单词列表的列表”(每行对应一个单词列表)
        return [line.split() for line in lines]
    elif token == 'char':
        # 将每行的每个字符拆分为独立元素,得到“字符列表的列表”
        return [list(line) for line in lines]
    else:
        print('错误:未知词元类型:' + token)

接下来另一个重要的概念:
构建一个字典,通常也叫做 词表(vocabulary)
用来将字符串类型的词元映射到从 0 0 0开始的数字索引中

class Vocab:  
    """文本词表:建立词元与索引的双向映射"""
    def __init__(self, tokens=None, min_freq=0, reserved_tokens=None):
        if tokens is None:
            tokens = []
        if reserved_tokens is None:
            reserved_tokens = []
        # 统计词频并按频率从高到低排序
        counter = count_corpus(tokens)
        self._token_freqs = sorted(counter.items(), key=lambda x: x[1], reverse=True) 
        # 初始化词表(先加入未知词<unk>和预留词元)
        self.idx_to_token = ['<unk>'] + reserved_tokens  # 索引0固定为未知词
        self.token_to_idx = {token: idx for idx, token in enumerate(self.idx_to_token)}
        
        # 加入高频词元(过滤低频词)
        for token, freq in self._token_freqs:
            if freq < min_freq:
                break  # 频率低于阈值,后续词元频率更低,直接停止
            if token not in self.token_to_idx:  # 避免重复添加
                self.idx_to_token.append(token)
                self.token_to_idx[token] = len(self.idx_to_token) - 1  # 记录当前索引
    
    def __len__(self):
        """返回词表大小"""
        return len(self.idx_to_token)
    
    def __getitem__(self, tokens):
        """将词元(单个或列表)转换为索引"""
        if not isinstance(tokens, (list, tuple)):
            # 单个词元:若不在词表中,返回<unk>的索引(0)
            return self.token_to_idx.get(tokens, self.unk)
        # 多个词元:递归转换每个词元
        return [self.__getitem__(token) for token in tokens]
    
    def to_tokens(self, indices):
        """将索引(单个或列表)转换为词元"""
        if not isinstance(indices, (list, tuple)):
            return self.idx_to_token[indices]
        # 多个索引:转换每个索引
        return [self.idx_to_token[index] for index in indices]
    
    @property
    def unk(self):
        """返回未知词<unk>的索引(固定为0)"""
        return 0
    @property
    def token_freqs(self):
        """返回词元频率列表(词元, 频率)"""
        return self._token_freqs

def count_corpus(tokens):  
    """统计词元的频率"""
    # 若输入是嵌套列表(如[[词元1, 词元2], [词元3, ...]]),先展平为一维列表
    if len(tokens) == 0 or isinstance(tokens[0], list):
        tokens = [token for line in tokens for token in line]
    # 用Counter统计每个词元的出现次数,返回{词元: 频率}字典
    return collections.Counter(tokens)
    

将所有功能打包到load_corpus_time_machine函数中:

def load_corpus_time_machine(max_tokens=-1):  
    """返回时光机器数据集的词元索引列表和词表"""
    lines = read_time_machine()
    tokens = tokenize(lines, 'char')
    vocab = Vocab(tokens)
    corpus = [vocab[token] for line in tokens for token in line]
    if max_tokens > 0:
        corpus = corpus[:max_tokens]
    return corpus, vocab

corpus, vocab = load_corpus_time_machine()
len(corpus), len(vocab)

语言模型

  • 给定文本序列 x 1 , … , x T x_1, \dots, x_T x1,,xT,语言模型的目标是估计联合概率 p ( x 1 , … , x T ) p(x_1, \dots, x_T) p(x1,,xT)
  • 它的应用包括:
    • 作为预训练模型(例如 BERT、GPT-3 等,通过语言模型任务学习文本的通用表示);
    • 生成本文:给定前面若干词,不断利用 x t ∼ p ( x t ∣ x 1 , … , x t − 1 ) x_t \sim p(x_t \mid x_1, \dots, x_{t-1}) xtp(xtx1,,xt1)(“下一个词基于前面所有词的条件概率”)来生成后续文本;
    • 判断多个序列中哪个更常见(例如对比 “to recognize” vs “to wreck a nice beach”,前者是自然常见的表达,后者是罕见/错误序列,语言模型可通过概率区分)。

使用计数来建模

  • 假设序列长度为2,我们预测
    p ( x , x ′ ) = p ( x ) p ( x ′ ∣ x ) = n ( x ) n ⋅ n ( x , x ′ ) n ( x ) p(x, x') = p(x)p(x' \mid x) = \frac{n(x)}{n} \cdot \frac{n(x, x')}{n(x)} p(x,x)=p(x)p(xx)=nn(x)n(x)n(x,x)

    • 这里 n 是总词数, n(x), n(x, x’) 是单个单词和连续单词对的出现次数
  • 很容易拓展到长为3的情况
    p ( x , x ′ , x ′ ′ ) = p ( x ) p ( x ′ ∣ x ) p ( x ′ ′ ∣ x , x ′ ) = n ( x ) n ⋅ n ( x , x ′ ) n ( x ) ⋅ n ( x , x ′ , x ′ ′ ) n ( x , x ′ ) p(x, x', x'') = p(x)p(x' \mid x)p(x'' \mid x, x') = \frac{n(x)}{n} \cdot \frac{n(x, x')}{n(x)} \cdot \frac{n(x, x', x'')}{n(x, x')} p(x,x,x′′)=p(x)p(xx)p(x′′x,x)=nn(x)n(x)n(x,x)n(x,x)n(x,x,x′′)

  • 当序列很长时,因为文本量不够大,很可能 n ( x 1 , … , x T ) ≤ 1 n(x_1, \dots, x_T) \leq 1 n(x1,,xT)1

  • 使用马尔科夫假设可以缓解这个问题

    • 一元语法(Unigram):假设词之间完全独立,联合概率拆分为每个词单独出现概率的乘积(一元语法实际使用的不多):
      p ( x 1 , x 2 , x 3 , x 4 ) = p ( x 1 ) p ( x 2 ) p ( x 3 ) p ( x 4 ) = n ( x 1 ) n ⋅ n ( x 2 ) n ⋅ n ( x 3 ) n ⋅ n ( x 4 ) n p(x_1, x_2, x_3, x_4) = p(x_1)p(x_2)p(x_3)p(x_4) = \frac{n(x_1)}{n} \cdot \frac{n(x_2)}{n} \cdot \frac{n(x_3)}{n} \cdot \frac{n(x_4)}{n} p(x1,x2,x3,x4)=p(x1)p(x2)p(x3)p(x4)=nn(x1)nn(x2)nn(x3)nn(x4)

    • 二元语法(Bigram):假设当前词仅依赖前1个词,联合概率拆分为“首词概率”与“后续词依赖前1个词的条件概率”的乘积:
      p ( x 1 , x 2 , x 3 , x 4 ) = p ( x 1 ) p ( x 2 ∣ x 1 ) p ( x 3 ∣ x 2 ) p ( x 4 ∣ x 3 ) = n ( x 1 ) n ⋅ n ( x 1 , x 2 ) n ( x 1 ) ⋅ n ( x 2 , x 3 ) n ( x 2 ) ⋅ n ( x 3 , x 4 ) n ( x 3 ) p(x_1, x_2, x_3, x_4) = p(x_1)p(x_2 \mid x_1)p(x_3 \mid x_2)p(x_4 \mid x_3) = \frac{n(x_1)}{n} \cdot \frac{n(x_1, x_2)}{n(x_1)} \cdot \frac{n(x_2, x_3)}{n(x_2)} \cdot \frac{n(x_3, x_4)}{n(x_3)} p(x1,x2,x3,x4)=p(x1)p(x2x1)p(x3x2)p(x4x3)=nn(x1)n(x1)n(x1,x2)n(x2)n(x2,x3)n(x3)n(x3,x4)

    • 三元语法(Trigram):假设当前词仅依赖前2个词,联合概率拆分为“首词概率”“次词依赖首词的概率”与“后续词依赖前2个词的条件概率”的乘积:
      p ( x 1 , x 2 , x 3 , x 4 ) = p ( x 1 ) p ( x 2 ∣ x 1 ) p ( x 3 ∣ x 1 , x 2 ) p ( x 4 ∣ x 2 , x 3 ) = n ( x 1 ) n ⋅ n ( x 1 , x 2 ) n ( x 1 ) ⋅ n ( x 1 , x 2 , x 3 ) n ( x 1 , x 2 ) ⋅ n ( x 2 , x 3 , x 4 ) n ( x 2 , x 3 ) p(x_1, x_2, x_3, x_4) = p(x_1)p(x_2 \mid x_1)p(x_3 \mid x_1, x_2)p(x_4 \mid x_2, x_3) = \frac{n(x_1)}{n} \cdot \frac{n(x_1, x_2)}{n(x_1)} \cdot \frac{n(x_1, x_2, x_3)}{n(x_1, x_2)} \cdot \frac{n(x_2, x_3, x_4)}{n(x_2, x_3)} p(x1,x2,x3,x4)=p(x1)p(x2x1)p(x3x1,x2)p(x4x2,x3)=nn(x1)n(x1)n(x1,x2)n(x1,x2)n(x1,x2,x3)n(x2,x3)n(x2,x3,x4)

代码实现:

import random
import torch
from d2l import torch as d2l

# 单词语料库构建与词频统计
tokens = d2l.tokenize(d2l.read_time_machine())
corpus = [token for line in tokens for token in line]
vocab = d2l.Vocab(corpus)
vocab.token_freqs[:10]

# 绘制词频的双对数图
# 最流行的词被称为*停用词*(通常对文本的主题内容贡献不大)
freqs = [freq for token, freq in vocab.token_freqs]
d2l.plot(freqs, xlabel='token: x', ylabel='frequency: n(x)', xscale='log', yscale='log')

其他的词元组合:

# 二元语法
bigram_tokens = [pair for pair in zip(corpus[:-1], corpus[1:])]
bigram_vocab = d2l.Vocab(bigram_tokens)
bigram_vocab.token_freqs[:10]
# 三元语法
trigram_tokens = [triple for triple in zip(
corpus[:-2], corpus[1:-1], corpus[2:])]
trigram_vocab = d2l.Vocab(trigram_tokens)
trigram_vocab.token_freqs[:10]
# 直观对比三种模型不同词元出现的频率
bigram_freqs = [freq for token, freq in bigram_vocab.token_freqs]
trigram_freqs = [freq for token, freq in trigram_vocab.token_freqs]
d2l.plot([freqs, bigram_freqs, trigram_freqs], xlabel='token: x',
         ylabel='frequency: n(x)', xscale='log', yscale='log',
         legend=['unigram', 'bigram', 'trigram'])

三种模型词元频率
随机生成一个小批量数据的特征和标签以供读取:

def seq_data_iter_random(corpus, batch_size, num_steps):  
    """使用随机抽样生成一个小批量子序列"""
    # 随机偏移语料库,避免固定起始位置
    corpus = corpus[random.randint(0, num_steps - 1):]
    # 计算可提取的完整子序列数量(每个子序列长度为num_steps)
    num_subseqs = (len(corpus) - 1) // num_steps
    # 生成所有子序列的起始索引(不重叠)
    initial_indices = list(range(0, num_subseqs * num_steps, num_steps))
    # 打乱起始索引,实现随机抽样
    random.shuffle(initial_indices)

    def data(pos):
        return corpus[pos: pos + num_steps]

    # 计算总批次数
    num_batches = num_subseqs // batch_size
    # 按批次生成数据
    for i in range(0, batch_size * num_batches, batch_size):
        # 当前批次的起始索引
        initial_indices_per_batch = initial_indices[i: i + batch_size]
        # 输入序列X:每个元素是长度为num_steps的子序列
        X = [data(j) for j in initial_indices_per_batch]
        # 目标序列Y:每个元素是X的下一时间步(滞后1步)
        Y = [data(j + 1) for j in initial_indices_per_batch]
        # 返回当前批次的X和Y(转为张量)
        yield torch.tensor(X), torch.tensor(Y)

如果想保证两个相邻的小批量中的子序列在原始序列上也是相邻的,可以用:

def seq_data_iter_sequential(corpus, batch_size, num_steps):  
    """使用顺序分区生成一个小批量子序列"""
    # 随机偏移起始位置,避免固定模式
    offset = random.randint(0, num_steps)
    # 计算可用token总数(确保能被batch_size整除,便于后续reshape)
    num_tokens = ((len(corpus) - offset - 1) // batch_size) * batch_size
    # 提取输入序列X和目标序列Y(Y比X滞后1步)
    Xs = torch.tensor(corpus[offset: offset + num_tokens])
    Ys = torch.tensor(corpus[offset + 1: offset + 1 + num_tokens])
    # 重塑为batch_size行的并行长序列(每行一个子序列)
    Xs, Ys = Xs.reshape(batch_size, -1), Ys.reshape(batch_size, -1)
    # 计算总批次数(每个并行序列可生成多少个num_steps长度的子序列)
    num_batches = Xs.shape[1] // num_steps
    # 按顺序截取子序列,生成批量数据
    for i in range(0, num_steps * num_batches, num_steps):
        X = Xs[:, i: i + num_steps]  # 输入:每个并行序列的[i, i+num_steps)片段
        Y = Ys[:, i: i + num_steps]  # 目标:对应下一时间步的片段
        yield X, Y

包装采样函数和定义函数load_data_time_machine:

class SeqDataLoader:  
    """加载序列数据的迭代器"""
    def __init__(self, batch_size, num_steps, use_random_iter, max_tokens):
        if use_random_iter:
            self.data_iter_fn = d2l.seq_data_iter_random
        else:
            self.data_iter_fn = d2l.seq_data_iter_sequential
        self.corpus, self.vocab = d2l.load_corpus_time_machine(max_tokens)
        self.batch_size, self.num_steps = batch_size, num_steps

    def __iter__(self):
        return self.data_iter_fn(self.corpus, self.batch_size, self.num_steps)
def load_data_time_machine(batch_size, num_steps,  
                           use_random_iter=False, max_tokens=10000):
    """返回时光机器数据集的迭代器和词表"""
    data_iter = SeqDataLoader(
        batch_size, num_steps, use_random_iter, max_tokens)
    return data_iter, data_iter.vocab

循环神经网络 RNN

循环神经网络

  • 更新隐藏状态: h t = ϕ ( W h h h t − 1 + W h x x t − 1 + b h ) \mathbf{h}_t = \phi(\mathbf{W}_{hh}\mathbf{h}_{t-1} + \mathbf{W}_{hx}\mathbf{x}_{t-1} + \mathbf{b}_h) ht=ϕ(Whhht1+Whxxt1+bh)
  • 输出: o t = ϕ ( W h o h t + b o ) \mathbf{o}_t = \phi(\mathbf{W}_{ho}\mathbf{h}_t + \mathbf{b}_o) ot=ϕ(Whoht+bo)
    输入输出

困惑度(perplexity)

  • 衡量一个语言模型的好坏可以用平均交叉熵:
    π = 1 n ∑ i = 1 n − log ⁡ p ( x t ∣ x t − 1 , …   ) \pi = \frac{1}{n} \sum_{i=1}^{n} -\log p(x_t | x_{t-1}, \dots) π=n1i=1nlogp(xtxt1,)
    其中, p 是语言模型的预测概率, x t x_t xt 是真实词。

  • 历史原因,NLP 领域使用困惑度 exp ⁡ ( π ) \exp(\pi) exp(π) 来衡量模型表现,它表示“平均每次预测时的可选选项数量”:

    • 困惑度为 1 表示模型完美(每次都能精准预测正确词);
    • 困惑度为无穷大表示模型最差(完全无法预测正确词)。
  • 二者都是 “ 数值越小,模型预测越准 ” 。

梯度裁剪

  • 迭代中计算这 T 个时间步上的梯度,在反向传播过程中产生长度为 O(T) 的矩阵乘法链,导致数值不稳定。
  • 梯度裁剪能有效预防梯度爆炸:
    • 如果梯度长度超过 θ \theta θ ,那么将其裁剪回长度 θ \theta θ ,公式为:
      g ← min ⁡ ( 1 , θ ∥ g ∥ ) g \mathbf{g} \leftarrow \min\left(1, \frac{\theta}{\|\mathbf{g}\|}\right) \mathbf{g} gmin(1,gθ)g

从零开始实现

%matplotlib inline
import math
import torch
from torch import nn
from torch.nn import functional as F
from d2l import torch as d2l

batch_size, num_steps = 32, 35
train_iter, vocab = d2l.load_data_time_machine(batch_size, num_steps)
# 初始化循环神经网络模型参数
def get_params(vocab_size, num_hiddens, device):
    # 输入和输出维度均等于词汇表大小(输入为独热编码,输出为下一个词的概率分布)
    num_inputs = num_outputs = vocab_size

    # 定义参数初始化函数:生成小随机数(标准正态分布缩放0.01倍)
    def normal(shape):
        return torch.randn(size=shape, device=device) * 0.01

    W_xh = normal((num_inputs, num_hiddens))
    W_hh = normal((num_hiddens, num_hiddens))
    b_h = torch.zeros(num_hiddens, device=device)
    W_hq = normal((num_hiddens, num_outputs))
    b_q = torch.zeros(num_outputs, device=device)

    params = [W_xh, W_hh, b_h, W_hq, b_q]
    for param in params:
        param.requires_grad_(True)
    
    return params
    
# 初始化隐藏状态
def init_rnn_state(batch_size, num_hiddens, device):
    return (torch.zeros((batch_size, num_hiddens), device=device), )

# RNN前向计算:按时间步迭代处理序列
def rnn(inputs, state, params):
    # 解包参数:输入→隐藏、隐藏→隐藏、隐藏偏置、隐藏→输出、输出偏置
    W_xh, W_hh, b_h, W_hq, b_q = params
    H, = state  # 解包初始隐藏状态
    outputs = []  # 存储每个时间步的输出
    
    for X in inputs:
        H = torch.tanh(torch.mm(X, W_xh) + torch.mm(H, W_hh) + b_h)
        Y = torch.mm(H, W_hq) + b_q
        outputs.append(Y)
    
    # 拼接所有时间步的输出,返回输出和最终隐藏状态
    return torch.cat(outputs, dim=0), (H,)

# 创建一个类来包装一下这些函数
class RNNModelScratch:
    """从零实现的循环神经网络模型类,封装RNN核心逻辑并支持灵活扩展""" 
    def __init__(self, vocab_size, num_hiddens, device, get_params, init_state, forward_fn):
        # 初始化模型基本配置:词汇表大小、隐藏层维度、计算设备
        self.vocab_size, self.num_hiddens, self.device = vocab_size, num_hiddens, device
        # 注入核心函数:参数生成、状态初始化、前向计算(支持替换为LSTM等变种)
        self.params = get_params(vocab_size, num_hiddens, device)  # 模型参数(权重/偏置)
        self.init_state, self.forward_fn = init_state, forward_fn  # 状态初始化和前向计算函数

    def __call__(self, X, state):
        """前向计算接口:输入序列→输出+新状态"""
        # 输入转为独热编码(适配RNN输入格式),调用注入的前向函数计算
        X = F.one_hot(X.T, self.vocab_size).type(torch.float32)  # 转置+独热编码
        return self.forward_fn(X, state, self.params)  # 返回输出和更新后的状态

    def begin_state(self, batch_size, device):
        """初始化隐藏状态(如全0张量),封装状态初始化逻辑"""
        return self.init_state(batch_size, self.num_hiddens, device)
    
def predict_ch8(prefix, num_preds, net, vocab, device):  
    """在prefix后面生成新字符"""
     # 初始化隐藏状态:批量大小为1(一次生成一个序列)
    state = net.begin_state(batch_size=1, device=device)
    # 处理前缀的第一个字符:转换为索引并存入输出列表
    # outputs用于记录所有字符(前缀+生成字符)的索引
    outputs = [vocab[prefix[0]]]  # vocab[char]获取字符对应的整数索引
    # 定义输入函数:获取上一个字符的索引作为下一次输入
    get_input = lambda: torch.tensor([outputs[-1]], device=device).reshape((1, 1))
    # 逐字处理前缀剩余字符,更新隐藏状态(积累上下文信息)
    for y in prefix[1:]:  # 遍历前缀中除第一个字符外的所有字符
        _, state = net(get_input(), state)
        outputs.append(vocab[y])
    # 生成指定数量的新字符
    for _ in range(num_preds):
        y, state = net(get_input(), state)
        # 取概率最大的字符索引(贪心解码策略),加入输出列表
        # argmax(dim=1):在词汇表维度取最大值索引
        outputs.append(int(y.argmax(dim=1).reshape(1)))
    # 将所有索引转换为字符,拼接成完整字符串返回
    # vocab.idx_to_token[i]:通过索引获取对应的字符
    return ''.join([vocab.idx_to_token[i] for i in outputs])

梯度裁剪实现:
g ← min ⁡ ( 1 , θ ∥ g ∥ ) g \mathbf{g} \leftarrow \min\left(1, \frac{\theta}{\|\mathbf{g}\|}\right) \mathbf{g} gmin(1,gθ)g

def grad_clipping(net, theta):  
    """裁剪梯度"""
    if isinstance(net, nn.Module):
        params = [p for p in net.parameters() if p.requires_grad]
    else:
        params = net.params
    norm = torch.sqrt(sum(torch.sum((p.grad ** 2)) for p in params))
    if norm > theta:
        for param in params:
            param.grad[:] *= theta / norm

训练:

def train_epoch_ch8(net, train_iter, loss, updater, device, use_random_iter):
    """训练网络一个迭代周期"""
    state, timer = None, d2l.Timer()  # 初始化隐藏状态和计时器
    metric = d2l.Accumulator(2)  # 累计[总损失, 总样本数]
    
    for X, Y in train_iter:
        # 初始化或重置隐藏状态
        if state is None or use_random_iter:
            state = net.begin_state(batch_size=X.shape[0], device=device)
        else:
            # 顺序迭代时截断梯度(避免历史依赖导致内存爆炸)
            if isinstance(net, torch.nn.Module) and not isinstance(state, tuple):
                state.detach_()
            else:
                for s in state:
                    s.detach_()
        
        # 处理输入和目标
        y = Y.T.reshape(-1)  # 调整目标形状以匹配输出
        X, y = X.to(device), y.to(device)
        
        # 前向计算与损失
        y_hat, state = net(X, state)
        l = loss(y_hat, y.long()).mean()
        
        # 反向传播与参数更新
        if isinstance(updater, torch.optim.Optimizer):
            updater.zero_grad()
            l.backward()
            grad_clipping(net, 1)  # 梯度裁剪
            updater.step()
        else:
            l.backward()
            grad_clipping(net, 1)
            updater(batch_size=1)
        
        metric.add(l * y.numel(), y.numel())  # 累计指标
    
    # 返回困惑度(exp(平均损失))和训练速度
    return math.exp(metric[0] / metric[1]), metric[1] / timer.stop()
def train_ch8(net, train_iter, vocab, lr, num_epochs, device,
              use_random_iter=False):
    """
    循环神经网络(RNN)的完整训练控制函数,负责多轮训练、性能监控和结果展示
    参数:
        net: 待训练的RNN模型(可是nn.Module子类或自定义模型)
        train_iter: 训练数据迭代器,生成(输入序列X, 目标序列Y)批量
        vocab: 词汇表,包含字符与索引的映射关系
        lr: 学习率,控制参数更新幅度
        num_epochs: 总训练轮数(完整遍历训练数据的次数)
        device: 计算设备(CPU或GPU)
        use_random_iter: 是否使用随机迭代模式(False为顺序迭代,保持序列连续性)
    """
    loss = nn.CrossEntropyLoss()
    
    animator = d2l.Animator(
        xlabel='epoch',  # x轴标签:训练轮数
        ylabel='perplexity',  # y轴标签:困惑度(越低模型越好)
        legend=['train'],  # 图例:训练集
        xlim=[10, num_epochs]  # x轴范围:从第10轮到总轮数
    )
    
    if isinstance(net, nn.Module):
        # 若为PyTorch原生nn.Module模型(如nn.RNN),使用内置SGD优化器
        updater = torch.optim.SGD(net.parameters(), lr)
    else:
        # 若为自定义模型(如RNNModelScratch),使用自定义SGD更新函数
        updater = lambda batch_size: d2l.sgd(net.params, lr, batch_size)
    
    predict = lambda prefix: predict_ch8(prefix, 50, net, vocab, device)
    
    for epoch in range(num_epochs):
        ppl, speed = train_epoch_ch8(
            net, train_iter, loss, updater, device, use_random_iter
        )
        # 每10轮输出一次生成结果并更新可视化图表
        if (epoch + 1) % 10 == 0:
            # 打印以'time traveller'为前缀的生成文本,观察模型生成质量
            print(predict('time traveller'))
            animator.add(epoch + 1, [ppl])
    
    # 打印最终困惑度、训练速度和使用的设备
    print(f'困惑度 {ppl:.1f}, {speed:.1f} 词元/秒 {str(device)}')
    # 打印模型对两个不同前缀的最终生成结果,评估模型泛化能力
    print(predict('time traveller'))
    print(predict('traveller'))
    

简洁实现

import torch
from torch import nn
from torch.nn import functional as F
from d2l import torch as d2l

batch_size, num_steps = 32, 35
train_iter, vocab = d2l.load_data_time_machine(batch_size, num_steps)

# 定义模型
num_hiddens = 256
rnn_layer = nn.RNN(len(vocab), num_hiddens)

state = torch.zeros((1, batch_size, num_hiddens))
# 注意:模块式 nn.RNN 只负责 “提取序列的隐藏特征”,而 “特征如何映射到任务输出”(如词汇表)需用户通过 Linear 层自定义
class RNNModel(nn.Module):
    """循环神经网络模型(支持基础RNN、LSTM,单向/双向模式)"""
    def __init__(self, rnn_layer, vocab_size, **kwargs):
        super(RNNModel, self).__init__(** kwargs)
        self.rnn = rnn_layer  # 内置RNN层(如nn.RNN、nn.LSTM)
        self.vocab_size = vocab_size  # 词汇表大小(输出类别数)
        self.num_hiddens = self.rnn.hidden_size  # 隐藏层维度
        
        # 根据RNN是否双向,初始化线性层(将隐藏状态映射到词汇表)
        if not self.rnn.bidirectional:
            self.num_directions = 1  # 单向RNN
            self.linear = nn.Linear(self.num_hiddens, self.vocab_size)
        else:
            self.num_directions = 2  # 双向RNN(需拼接两个方向的隐藏状态)
            self.linear = nn.Linear(self.num_hiddens * 2, self.vocab_size)

    def forward(self, inputs, state):
        """前向计算:输入序列→预测输出+更新后的状态"""
        # 输入处理:字符索引→独热编码(适配RNN输入格式)
        X = F.one_hot(inputs.T.long(), self.vocab_size)
        X = X.to(torch.float32)  # 转换为浮点型(RNN层要求的输入类型)
        # RNN特征提取:输入序列→所有时间步的隐藏状态+最终状态
        Y, state = self.rnn(X, state)  # Y形状:(num_steps, batch_size, 隐藏层维度×方向数)
        # 输出预测:隐藏状态→词汇表空间(预测下一个字符)
        # 合并时间步和批量维度:(num_steps×batch_size, 隐藏层维度×方向数)
        output = self.linear(Y.reshape((-1, Y.shape[-1])))
        return output, state  # output形状:(num_steps×batch_size, vocab_size)

    def begin_state(self, device, batch_size=1):
        """初始化隐藏状态(支持RNN和LSTM)"""
        # 隐藏状态形状:(方向数×层数, batch_size, 隐藏层维度)
        state_shape = (self.num_directions * self.rnn.num_layers,
                       batch_size, self.num_hiddens)
        
        if not isinstance(self.rnn, nn.LSTM):
            # 基础RNN:仅需一个隐藏状态张量
            return torch.zeros(state_shape, device=device)
        else:
            # LSTM:需要两个状态(隐藏状态h和细胞状态c)
            return (torch.zeros(state_shape, device=device),
                    torch.zeros(state_shape, device=device))

训练:

device = d2l.try_gpu()
net = RNNModel(rnn_layer, vocab_size=len(vocab))
net = net.to(device)
d2l.predict_ch8('time traveller', 10, net, vocab, device)

num_epochs, lr = 500, 1
d2l.train_ch8(net, train_iter, vocab, lr, num_epochs, device)

门控循环单元 GRU

关注一个序列不是每个观察值都是同等重要
想只记住相关的观察需要:能关注的机制(更新门) and 能遗忘的机制(重置门)

门:
gate
更新门 Z t Z_t Zt、重置门 R t R_t Rt 与隐藏状态 H t H_t Ht 的维度完全一致
候选隐状态:
在这里插入图片描述
隐状态:
隐状态

GRU代码实现:
从零开始实现:

import torch
from torch import nn
from d2l import torch as d2l

batch_size, num_steps = 32, 35
train_iter, vocab = d2l.load_data_time_machine(batch_size, num_steps)
# 初始化模型参数
def get_params(vocab_size, num_hiddens, device):
    num_inputs = num_outputs = vocab_size

    def normal(shape):
        return torch.randn(size=shape, device=device)*0.01

    def three():
        return (normal((num_inputs, num_hiddens)),
                normal((num_hiddens, num_hiddens)),
                torch.zeros(num_hiddens, device=device))

    W_xz, W_hz, b_z = three() # 更新门参数
    W_xr, W_hr, b_r = three() # 重置门参数
    W_xh, W_hh, b_h = three() # 候隐藏状态参数
    # 输出层参数
    W_hq = normal((num_hiddens, num_outputs))
    b_q = torch.zeros(num_outputs, device=device)
    # 附加梯度
    params = [W_xz, W_hz, b_z, W_xr, W_hr, b_r, W_xh, W_hh, b_h, W_hq, b_q]
    for param in params:
        param.requires_grad_(True)
    return params

# 定义隐状态的初始化函数
def init_gru_state(batch_size, num_hiddens, device):
    return (torch.zeros((batch_size, num_hiddens), device=device), )

# 定义GRU模型
def gru(inputs, state, params):
    W_xz, W_hz, b_z, W_xr, W_hr, b_r, W_xh, W_hh, b_h, W_hq, b_q = params
    H, = state
    outputs = []
    for X in inputs:
        Z = torch.sigmoid((X @ W_xz) + (H @ W_hz) + b_z)
        R = torch.sigmoid((X @ W_xr) + (H @ W_hr) + b_r)
        H_tilda = torch.tanh((X @ W_xh) + ((R * H) @ W_hh) + b_h)
        H = Z * H + (1 - Z) * H_tilda
        Y = H @ W_hq + b_q
        outputs.append(Y)
    return torch.cat(outputs, dim=0), (H,)

# 训练
vocab_size, num_hiddens, device = len(vocab), 256, d2l.try_gpu()
num_epochs, lr = 500, 1
model = d2l.RNNModelScratch(len(vocab), num_hiddens, device, get_params,
                            init_gru_state, gru)
d2l.train_ch8(model, train_iter, vocab, lr, num_epochs, device)

简洁实现:

num_inputs = vocab_size
gru_layer = nn.GRU(num_inputs, num_hiddens)
model = d2l.RNNModel(gru_layer, len(vocab))
model = model.to(device)
d2l.train_ch8(model, train_iter, vocab, lr, num_epochs, device)

长短期记忆网络 LSTM

  • 忘记门:将值朝0减少
  • 输入门:决定不是忽略掉输入数据
  • 输出门:决定是不是使用隐状态

gate
候选记忆单元:
在这里插入图片描述
记忆单元
在这里插入图片描述
代码实现(和之前的挺相似的):

import torch
from torch import nn
from d2l import torch as d2l

batch_size, num_steps = 32, 35
train_iter, vocab = d2l.load_data_time_machine(batch_size, num_steps)

# 初始化模型参数
def get_lstm_params(vocab_size, num_hiddens, device):
    num_inputs = num_outputs = vocab_size

    def normal(shape):
        return torch.randn(size=shape, device=device)*0.01

    def three():
        return (normal((num_inputs, num_hiddens)),
                normal((num_hiddens, num_hiddens)),
                torch.zeros(num_hiddens, device=device))

    W_xi, W_hi, b_i = three()
    W_xf, W_hf, b_f = three()
    W_xo, W_ho, b_o = three()
    W_xc, W_hc, b_c = three()
    W_hq = normal((num_hiddens, num_outputs))
    b_q = torch.zeros(num_outputs, device=device)
    params = [W_xi, W_hi, b_i, W_xf, W_hf, b_f, W_xo, W_ho, b_o, W_xc, W_hc,
              b_c, W_hq, b_q]
    for param in params:
        param.requires_grad_(True)
    return params

# 定义模型
def init_lstm_state(batch_size, num_hiddens, device):
    return (torch.zeros((batch_size, num_hiddens), device=device),
            torch.zeros((batch_size, num_hiddens), device=device))

def lstm(inputs, state, params):
    [W_xi, W_hi, b_i, W_xf, W_hf, b_f, W_xo, W_ho, b_o, W_xc, W_hc, b_c,
     W_hq, b_q] = params
    (H, C) = state
    outputs = []
    for X in inputs:
        I = torch.sigmoid((X @ W_xi) + (H @ W_hi) + b_i)
        F = torch.sigmoid((X @ W_xf) + (H @ W_hf) + b_f)
        O = torch.sigmoid((X @ W_xo) + (H @ W_ho) + b_o)
        C_tilda = torch.tanh((X @ W_xc) + (H @ W_hc) + b_c)
        C = F * C + I * C_tilda
        H = O * torch.tanh(C)
        Y = (H @ W_hq) + b_q
        outputs.append(Y)
    return torch.cat(outputs, dim=0), (H, C)

# 训练
vocab_size, num_hiddens, device = len(vocab), 256, d2l.try_gpu()
num_epochs, lr = 500, 1
model = d2l.RNNModelScratch(len(vocab), num_hiddens, device, get_lstm_params,
                            init_lstm_state, lstm)
d2l.train_ch8(model, train_iter, vocab, lr, num_epochs, device)

简洁实现:

num_inputs = vocab_size
lstm_layer = nn.LSTM(num_inputs, num_hiddens)
model = d2l.RNNModel(lstm_layer, len(vocab))
model = model.to(device)
d2l.train_ch8(model, train_iter, vocab, lr, num_epochs, device)

深度循环神经网络

思想就是多加几个隐藏层
在这里插入图片描述
代码实现:

import torch
from torch import nn
from d2l import torch as d2l

batch_size, num_steps = 32, 35
train_iter, vocab = d2l.load_data_time_machine(batch_size, num_steps)

vocab_size, num_hiddens, num_layers = len(vocab), 256, 2
num_inputs = vocab_size
device = d2l.try_gpu()
lstm_layer = nn.LSTM(num_inputs, num_hiddens, num_layers)
model = d2l.RNNModel(lstm_layer, len(vocab))
model = model.to(device)

num_epochs, lr = 500, 2
d2l.train_ch8(model, train_iter, vocab, lr*1.0, num_epochs, device)

双向循环神经网络

未来很重要:

  • 取决于过去和未来的上下文,可以填很不一样的词
  • 目前为止RNN只看过去
  • 在填空的时候,我们也可以看未来

双向RNN
在这里插入图片描述

  • 一个前向RNN隐层
  • 一个反向RNN隐层
  • 合并两个隐状态得到输出
    在这里插入图片描述
    双向RNN 不适合做推理
    主要的作用是对序列做特征提取、填空,而不是预测未来

代码实现:
(这是错误应用,这类任务中 “未来信息” 在推理时不可得,而双向 RNN 训练时会错误利用未来信息,导致应用场景不匹配。)

import torch
from torch import nn
from d2l import torch as d2l

batch_size, num_steps, device = 32, 35, d2l.try_gpu()
train_iter, vocab = d2l.load_data_time_machine(batch_size, num_steps)
vocab_size, num_hiddens, num_layers = len(vocab), 256, 2
num_inputs = vocab_size
lstm_layer = nn.LSTM(num_inputs, num_hiddens, num_layers, bidirectional=True)
model = d2l.RNNModel(lstm_layer, len(vocab))
model = model.to(device)
num_epochs, lr = 500, 1
d2l.train_ch8(model, train_iter, vocab, lr, num_epochs, device)

机器翻译与数据集

import os
import torch
from d2l import torch as d2l

下载预处理数据集:

d2l.DATA_HUB['fra-eng'] = (d2l.DATA_URL + 'fra-eng.zip',
                           '94646ad1522d915e7b0f9296181140edcf86a4f5')

def read_data_nmt():
    """载入“英语-法语”数据集"""
    data_dir = d2l.download_extract('fra-eng') # 下载并解压数据集
    with open(os.path.join(data_dir, 'fra.txt'), 'r',
             encoding='utf-8') as f:
        return f.read()

raw_text = read_data_nmt()
print(raw_text[:75])

预处理:

def preprocess_nmt(text):
    """预处理“英语-法语”数据集"""
    def no_space(char, prev_char):
        # 判断标点前是否需要加空格:若字符是`,.!?`且前一个字符不是空格,则需要加
        return char in set(',.!?') and prev_char != ' '

    # 替换特殊空格、转小写
    text = text.replace('\u202f', ' ').replace('\xa0', ' ').lower()
    # 规范标点格式:在需要的标点前加空格
    out = [' ' + char if i > 0 and no_space(char, text[i-1]) else char
           for i, char in enumerate(text)]
    return ''.join(out)

text = preprocess_nmt(raw_text)
print(text[:80])

词元化:

def tokenize_nmt(text, num_examples=None):
    """词元化“英语-法语”数据数据集"""
    source, target = [], []  # 初始化源语言(英语)和目标语言(法语)的词元列表
    # 按行分割文本(每行是一对翻译句),并遍历每一行
    for i, line in enumerate(text.split('\n')):
        # 若指定了最大样本数,且当前行数超过该数量,则停止处理
        if num_examples and i > num_examples:
            break
        # 按制表符\t分割当前行,得到英语部分和法语部分
        parts = line.split('\t')
        # 确保分割后有且仅有两部分(有效翻译对)
        if len(parts) == 2:
            # 英语句子按空格分割为词元(如"go ." → ["go", "."]),加入source列表
            source.append(parts[0].split(' '))
            # 法语句子按空格分割为词元(如"va !" → ["va", "!"]),加入target列表
            target.append(parts[1].split(' '))
    return source, target

source, target = tokenize_nmt(text)
print("英语词元序列前6个样本:", source[:6])
print("法语词元序列前6个样本:", target[:6])

绘制每个文本序列所包含的词元数量的直方图,如图:

def show_list_len_pair_hist(legend, xlabel, ylabel, xlist, ylist):
    """绘制列表长度对的直方图"""
    d2l.set_figsize()  # 设置图表大小(调用d2l库工具函数)
    # 统计并绘制“源语言/目标语言句子的词元数量”的直方图
    _, _, patches = d2l.plt.hist(
        [[len(l) for l in xlist], [len(l) for l in ylist]])
    d2l.plt.xlabel(xlabel)  # 设置x轴标签
    d2l.plt.ylabel(ylabel)  # 设置y轴标签
    # 为“目标语言”的直方图柱形添加斜线纹理(方便视觉区分)
    for patch in patches[1].patches:
        patch.set_hatch('/')
    d2l.plt.legend(legend)  # 设置图例(区分source和target)

show_list_len_pair_hist(['source', 'target'], '# tokens per sequence',
                        'count', source, target);

在这里插入图片描述
词表:

src_vocab = d2l.Vocab(source, min_freq=2,
                      reserved_tokens=['<pad>', '<bos>', '<eos>'])
len(src_vocab)

序列样本都有一个固定的长度,可以通过截断或填充文本序列来实现:

def truncate_pad(line, num_steps, padding_token):
    """截断或填充文本序列"""
    if len(line) > num_steps:
        return line[:num_steps]
    return line + [padding_token] * (num_steps - len(line))

truncate_pad(src_vocab[source[0]], 10, src_vocab['<pad>'])
# [47, 4, 1, 1, 1, 1, 1, 1, 1, 1]

转换成小批量数据集用于训练:

def build_array_nmt(lines, vocab, num_steps):
    """将文本序列转为模型输入的批量数据"""
    # 词元转索引(词汇表映射)
    lines = [vocab[l] for l in lines]
    # 加句尾标记<eos>
    lines = [l + [vocab['<eos>']] for l in lines]
    # 统一长度(截断/填充)并转张量
    array = torch.tensor([truncate_pad(l, num_steps, vocab['<pad>']) for l in lines])
    # 计算有效长度(非<pad>元素数量)
    valid_len = (array != vocab['<pad>']).type(torch.int32).sum(1)
    return array, valid_len
    

训练模型:

def load_data_nmt(batch_size, num_steps, num_examples=600):
    """加载神经机器翻译数据集,返回批量迭代器和词汇表"""
    # 读取原始文本并预处理(规范格式、大小写等)
    text = preprocess_nmt(read_data_nmt())
    
    # 词元化:将文本拆分为源语言(如英语)和目标语言(如法语)的词元序列
    source, target = tokenize_nmt(text, num_examples)
    
    # 构建源语言和目标语言的词汇表(词元→索引映射)
    # 过滤低频词(出现次数≥2),添加填充、句首、句尾标记
    src_vocab = d2l.Vocab(source, min_freq=2,
                          reserved_tokens=['<pad>', '<bos>', '<eos>'])
    tgt_vocab = d2l.Vocab(target, min_freq=2,
                          reserved_tokens=['<pad>', '<bos>', '<eos>'])
    
    # 将词元序列转为张量数组,统一长度并计算有效长度(排除填充部分)
    src_array, src_valid_len = build_array_nmt(source, src_vocab, num_steps)
    tgt_array, tgt_valid_len = build_array_nmt(target, tgt_vocab, num_steps)
    
    # 打包所有数据数组,生成批量迭代器
    data_arrays = (src_array, src_valid_len, tgt_array, tgt_valid_len)
    data_iter = d2l.load_array(data_arrays, batch_size)  # 按batch_size划分批量
    
    return data_iter, src_vocab, tgt_vocab

# 读出“英语-法语”数据集中的第一个小批量数据
train_iter, src_vocab, tgt_vocab = load_data_nmt(batch_size=2, num_steps=8)
for X, X_valid_len, Y, Y_valid_len in train_iter:
    print('X:', X.type(torch.int32))
    print('X的有效长度:', X_valid_len)
    print('Y:', Y.type(torch.int32))
    print('Y的有效长度:', Y_valid_len)
    break

编码器—解码器架构

编码器:将输入编程成中间表达形式(特征)
解码器:将中间表示解码成输出
可以通过CNN来理解:
在这里插入图片描述
重新考察RNN
在这里插入图片描述
编码器: 将文本表示成向量
解码器: 向量表示成输出

编码器-解码器架构

  • 一个模型被分为两块:
    • 编码器处理输出
    • 解码器生成输出
from torch import nn

class Encoder(nn.Module):
    """编码器-解码器架构的基本编码器接口"""
    def __init__(self, **kwargs):
        super(Encoder, self).__init__(**kwargs)
    def forward(self, X, *args):
        raise NotImplementedError

class Decoder(nn.Module):
    """编码器-解码器架构的基本解码器接口"""
    def __init__(self, **kwargs):
        super(Decoder, self).__init__(**kwargs)
    def init_state(self, enc_outputs, *args):
        raise NotImplementedError
    def forward(self, X, state):
        raise NotImplementedError

class EncoderDecoder(nn.Module):
    """编码器-解码器架构的基类"""
    def __init__(self, encoder, decoder, **kwargs):
        super(EncoderDecoder, self).__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
    def forward(self, enc_X, dec_X, *args):
        enc_outputs = self.encoder(enc_X, *args)
        dec_state = self.decoder.init_state(enc_outputs, *args)
        return self.decoder(dec_X, dec_state)

在这里插入图片描述
(代码不理解可以配合图片理解)

序列到序列学习(seq2seq)

机器翻译:给定一个源语言的句子,自动翻译成目标语言,这两个句子可以有不同的长度

Seq2seq:

编码器是一个RNN,读取输入句子,可以是双向
解码器使用另外一个RNN来输出

细节:
编码器是没有输出的RNN
编码器最后时间步的隐状态用作解码器的初始隐状态
训练时解码器使用目标句子作为输入
在这里插入图片描述
衡量生成序列好坏的BLEU指标

  1. n-gram 精度 p n p_n pn
    p n p_n pn 表示预测序列中所有 n-gram(n元组)的精度(与参考序列的匹配比例)。

例子
参考序列(标签序列):A B C D E F
预测序列:A B B C D

  • p 1 p_1 p1 = 4/5 :预测的1-gram(ABBCD)中,4个在参考中出现。
  • p 2 p_2 p2 = 3/4 :预测的2-gram(ABBBBCCD)中,3个在参考中出现。
  • p 3 p_3 p3 = 1/3 :预测的3-gram(ABBBBCBCD)中,1个在参考中出现。
  • p 4 p_4 p4 = 0 :预测的4-gram(ABBCBBCD)均不在参考中出现。
  1. BLEU 定义
    BLEU 综合多阶n-gram精度与长度惩罚,公式为:

BLEU = exp ⁡ ( min ⁡ ( 0 , 1 − len label len pred ) ) ⋅ ∏ n = 1 k p n 1 / 2 n \text{BLEU} = \exp\left( \min\left( 0, 1 - \frac{\text{len}_{\text{label}}}{\text{len}_{\text{pred}}} \right) \right) \cdot \prod_{n=1}^{k} p_n^{1/2^n} BLEU=exp(min(0,1lenpredlenlabel))n=1kpn1/2n

其中:

  • len label \text{len}_{\text{label}} lenlabel:参考序列(标签)的长度;
  • len pred \text{len}_{\text{pred}} lenpred:预测序列的长度;
  • min ⁡ ( 0 , 1 − len label len pred ) \min\left( 0, 1 - \frac{\text{len}_{\text{label}}}{\text{len}_{\text{pred}}} \right) min(0,1lenpredlenlabel):长度惩罚项,防止生成序列过短;
  • ∏ n = 1 k p n 1 / 2 n \prod_{n=1}^{k} p_n^{1/2^n} n=1kpn1/2n:各阶n-gram精度的加权几何平均(n越大, 1 / 2 n 1/2^n 1/2n 越小)。

代码实现:

import collections
import math
import torch
from torch import nn
from d2l import torch as d2l

# 实现编码器和解码器
class Seq2SeqEncoder(d2l.Encoder):
    """用于序列到序列学习的循环神经网络编码器"""
    def __init__(self, vocab_size, embed_size, num_hiddens, num_layers,
                 dropout=0, **kwargs):
        super(Seq2SeqEncoder, self).__init__(** kwargs)
        # 词嵌入层:将离散的词索引转换为连续的低维向量
        # vocab_size:源语言词汇表大小;embed_size:词向量维度
        self.embedding = nn.Embedding(vocab_size, embed_size)
        # 输入维度=词向量维度(embed_size);隐藏层维度=num_hiddens
        self.rnn = nn.GRU(embed_size, num_hiddens, num_layers,
                          dropout=dropout)

    def forward(self, X, *args):
        # X:输入的源序列词索引,形状为 (batch_size, seq_len)
        # 例如:(32, 10) 表示32个样本,每个样本是长度为10的词索引序列
        # 输出形状:(batch_size, seq_len, embed_size)
        X = self.embedding(X)  # 例:(32, 10) → (32, 10, 128)(假设embed_size=128)
        # 调整维度顺序:GRU要求输入格式为 (seq_len, batch_size, feature)
        X = X.permute(1, 0, 2)  # 例:(32, 10, 128) → (10, 32, 128)
        # output:所有时间步的隐藏状态,形状 (seq_len, batch_size, num_hiddens)
        # state:最后一个时间步的隐藏状态,形状 (num_layers, batch_size, num_hiddens)
        output, state = self.rnn(X)
        return output, state


class Seq2SeqDecoder(d2l.Decoder):
    """用于序列到序列学习的循环神经网络解码器"""
    def __init__(self, vocab_size, embed_size, num_hiddens, num_layers,
                 dropout=0, **kwargs):
        super(Seq2SeqDecoder, self).__init__(** kwargs)
        
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.rnn = nn.GRU(embed_size + num_hiddens, num_hiddens, num_layers,
                          dropout=dropout)
        self.dense = nn.Linear(num_hiddens, vocab_size)

    def init_state(self, enc_outputs, *args):
        # 用编码器的输出初始化解码器状态
        # enc_outputs[1]是编码器的最终隐藏状态(形状:num_layers, batch_size, num_hiddens)
        return enc_outputs[1]

    def forward(self, X, state):
        # X:目标序列的前缀词索引,形状 (batch_size, seq_len_dec)
        # 例如:(32, 8) 表示32个样本,每个样本是长度为8的目标前缀
        # 输出形状:(batch_size, seq_len_dec, embed_size)
        X = self.embedding(X).permute(1, 0, 2) 
        # 生成上下文信息:将编码器最后一层的状态复制到每个时间步
        # state[-1]:取编码器最后一层的隐藏状态(形状:batch_size, num_hiddens)
        # repeat(X.shape[0], 1, 1):复制seq_len_dec次,让每个解码步骤都能参考源序列
        # 输出形状:(seq_len_dec, batch_size, num_hiddens)
        context = state[-1].repeat(X.shape[0], 1, 1)  # 例:(32, 256) → (8, 32, 256)(假设num_hiddens=256)
        
        # 拼接目标词向量和上下文:让GRU同时关注“已生成的词”和“源序列语义”
        # 输出形状:(seq_len_dec, batch_size, embed_size + num_hiddens)
        X_and_context = torch.cat((X, context), 2)  # 例:(8, 32, 128+256) = (8, 32, 384)
        output, state = self.rnn(X_and_context, state)
        
        # 先通过全连接层,再调整维度为(batch_size, seq_len_dec, vocab_size)
        output = self.dense(output).permute(1, 0, 2)  # 例:(8, 32, 256) → (32, 8, vocab_size)
        return output, state
def sequence_mask(X, valid_len, value=0):
    """在序列中屏蔽不相关的项"""
    maxlen = X.size(1)
    mask = torch.arange((maxlen), dtype=torch.float32,
                        device=X.device)[None, :] < valid_len[:, None]
    X[~mask] = value
    return X

class MaskedSoftmaxCELoss(nn.CrossEntropyLoss):
    """带遮蔽的softmax交叉熵损失函数"""
    def forward(self, pred, label, valid_len):
        # 创建与label同形状的权重张量(初始全1)
        weights = torch.ones_like(label)  # 形状:(batch_size, seq_len)
        weights = sequence_mask(weights, valid_len)  # 形状不变,无效位置变0
        # 设置交叉熵损失的计算模式:不自动求平均/求和(保留每个位置的损失)
        self.reduction = 'none'  # 父类nn.CrossEntropyLoss的参数
        
        # 计算原始交叉熵损失(未加权)
        # pred形状通常是(batch_size, seq_len, vocab_size),需要转置为(batch_size, vocab_size, seq_len)
        # 因为nn.CrossEntropyLoss要求输入形状为(batch_size, num_classes, seq_len)
        unweighted_loss = super(MaskedSoftmaxCELoss, self).forward(
            pred.permute(0, 2, 1), label)  # 形状:(batch_size, seq_len)
        
        # 用权重屏蔽无效位置的损失(无效位置损失 *= 0),再求每个样本的平均损失
        weighted_loss = (unweighted_loss * weights).mean(dim=1)  # 形状:(batch_size,)
        
        return weighted_loss

训练:

def train_seq2seq(net, data_iter, lr, num_epochs, tgt_vocab, device):
    """训练序列到序列模型"""
	  # 参数初始化
    def xavier_init_weights(m):
        # 对线性层权重进行Xavier初始化
        if type(m) == nn.Linear:
            nn.init.xavier_uniform_(m.weight)
        # 对GRU层的权重进行Xavier初始化
        if type(m) == nn.GRU:
            # 遍历GRU的所有参数,只初始化权重(忽略偏置)
            for param in m._flat_weights_names:
                if "weight" in param:
                    nn.init.xavier_uniform_(m._parameters[param])
    
    net.apply(xavier_init_weights)  
    net.to(device)  
    optimizer = torch.optim.Adam(net.parameters(), lr=lr)  
    loss = MaskedSoftmaxCELoss()  
    net.train() 
    
    animator = d2l.Animator(xlabel='epoch', ylabel='loss',
                     xlim=[10, num_epochs])
    
    for epoch in range(num_epochs):
        timer = d2l.Timer() 
        metric = d2l.Accumulator(2)  
        
        for batch in data_iter:
            optimizer.zero_grad() 
            X, X_valid_len, Y, Y_valid_len = [x.to(device) for x in batch]
            # 构造解码器输入(Teacher Forcing策略)
            # 生成句首标记<bos>:每个样本前添加一个<bos>,形状为(batch_size, 1)
            bos = torch.tensor([tgt_vocab['<bos>']] * Y.shape[0],
                          device=device).reshape(-1, 1)
            # 解码器输入 = <bos> + 目标序列的前n-1个词(因为要预测第n个词)
            # 形状为(batch_size, tgt_seq_len),与目标序列长度一致
            dec_input = torch.cat([bos, Y[:, :-1]], 1)
            
            Y_hat, _ = net(X, dec_input, X_valid_len)
            l = loss(Y_hat, Y, Y_valid_len)  # 形状:(batch_size,),每个样本的平均损失
            l.sum().backward()  # 损失求和后反向传播(计算梯度)
            d2l.grad_clipping(net, 1)  # 梯度裁剪:限制梯度范数≤1,防止梯度爆炸
            num_tokens = Y_valid_len.sum()  # 当前batch的有效token总数(用于计算平均损失)
            optimizer.step()  # 根据梯度更新模型参数
            
            # 累计损失和有效token数(不记录梯度,节省内存)
            with torch.no_grad():
                metric.add(l.sum(), num_tokens)
        
        if (epoch + 1) % 10 == 0:
            animator.add(epoch + 1, (metric[0] / metric[1],))
    
    print(f'最终损失 {metric[0] / metric[1]:.3f}, '
          f'训练速度 {metric[1] / timer.stop():.1f} tokens/sec '
          f'(设备:{str(device)})')


embed_size, num_hiddens, num_layers, dropout = 32, 32, 2, 0.1
batch_size, num_steps = 64, 10
lr, num_epochs, device = 0.005, 300, d2l.try_gpu()

train_iter, src_vocab, tgt_vocab = d2l.load_data_nmt(batch_size, num_steps)
encoder = Seq2SeqEncoder(len(src_vocab), embed_size, num_hiddens, num_layers,
                        dropout)
decoder = Seq2SeqDecoder(len(tgt_vocab), embed_size, num_hiddens, num_layers,
                        dropout)
net = d2l.EncoderDecoder(encoder, decoder)
train_seq2seq(net, train_iter, lr, num_epochs, tgt_vocab, device)

预测:

def predict_seq2seq(net, src_sentence, src_vocab, tgt_vocab, num_steps,
                    device, save_attention_weights=False):
    """
    序列到序列(Seq2Seq)模型的推理函数:根据源句子生成目标句子
    参数说明:
    - net: 训练好的Seq2Seq模型(包含编码器和解码器)
    - src_sentence: 输入的源语言句子(如英文句子字符串)
    - src_vocab: 源语言词汇表(用于将源句子转换为token索引)
    - tgt_vocab: 目标语言词汇表(用于将生成的token索引转换为目标词)
    - num_steps: 最大生成长度(防止无限循环生成)
    - device: 运行设备(GPU/CPU)
    - save_attention_weights: 是否保存注意力权重(用于可视化)
    返回值:
    - 生成的目标语言句子(字符串)
    - 注意力权重序列(可选,用于可视化)
    """
    net.eval()  # 确保推理时模型行为稳定,结果可复现
    # 分词→转为小写→转换为token索引→添加句尾标记<eos>
    src_tokens = src_vocab[src_sentence.lower().split(' ')] + [src_vocab['<eos>']]
    # 记录源句子的有效长度(用于编码器屏蔽无效的Padding项)
    enc_valid_len = torch.tensor([len(src_tokens)], device=device)
    src_tokens = d2l.truncate_pad(src_tokens, num_steps, src_vocab['<pad>'])
    # 转换为张量并增加batch维度(模型要求输入形状为(batch_size, seq_len),此处batch_size=1)
    enc_X = torch.unsqueeze(
        torch.tensor(src_tokens, dtype=torch.long, device=device), dim=0)  # 形状:(1, num_steps)

    # 编码器处理源句子,生成语义编码
    enc_outputs = net.encoder(enc_X, enc_valid_len)  
    # 初始化解码器状态
    dec_state = net.decoder.init_state(enc_outputs, enc_valid_len)

    # 解码器初始输入:句首标记<bos>(目标序列的起始信号)
    # 形状:(1, 1) → (batch_size=1, seq_len=1),每次只输入一个词
    dec_X = torch.unsqueeze(torch.tensor(
        [tgt_vocab['<bos>']], dtype=torch.long, device=device), dim=0)

    # 初始化结果存储:生成的目标序列token索引和注意力权重(可选)
    output_seq, attention_weight_seq = [], []
    # 自回归生成目标句子(核心逻辑)
    for _ in range(num_steps):
        # 解码器前向传播:输入当前词(dec_X)和当前状态(dec_state),输出预测和新状态
        # Y: 形状(1, 1, vocab_size) → 当前步对目标词汇表的概率分布
        # dec_state: 更新后的解码器状态(用于下一步生成)
        Y, dec_state = net.decoder(dec_X, dec_state)
        dec_X = Y.argmax(dim=2)  # 取vocab_size维度的最大值索引,形状(1, 1)
        pred = dec_X.squeeze(dim=0).type(torch.int32).item()  # 转换为单个token索引
        # 保存注意力权重(如果需要可视化解码器关注的源句子位置)
        if save_attention_weights:
            attention_weight_seq.append(net.decoder.attention_weights)
        # 如果预测到句尾标记<eos>,停止生成(句子结束)
        if pred == tgt_vocab['<eos>']:
            break
        output_seq.append(pred)

    # 将生成的token索引转换为目标语言句子(字符串)
    return ' '.join(tgt_vocab.to_tokens(output_seq)), attention_weight_seq

BLEU的代码实现:

def bleu(pred_seq, label_seq, k):  
    """计算BLEU"""
    #  预处理:将生成序列和参考序列分割为词列表
    pred_tokens = pred_seq.split(' ')  
    label_tokens = label_seq.split(' ')  
    len_pred = len(pred_tokens)  
    len_label = len(label_tokens)  

    # 长度惩罚(Length Penalty):惩罚过短的生成序列
    # 公式:exp(min(0, 1 - 参考长度/生成长度))
    score = math.exp(min(0, 1 - len_label / len_pred))

    # 计算n-gram(1到k)的匹配度,累加到分数中
    for n in range(1, k + 1):  # 遍历1-gram, 2-gram, ..., k-gram
        num_matches = 0  # 记录生成序列与参考序列匹配的n-gram数量
        # 用字典统计参考序列中所有n-gram的出现次数(避免重复匹配)
        label_subs = collections.defaultdict(int)
        # 统计参考序列中所有n-gram的出现次数
        # 遍历参考序列中所有可能的n-gram起始位置
        for i in range(len_label - n + 1):
            # 提取从i开始的n个词,组成n-gram字符串(如n=2时,"cat sits")
            label_sub = ' '.join(label_tokens[i: i + n])
            label_subs[label_sub] += 1  # 计数+1

        # 统计生成序列中与参考序列匹配的n-gram数量
        # 遍历生成序列中所有可能的n-gram起始位置
        for i in range(len_pred - n + 1):
            # 提取生成序列的n-gram
            pred_sub = ' '.join(pred_tokens[i: i + n])
            # 若该n-gram在参考序列中存在且未被匹配完,则计数+1,并减少参考中的可用次数
            if label_subs[pred_sub] > 0:
                num_matches += 1
                label_subs[pred_sub] -= 1  # 避免重复匹配同一n-gram

        # 计算当前n-gram的精确度,并加权到总分数中
        # 生成序列中可能的n-gram总数(分母)
        total_pred_subs = len_pred - n + 1 if len_pred >= n else 0
        # 精确度 = 匹配的n-gram数量 / 生成序列的n-gram总数(避免除零)
        precision = num_matches / total_pred_subs if total_pred_subs > 0 else 0
        # 对精确度取权重:n越大,权重越小(0.5^n),但长短语匹配更有价值
        score *= math.pow(precision, math.pow(0.5, n))
    return score

翻译:

engs = ['go .', "i lost .", 'he\'s calm .', 'i\'m home .']
fras = ['va !', 'j\'ai perdu .', 'il est calme .', 'je suis chez moi .']
for eng, fra in zip(engs, fras):
    translation, attention_weight_seq = predict_seq2seq(
        net, eng, src_vocab, tgt_vocab, num_steps, device)
    print(f'{eng} => {translation}, bleu {bleu(translation, fra, k=2):.3f}')

效果如下:
在这里插入图片描述

束搜索

贪心搜素

  • 在seq2seq中我们使用了贪心搜索来预测序列
    • 将当前时刻预测概率最大的词输出
    • 但贪心很可能不是最优的

穷举搜索

  • 原理:枚举所有可能的输出序列,计算每个序列的概率 P ( y 1 , y 2 , … , y T ∣ x ) P(y_1, y_2, \dots, y_T \mid x) P(y1,y2,,yTx),选择概率最高的序列。
  • 计算量:若输出字典大小为 n ,序列最大长度为 T ,则需考察的序列总数为 s u m t = 1 T n t ≈ n T sum_{t=1}^T n^t \approx n^T sumt=1TntnT(当 T 较大时)。
    • 例:若 n=10000 , T=10 ,则序列总数约为 1000 0 10 = 1 0 40 10000^{10} = 10^{40} 1000010=1040,计算上完全不可行。

束搜索

  • 保存最好的k个候选

  • 在每个时刻,对每个选新加一项(n种可能),在kn个选项中选出最好的
    在这里插入图片描述

  • 时间复杂度 ( O(knT) )

    • k = 5 , n = 10000 , T = 10 : k n T = 5 × 1 0 5 k = 5, n = 10000, T = 10 : knT = 5 \times 10^5 k=5,n=10000,T=10:knT=5×105
  • 每个候选的最终分数是:
    1 L α log ⁡ p ( y 1 , … , y L ) = 1 L α ∑ t ′ = 1 L log ⁡ p ( y t ′ ∣ y 1 , … , y t ′ − 1 ) \frac{1}{L^\alpha} \log p(y_1, \dots, y_L) = \frac{1}{L^\alpha} \sum_{t'=1}^L \log p(y_{t'} \mid y_1, \dots, y_{t'-1}) Lα1logp(y1,,yL)=Lα1t=1Llogp(yty1,,yt1)

  • 通常 α = 0.75 \alpha = 0.75 α=0.75

  • 束搜索在每次搜索时保存k个最好的候选

    • k=1时是贪心搜索

注意力机制

  • 心理学:

    • 动物需要在复杂环境下有效关注值得注意的点
    • 心理学框架:人类根据随意线索和不随意线索选择注意点
  • 注意力机制

    • 卷积、全连接、池化层都只考虑不随意线索
    • 注意力机制则显示的考虑随意线索
      • 随意线索被称之为查询(query)
      • 每个输入是一个值(value)和不随意线索(key)的对
        • key:“用于匹配的特征”—— 解决 “要不要关注”
        • value:“用于提供信息的特征”—— 解决 “关注后取什么”
      • 通过注意力池化层来有偏向性的选择选择某些输入

非参注意力池化层

  • 给定数据 ( x i , y i x_i, y_i xi,yi), i = 1, … \dots , n
  • 平均池化是最简单的方案:
    f ( x ) = 1 n ∑ i y i f(x) = \frac{1}{n} \sum_{i} y_i f(x)=n1iyi
  • 更好的方案是60年代提出的Nadaraya-Watson核回归:
    f ( x ) = ∑ i = 1 n K ( x − x i ) ∑ j = 1 n K ( x − x j ) y i f(x) = \sum_{i=1}^{n} \frac{K(x - x_i)}{\sum_{j=1}^{n} K(x - x_j)} y_i f(x)=i=1nj=1nK(xxj)K(xxi)yi

使用高斯核: K(u) = 1 2 π exp ⁡ ( − u 2 2 ) \frac{1}{\sqrt{2\pi}} \exp\left(-\frac{u^2}{2}\right) 2π 1exp(2u2)

那么

f ( x ) = ∑ i = 1 n exp ⁡ ( − 1 2 ( x − x i ) 2 ) ∑ j = 1 n exp ⁡ ( − 1 2 ( x − x j ) 2 ) y i = ∑ i = 1 n softmax ( − 1 2 ( x − x i ) 2 ) y i \begin{aligned} f(x) &= \sum_{i=1}^{n} \frac{\exp\left(-\frac{1}{2}(x - x_i)^2\right)}{\sum_{j=1}^{n} \exp\left(-\frac{1}{2}(x - x_j)^2\right)} y_i \\ &= \sum_{i=1}^{n} \text{softmax}\left(-\frac{1}{2}(x - x_i)^2\right) y_i \end{aligned} f(x)=i=1nj=1nexp(21(xxj)2)exp(21(xxi)2)yi=i=1nsoftmax(21(xxi)2)yi

注意力汇聚:Nadaraya-Watson 核回归 代码实现:

import torch
from torch import nn
from d2l import torch as d2l

n_train = 50
x_train, _ = torch.sort(torch.rand(n_train) * 5)

def f(x):
    return 2 * torch.sin(x) + x**0.8

y_train = f(x_train) + torch.normal(0.0, 0.5, (n_train,))
x_test = torch.arange(0, 5, 0.1)
y_truth = f(x_test)
n_test = len(x_test)

def plot_kernel_reg(y_hat):
    d2l.plot(x_test, [y_truth, y_hat], 'x', 'y', legend=['Truth', 'Pred'],
             xlim=[0, 5], ylim=[-1, 5])
    d2l.plt.plot(x_train, y_train, 'o', alpha=0.5);

# 生成 “平均池化” 的预测结果 y_hat
y_hat = torch.repeat_interleave(y_train.mean(), n_test)
plot_kernel_reg(y_hat)

平均池化
非参数注意力汇聚:

# reshape((-1, n_train)):重塑为形状为(n_test, n_train)的矩阵(50×50)
# 矩阵中每行对应一个测试样本,每行的n_train个元素都是该测试样本的值(如第1行全为0.0,第2行全为0.1)
X_repeat = x_test.repeat_interleave(n_train).reshape((-1, n_train))
attention_weights = nn.functional.softmax(-(X_repeat - x_train)** 2 / 2, dim=1)
y_hat = torch.matmul(attention_weights, y_train)
plot_kernel_reg(y_hat)

d2l.show_heatmaps(attention_weights.unsqueeze(0).unsqueeze(0),
                  xlabel='Sorted training inputs',
                  ylabel='Sorted testing inputs')

在这里插入图片描述
带参数注意力汇聚 :
这里先引入批量矩阵乘法:假定两个张量的形状分别是 ( n , a , b ) (n,a,b) (n,a,b) ( n , b , c ) (n,b,c) (n,b,c)
它们的批量矩阵乘法输出的形状为 ( n , a , c ) (n,a,c) (n,a,c)

X = torch.ones((2, 1, 4))
Y = torch.ones((2, 4, 6))
torch.bmm(X, Y).shape
# torch.Size([2, 1, 6])

# 使用小批量矩阵乘法来计算小批量数据中的加权平均值
weights_reshaped = weights.unsqueeze(1)  # 形状:(2, 1, 10)
values_reshaped = values.unsqueeze(-1)  # 形状:(2, 10, 1)
torch.bmm(weights.unsqueeze(1), values.unsqueeze(-1))
# tensor([[[ 4.5000]],
#        [[14.5000]]])

接下来是带参数注意力汇聚实现:

class NWKernelRegression(nn.Module):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # 通过学习一个w来控制曲线平滑一点还是不平滑一点
        self.w = nn.Parameter(torch.rand((1,), requires_grad=True))

    def forward(self, queries, keys, values):
        queries = queries.repeat_interleave(keys.shape[1]).reshape((-1, keys.shape[1]))
        self.attention_weights = nn.functional.softmax(
            -((queries - keys) * self.w)**2 / 2, dim=1)
        return torch.bmm(self.attention_weights.unsqueeze(1),
                         values.unsqueeze(-1)).reshape(-1)

X_tile = x_train.repeat((n_train, 1))
Y_tile = y_train.repeat((n_train, 1))
# keys: 从X_tile中筛选非对角线元素,重塑为 (n_train, n_train-1)
# values: 从Y_tile中筛选非对角线元素,重塑为 (n_train, n_train-1
# 筛掉对角线的核心目的是防止模型 “作弊”,强制它学习 “不同样本之间的真实关联”,而不是简单记住 “自己等于自己”,从而避免过拟合
keys = X_tile[(1 - torch.eye(n_train)).type(torch.bool)].reshape((n_train, -1))
values = Y_tile[(1 - torch.eye(n_train)).type(torch.bool)].reshape((n_train, -1))

net = NWKernelRegression()
loss = nn.MSELoss(reduction='none')
trainer = torch.optim.SGD(net.parameters(), lr=0.5)
animator = d2l.Animator(xlabel='epoch', ylabel='loss', xlim=[1, 5])

for epoch in range(5):
    trainer.zero_grad()
    l = loss(net(x_train, keys, values), y_train)
    l.sum().backward()
    trainer.step()
    print(f'epoch {epoch + 1}, loss {float(l.sum()):.6f}')
    animator.add(epoch + 1, float(l.sum()))

keys = x_train.repeat((n_test, 1))
values = y_train.repeat((n_test, 1))
y_hat = net(x_test, keys, values).unsqueeze(1).detach()
plot_kernel_reg(y_hat)

d2l.show_heatmaps(net.attention_weights.unsqueeze(0).unsqueeze(0),
                  xlabel='Sorted training inputs',
                  ylabel='Sorted testing inputs')

在这里插入图片描述

注意力分数

怎样把 key 和 value 扩展到高维度:

  • 假设query q ∈ R q \mathbf{q} \in \mathbb{R}^q qRq m m m 对 key-value ( k 1 , v 1 ) , … , (\mathbf{k}_1, \mathbf{v}_1), \dots, (k1,v1),,,这里 k i ∈ R k , v i ∈ R v \mathbf{k}_i \in \mathbb{R}^k, \mathbf{v}_i \in \mathbb{R}^v kiRk,viRv

  • 注意力池化层:
    f ( q , ( k 1 , v 1 ) , … , ( k m , v m ) ) = ∑ i = 1 m α ( q , k i ) v i ∈ R v f(\mathbf{q}, (\mathbf{k}_1, \mathbf{v}_1), \dots, (\mathbf{k}_m, \mathbf{v}_m)) = \sum_{i=1}^m \alpha(\mathbf{q}, \mathbf{k}_i)\mathbf{v}_i \in \mathbb{R}^v f(q,(k1,v1),,(km,vm))=i=1mα(q,ki)viRv
    α ( q , k i ) = softmax ( a ( q , k i ) ) = exp ⁡ ( a ( q , k i ) ) ∑ j = 1 m exp ⁡ ( a ( q , k j ) ) ∈ R \alpha(\mathbf{q}, \mathbf{k}_i) = \text{softmax}(a(\mathbf{q}, \mathbf{k}_i)) = \frac{\exp(a(\mathbf{q}, \mathbf{k}_i))}{\sum_{j=1}^m \exp(a(\mathbf{q}, \mathbf{k}_j))} \in \mathbb{R} α(q,ki)=softmax(a(q,ki))=j=1mexp(a(q,kj))exp(a(q,ki))R

Additive Attention:

  • 可学参数: W k ∈ R h × k , W q ∈ R h × q , v ∈ R h \mathbf{W}_k \in \mathbb{R}^{h \times k}, \mathbf{W}_q \in \mathbb{R}^{h \times q}, \mathbf{v} \in \mathbb{R}^h WkRh×k,WqRh×q,vRh

a ( k , q ) = v T tanh ⁡ ( W k k + W q q ) a(\mathbf{k}, \mathbf{q}) = \mathbf{v}^T \tanh(\mathbf{W}_k \mathbf{k} + \mathbf{W}_q \mathbf{q}) a(k,q)=vTtanh(Wkk+Wqq)

Scaled Dot-Product Attention:

  • 如果query和key都是同样的长度 q , k i ∈ R d \mathbf{q}, \mathbf{k}_i \in \mathbb{R}^d q,kiRd,那么可以
    a ( q , k i ) = ⟨ q , k i ⟩ / d a(\mathbf{q}, \mathbf{k}_i) = \langle \mathbf{q}, \mathbf{k}_i \rangle / \sqrt{d} a(q,ki)=q,ki/d

  • 向量化版本

    • Q ∈ R n × d \mathbf{Q} \in \mathbb{R}^{n \times d} QRn×d K ∈ R m × d , V ∈ R m × v \mathbf{K} \in \mathbb{R}^{m \times d}, \mathbf{V} \in \mathbb{R}^{m \times v} KRm×d,VRm×v
    • 注意力分数: a ( Q , K ) = Q K T / d ∈ R n × m a(\mathbf{Q}, \mathbf{K}) = \mathbf{Q}\mathbf{K}^T / \sqrt{d} \in \mathbb{R}^{n \times m} a(Q,K)=QKT/d Rn×m
    • 注意力池化: f = softmax ( a ( Q , K ) ) V ∈ R n × v f = \text{softmax}\left(a(\mathbf{Q}, \mathbf{K})\right) \mathbf{V} \in \mathbb{R}^{n \times v} f=softmax(a(Q,K))VRn×v

注意力打分函数代码实现:

import math
import torch
from torch import nn
from d2l import torch as d2l

def masked_softmax(X, valid_lens):
    """通过在最后一个轴上掩蔽元素来执行softmax操作"""
    if valid_lens is None:
        return nn.functional.softmax(X, dim=-1)
    else:
        shape = X.shape
        if valid_lens.dim() == 1:
            # 若valid_lens是1维张量(如(batch_size,)),则按X的第1维长度重复
            # 例如:X形状为(batch_size, seq_len, dim),则重复seq_len次,使valid_lens长度为batch_size*seq_len
            valid_lens = torch.repeat_interleave(valid_lens, shape[1])
        else:
            # 若valid_lens是多维张量,展平为1维(总长度需与X中"样本数"匹配)
            valid_lens = valid_lens.reshape(-1)
        X = d2l.sequence_mask(
            X.reshape(-1, shape[-1]),  # 展平为二维:(N, shape[-1]),N=批量大小*其他维度乘积
            valid_lens,                # 1维有效长度列表,每个元素对应一个样本
            value=-1e6                 # 掩蔽值设为-1e6(适合softmax场景)
        )
        
        return nn.functional.softmax(X.reshape(shape), dim=-1)
class AdditiveAttention(nn.Module):
    """加性注意力"""
    def __init__(self, key_size, query_size, num_hiddens, dropout, **kwargs):
        super(AdditiveAttention, self).__init__(**kwargs)
        self.W_k = nn.Linear(key_size, num_hiddens, bias=False)
        self.W_q = nn.Linear(query_size, num_hiddens, bias=False)
        self.w_v = nn.Linear(num_hiddens, 1, bias=False)
        self.dropout = nn.Dropout(dropout)

    def forward(self, queries, keys, values, valid_lens):
        queries, keys = self.W_q(queries), self.W_k(keys)
        features = queries.unsqueeze(2) + keys.unsqueeze(1)
        features = torch.tanh(features)
        scores = self.w_v(features).squeeze(-1)
        self.attention_weights = masked_softmax(scores, valid_lens)
        return torch.bmm(self.dropout(self.attention_weights), values)
class DotProductAttention(nn.Module):
    """缩放点积注意力"""
    def __init__(self, dropout, **kwargs):
        super(DotProductAttention, self).__init__(**kwargs)
        self.dropout = nn.Dropout(dropout)

    def forward(self, queries, keys, values, valid_lens=None):
        d = queries.shape[-1]
        scores = torch.bmm(queries, keys.transpose(1,2)) / math.sqrt(d)
        self.attention_weights = masked_softmax(scores, valid_lens)
        return torch.bmm(self.dropout(self.attention_weights), values)

使用注意力机制的seq2seq

动机:

  • 机器翻译中,每个生成的词可能相关于源句子中不同的词
  • seq2seq模型中不能对此直接建模
    加入注意力
  • 编码器对每次词的输出作为 key 和 value(他们是相同的)
  • 解码器 RNN 对上一个词的输出是 query
  • 注意力的输出和下一个词的词嵌入合并进入解码器 RNN

代码实现(Bahdanau 注意力):

import torch
from torch import nn
from d2l import torch as d2l

class AttentionDecoder(d2l.Decoder):
    """带有注意力机制解码器的基本接口"""
    def __init__(self, **kwargs):
        super(AttentionDecoder, self).__init__(**kwargs)

    @property
    def attention_weights(self):
        raise NotImplementedError
class Seq2SeqAttentionDecoder(AttentionDecoder):
    # 初始化带注意力机制的Seq2Seq解码器
    def __init__(self, vocab_size, embed_size, num_hiddens, num_layers,
                 dropout=0, **kwargs):
        super(Seq2SeqAttentionDecoder, self).__init__(** kwargs)
        self.attention = d2l.AdditiveAttention(  # 加性注意力机制
            num_hiddens, num_hiddens, num_hiddens, dropout)
        self.embedding = nn.Embedding(vocab_size, embed_size)  # 词嵌入层
        self.rnn = nn.GRU(
            embed_size + num_hiddens, num_hiddens, num_layers,
            dropout=dropout)
        self.dense = nn.Linear(num_hiddens, vocab_size)  

    # 初始化解码器状态(使用编码器输出)
    def init_state(self, enc_outputs, enc_valid_lens, *args):
        outputs, hidden_state = enc_outputs  # 编码器输出和隐藏状态
        # 调整维度并返回状态(编码器输出、隐藏状态、有效长度)
        return (outputs.permute(1, 0, 2), hidden_state, enc_valid_lens)

    # 前向传播:输入X和状态,输出预测和新状态
    def forward(self, X, state):
        enc_outputs, hidden_state, enc_valid_lens = state  # 解码器初始状态
        X = self.embedding(X).permute(1, 0, 2)  # 词嵌入并调整维度(时间步, 批量, 嵌入维度)
        outputs, self._attention_weights = [], []  # 存储输出和注意力权重
        
        # 逐时间步处理输入
        for x in X:
            # 用GRU最后一层的隐藏状态作为查询,新增的维度(第 1 维)代表 “查询的数量”
            query = torch.unsqueeze(hidden_state[-1], dim=1)
            # 调用加性注意力:query(解码器隐藏态)与enc_outputs(编码器输出,作为key和value)计算关联
            context = self.attention(
                query, enc_outputs, enc_outputs, enc_valid_lens)# context形状:(batch_size, 1, num_hiddens)(每个查询对应的注意力加权结果)
            x = torch.cat((context, torch.unsqueeze(x, dim=1)), dim=-1)
            # (GRU要求输入形状为(seq_len, batch_size, input_size),这里seq_len=1)
            out, hidden_state = self.rnn(x.permute(1, 0, 2), hidden_state)
            outputs.append(out)  # 收集输出
            self._attention_weights.append(self.attention.attention_weights)  # 收集注意力权重
        
        # 拼接所有时间步输出,通过全连接层预测词汇
        outputs = self.dense(torch.cat(outputs, dim=0))
        # 调整输出维度并返回(批量, 时间步, 词汇表大小),以及新状态
        return outputs.permute(1, 0, 2), [enc_outputs, hidden_state, enc_valid_lens]

    # 提供注意力权重的访问接口(实现基类要求)
    @property
    def attention_weights(self):
        return self._attention_weights

训练:

embed_size, num_hiddens, num_layers, dropout = 32, 32, 2, 0.1
batch_size, num_steps = 64, 10
lr, num_epochs, device = 0.005, 250, d2l.try_gpu()

train_iter, src_vocab, tgt_vocab = d2l.load_data_nmt(batch_size, num_steps)
encoder = d2l.Seq2SeqEncoder(
    len(src_vocab), embed_size, num_hiddens, num_layers, dropout)
decoder = Seq2SeqAttentionDecoder(
    len(tgt_vocab), embed_size, num_hiddens, num_layers, dropout)
net = d2l.EncoderDecoder(encoder, decoder)
d2l.train_seq2seq(net, train_iter, lr, num_epochs, tgt_vocab, device)

engs = ['go .', "i lost .", 'he\'s calm .', 'i\'m home .']
fras = ['va !', 'j\'ai perdu .', 'il est calme .', 'je suis chez moi .']
for eng, fra in zip(engs, fras):
    translation, dec_attention_weight_seq = d2l.predict_seq2seq(
        net, eng, src_vocab, tgt_vocab, num_steps, device, True)
    print(f'{eng} => {translation}, ',
          f'bleu {d2l.bleu(translation, fra, k=2):.3f}')

自注意力

给定序列  x 1 , … , x n , ∀ x i ∈ R d 自注意力池化层将  x i  当做key,value,query来对序列抽取特征得到  y 1 , … , y n , 这里 y i = f ( x i , ( x 1 , x 1 ) , … , ( x n , x n ) ) ∈ R d \begin{aligned} &\text{给定序列 } \mathbf{x}_1, \dots, \mathbf{x}_n, \forall \mathbf{x}_i \in \mathbb{R}^d \\ &\text{自注意力池化层将 } \mathbf{x}_i \text{ 当做key,value,query来对序列抽取特征得到 } \mathbf{y}_1, \dots, \mathbf{y}_n,\text{ 这里} \\ &\mathbf{y}_i = f(\mathbf{x}_i, (\mathbf{x}_1, \mathbf{x}_1), \dots, (\mathbf{x}_n, \mathbf{x}_n)) \in \mathbb{R}^d \end{aligned} 给定序列 x1,,xn,xiRd自注意力池化层将 xi 当做keyvaluequery来对序列抽取特征得到 y1,,yn 这里yi=f(xi,(x1,x1),,(xn,xn))Rd

CNN RNN 自注意力
计算复杂度 O ( k n d 2 ) O(knd^2) O(knd2) O ( n d 2 ) O(nd^2) O(nd2) O ( n 2 d ) O(n^2d) O(n2d)
并行度 O ( n ) O(n) O(n) O ( 1 ) O(1) O(1) O ( n ) O(n) O(n)
最长路径 O ( n / k ) O(n/k) O(n/k) O ( n ) O(n) O(n) O ( 1 ) O(1) O(1)

位置编码

  • 跟CNN/RNN不同,自注意力并没有记录位置信息
  • 位置编码将位置信息注入到输入里
    • 假设长度为 n n n 的序列是 X ∈ R n × d \mathbf{X} \in \mathbb{R}^{n \times d} XRn×d,那么使用位置编码矩阵 P ∈ R n × d \mathbf{P} \in \mathbb{R}^{n \times d} PRn×d 来输出 X + P \mathbf{X} + \mathbf{P} X+P 作为自编码输入
  • P \mathbf{P} P 的元素如下计算:
    p i , 2 j = sin ⁡ ( i 1000 0 2 j / d ) , p i , 2 j + 1 = cos ⁡ ( i 1000 0 2 j / d ) p_{i,2j} = \sin\left( \frac{i}{10000^{2j/d}} \right), \quad p_{i,2j+1} = \cos\left( \frac{i}{10000^{2j/d}} \right) pi,2j=sin(100002j/di),pi,2j+1=cos(100002j/di)

相对位置信息

  • 位置于 i + δ i+\delta i+δ 处的位置编码可以通过线性投影位置 i 处的位置编码来表示
  • ω j = 1 / 1000 0 2 j / d \omega_j = 1/10000^{2j/d} ωj=1/100002j/d,那么

[ cos ⁡ ( δ ω j ) sin ⁡ ( δ ω j ) − sin ⁡ ( δ ω j ) cos ⁡ ( δ ω j ) ] [ p i , 2 j p i , 2 j + 1 ] [ p i + δ , 2 j p i + δ , 2 j + 1 ] \begin{bmatrix} \cos(\delta \omega_j) & \sin(\delta \omega_j) \\ -\sin(\delta \omega_j) & \cos(\delta \omega_j) \end{bmatrix} \begin{bmatrix} p_{i,2j} \\ p_{i,2j+1} \end{bmatrix} \begin{bmatrix} p_{i+\delta,2j} \\ p_{i+\delta,2j+1} \end{bmatrix} [cos(δωj)sin(δωj)sin(δωj)cos(δωj)][pi,2jpi,2j+1][pi+δ,2jpi+δ,2j+1]

投影矩阵跟 i 无关

代码实现:

import math
import torch
from torch import nn
from d2l import torch as d2l

# 自注意力
num_hiddens, num_heads = 100, 5
attention = d2l.MultiHeadAttention(num_hiddens, num_hiddens, num_hiddens,
                                   num_hiddens, num_heads, 0.5)
attention.eval()

batch_size, num_queries, valid_lens = 2, 4, torch.tensor([3, 2])
X = torch.ones((batch_size, num_queries, num_hiddens))
attention(X, X, X, valid_lens).shape
class PositionalEncoding(nn.Module):
    """位置编码"""
    def __init__(self, num_hiddens, dropout, max_len=1000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(dropout)
        self.P = torch.zeros((1, max_len, num_hiddens))
        X = torch.arange(max_len, dtype=torch.float32).reshape(
            -1, 1) / torch.pow(10000, torch.arange(
            0, num_hiddens, 2, dtype=torch.float32) / num_hiddens)
        self.P[:, :, 0::2] = torch.sin(X)
        self.P[:, :, 1::2] = torch.cos(X)

    def forward(self, X):
        X = X + self.P[:, :X.shape[1], :].to(X.device)
        return self.dropout(X)

Transformer

Transformer架构
基于编码器-解码器架构来处理序列对
跟使用注意力的seq2seq不同,Transformer是纯基于注意力
在这里插入图片描述
多头注意力

  • 对同一key,value,query,希望抽取不同的信息
    • 例如短距离关系和长距离关系
  • 多头注意力使用h个独立的注意力池化
    • 合并各个头(head)输出得到最终输出
      在这里插入图片描述

多头注意力

  • query q ∈ R d q \mathbf{q} \in \mathbb{R}^{d_q} qRdq,key k ∈ R d k \mathbf{k} \in \mathbb{R}^{d_k} kRdk,value v ∈ R d v \mathbf{v} \in \mathbb{R}^{d_v} vRdv
  • i i i 的可学习参数: W i ( q ) ∈ R p q × d q \mathbf{W}_i^{(q)} \in \mathbb{R}^{p_q \times d_q} Wi(q)Rpq×dq W i ( k ) ∈ R p k × d k \mathbf{W}_i^{(k)} \in \mathbb{R}^{p_k \times d_k} Wi(k)Rpk×dk W i ( v ) ∈ R p v × d v \mathbf{W}_i^{(v)} \in \mathbb{R}^{p_v \times d_v} Wi(v)Rpv×dv
  • i i i 的输出: h i = f ( W i ( q ) q , W i ( k ) k , W i ( v ) v ) ∈ R p v \mathbf{h}_i = f(\mathbf{W}_i^{(q)}\mathbf{q}, \mathbf{W}_i^{(k)}\mathbf{k}, \mathbf{W}_i^{(v)}\mathbf{v}) \in \mathbb{R}^{p_v} hi=f(Wi(q)q,Wi(k)k,Wi(v)v)Rpv
  • 输出的可学习参数: W o ∈ R p o × h p v \mathbf{W}_o \in \mathbb{R}^{p_o \times h p_v} WoRpo×hpv
  • 多头注意力的输出: h = W o [ h 1 h 2 ⋮ h h ] ∈ R p o \mathbf{h} = \mathbf{W}_o \begin{bmatrix} \mathbf{h}_1 \\ \mathbf{h}_2 \\ \vdots \\ \mathbf{h}_h \end{bmatrix} \in \mathbb{R}^{p_o} h=Wo h1h2hh Rpo

有掩码的多头注意力

  • 解码器对序列中一个元素输出时,不应该考虑该元素之后的元素
  • 可以通过掩码来实现
    • 也就是计算 x i x_i xi 输出时,假装当前序列长度为 i

基于位置的前馈网络

  • 将输入形状由(b,n,d)变换成(bn,d)
  • 作用两个全连接层
  • 输出形状由(bn,d)变化回(b,n,d)
  • 等价于两层核窗口为1的一维卷积层

层归一化

  • 批量归一化对每个特征/通道里元素进行归一化

    • 不适合序列长度会变的NLP应用层归一化
  • 对每个样本里的元素进行归一化
    在这里插入图片描述

  • 编码器中的输出 y 1 , … , y n \mathbf{y}_1, \dots, \mathbf{y}_n y1,,yn

  • 将其作为解码中第 ( i ) 个Transformer块中多头注意力的key和value

    • 它的query来自目标序列
  • 意味着编码器和解码器中块的输出维度是一样的

预测

  • 预测第t+1个输出时
  • 解码器中输入前t个预测值
    • 在自注意力中,前t个预测值作为key和value,第t个预测值还作为query

代码实现:
多头注意力:

import math
import torch
from torch import nn
from d2l import torch as d2l

class MultiHeadAttention(nn.Module):
    """多头注意力机制:将注意力拆分为多个头并行计算,捕捉不同子空间的特征关联"""
    def __init__(self, key_size, query_size, value_size, num_hiddens, num_heads, dropout, bias=False, **kwargs):
        super(MultiHeadAttention, self).__init__(**kwargs)
        self.num_heads = num_heads  # 注意力头的数量
        self.attention = d2l.DotProductAttention(dropout)  # 单头点积注意力模块
        
        # 线性投影层:将query/key/value映射到指定隐藏维度
        self.W_q = nn.Linear(query_size, num_hiddens, bias=bias)
        self.W_k = nn.Linear(key_size, num_hiddens, bias=bias)
        self.W_v = nn.Linear(value_size, num_hiddens, bias=bias)
        
        # 输出投影层:合并多头结果后再次线性变换
        self.W_o = nn.Linear(num_hiddens, num_hiddens, bias=bias)

    def forward(self, queries, keys, values, valid_lens):
        # 1. 线性投影 + 维度变换(适配多头并行计算)
        queries = transpose_qkv(self.W_q(queries), self.num_heads)
        keys = transpose_qkv(self.W_k(keys), self.num_heads)
        values = transpose_qkv(self.W_v(values), self.num_heads)
        
        # 2. 处理有效长度:为每个头重复有效长度(每个头独立进行掩码)
        if valid_lens is not None:
            valid_lens = torch.repeat_interleave(
                valid_lens, repeats=self.num_heads, dim=0)
        
        # 3. 多头注意力核心计算(多个头并行执行注意力过程)
        output = self.attention(queries, keys, values, valid_lens)
        
        # 4. 还原维度 + 输出投影(合并多头结果并做最终线性变换)
        output_concat = transpose_output(output, self.num_heads)
        return self.W_o(output_concat)

# 辅助函数:调整Q/K/V的维度以支持多头并行计算
def transpose_qkv(X, num_heads):
    """将输入形状从 (batch_size, seq_len, num_hiddens) 转换为
    (batch_size * num_heads, seq_len, num_hiddens // num_heads),
    让多个注意力头能并行计算"""
    X = X.reshape(X.shape[0], X.shape[1], num_heads, -1)   # 拆分隐藏层为“头数 × 单头隐藏维度”
    X = X.permute(0, 2, 1, 3)                              # 调整维度顺序为 (batch, heads, seq, hidden_per_head)
    return X.reshape(-1, X.shape[2], X.shape[3])           # 合并batch和heads维度,得到并行计算的形状,实现用一次矩阵运算完成所有头的注意力计算

# 辅助函数:逆转 transpose_qkv 的维度变换,合并多头结果
def transpose_output(X, num_heads):
    """将输入形状从 (batch_size * num_heads, seq_len, num_hiddens // num_heads)
    转换回 (batch_size, seq_len, num_hiddens),合并多个头的结果"""
    X = X.reshape(-1, num_heads, X.shape[1], X.shape[2])   # 拆分batch和heads维度
    X = X.permute(0, 2, 1, 3)                              # 调整维度顺序为 (batch, seq, heads, hidden_per_head)
    return X.reshape(X.shape[0], X.shape[1], -1)           # 合并heads和单头隐藏维度,还原总隐藏层维度

Transformer代码实现:

import math
import pandas as pd
import torch
from torch import nn
from d2l import torch as d2l

class PositionWiseFFN(nn.Module):
    """基于位置的前馈网络"""
    def __init__(self, ffn_num_input, ffn_num_hiddens, ffn_num_outputs,
                 **kwargs):
        super(PositionWiseFFN, self).__init__(**kwargs)
        self.dense1 = nn.Linear(ffn_num_input, ffn_num_hiddens)
        self.relu = nn.ReLU()
        self.dense2 = nn.Linear(ffn_num_hiddens, ffn_num_outputs)

    def forward(self, X):
        return self.dense2(self.relu(self.dense1(X)))

class AddNorm(nn.Module):
    """残差连接后进行层规范化"""
    def __init__(self, normalized_shape, dropout, **kwargs):
        super(AddNorm, self).__init__(**kwargs)
        self.dropout = nn.Dropout(dropout)
        self.ln = nn.LayerNorm(normalized_shape)

    def forward(self, X, Y):
        return self.ln(self.dropout(Y) + X)
class EncoderBlock(nn.Module):
    """Transformer编码器块"""
    def __init__(self, key_size, query_size, value_size, num_hiddens,
                 norm_shape, ffn_num_input, ffn_num_hiddens, num_heads,
                 dropout, use_bias=False, **kwargs):
        super(EncoderBlock, self).__init__(**kwargs)
        self.attention = d2l.MultiHeadAttention(
            key_size, query_size, value_size, num_hiddens, num_heads, dropout,
            use_bias)
        self.addnorm1 = AddNorm(norm_shape, dropout)
        self.ffn = PositionWiseFFN(
            ffn_num_input, ffn_num_hiddens, num_hiddens)
        self.addnorm2 = AddNorm(norm_shape, dropout)

    def forward(self, X, valid_lens):
        Y = self.addnorm1(X, self.attention(X, X, X, valid_lens))
        return self.addnorm2(Y, self.ffn(Y))

class TransformerEncoder(d2l.Encoder):
    """Transformer编码器"""
    def __init__(self, vocab_size, key_size, query_size, value_size,
                 num_hiddens, norm_shape, ffn_num_input, ffn_num_hiddens,
                 num_heads, num_layers, dropout, use_bias=False, **kwargs):
        super(TransformerEncoder, self).__init__(** kwargs)
        # 核心参数与组件初始化
        self.num_hiddens = num_hiddens  # 隐藏层维度(如512,贯穿整个编码器的核心维度)
        self.embedding = nn.Embedding(vocab_size, num_hiddens)  # 词嵌入层
        self.pos_encoding = d2l.PositionalEncoding(num_hiddens, dropout)  # 位置编码层
        self.blks = nn.Sequential()  # 堆叠多个编码器块
        
        # 循环添加num_layers个编码器块(EncoderBlock)
        for i in range(num_layers):
            self.blks.add_module("block"+str(i),
                EncoderBlock(
                    key_size=key_size,  # 键(key)的特征维度
                    query_size=query_size,  # 查询(query)的特征维度
                    value_size=value_size,  # 值(value)的特征维度
                    num_hiddens=num_hiddens,  # 隐藏层维度(与词嵌入维度一致)
                    norm_shape=norm_shape,  # 层归一化的目标维度
                    ffn_num_input=ffn_num_input,  # 前馈网络的输入维度
                    ffn_num_hiddens=ffn_num_hiddens,  # 前馈网络的隐藏维度
                    num_heads=num_heads,  # 多头注意力的头数
                    dropout=dropout,  #  dropout概率
                    use_bias=use_bias  # 线性层是否使用偏置
                )
            )

def forward(self, X, valid_lens, *args):
    X = self.pos_encoding(self.embedding(X) * math.sqrt(self.num_hiddens))
    
    self.attention_weights = [None] * len(self.blks)
    for i, blk in enumerate(self.blks):
        X = blk(X, valid_lens)  # 每个块接收上一层的输出,输出更精炼的特征
        # 记录当前层的注意力权重(用于后续可视化或分析)
        self.attention_weights[i] = blk.attention.attention.attention_weights
    
    return X  # 返回最终的编码特征(形状:(batch_size, seq_len, num_hiddens))
class DecoderBlock(nn.Module):
    """解码器中第i个块"""
    def __init__(self, key_size, query_size, value_size, num_hiddens,
                 norm_shape, ffn_num_input, ffn_num_hiddens, num_heads,
                 dropout, i, **kwargs):
        super(DecoderBlock, self).__init__(** kwargs)
        self.i = i  # 当前解码器块的索引(用于定位历史状态)
        # 第一个多头注意力:解码器自注意力(带掩码,防止关注未来位置)
        self.attention1 = d2l.MultiHeadAttention(
            key_size, query_size, value_size, num_hiddens, num_heads, dropout)
        self.addnorm1 = AddNorm(norm_shape, dropout)  # 残差连接+层归一化
        
        # 第二个多头注意力:编码器-解码器注意力(关联源序列和目标序列)
        self.attention2 = d2l.MultiHeadAttention(
            key_size, query_size, value_size, num_hiddens, num_heads, dropout)
        self.addnorm2 = AddNorm(norm_shape, dropout)  # 残差连接+层归一化
        
        self.ffn = PositionWiseFFN(ffn_num_input, ffn_num_hiddens, num_hiddens)
        self.addnorm3 = AddNorm(norm_shape, dropout)  # 残差连接+层归一化

	def forward(self, X, state):
    	# 从state中提取编码器输出、编码器有效长度、历史状态
    	enc_outputs, enc_valid_lens = state[0], state[1]
    	# state[2] 存储每个解码器块的历史状态(用于自回归生成)
    	if state[2][self.i] is None:
        	key_values = X  # 初始状态:key/value为当前输入X
    	else:
        	# 非初始状态:拼接历史状态和当前输入(保留之前生成的词)
        	key_values = torch.cat((state[2][self.i], X), axis=1)
    	# 更新当前块的历史状态(用于下一次生成)
    	state[2][self.i] = key_values
    
    	# 训练时:生成解码器的有效长度(用于掩码未来位置)
    	if self.training:
        	batch_size, num_steps, _ = X.shape
        	# dec_valid_lens形状:(batch_size, num_steps),每行是[1,2,...,num_steps]
        	# 作用:确保每个位置只能关注自身及之前的位置(屏蔽未来)
        	dec_valid_lens = torch.arange(1, num_steps + 1, device=X.device).repeat(batch_size, 1)
    	else:
        	dec_valid_lens = None  # 推理时通过其他方式控制掩码
    
    	# 解码器自注意力(带掩码)
    	X2 = self.attention1(X, key_values, key_values, dec_valid_lens)
    	Y = self.addnorm1(X, X2)  # 残差连接+层归一化
    
    	# 编码器-解码器注意力(关联源序列)
    	Y2 = self.attention2(Y, enc_outputs, enc_outputs, enc_valid_lens)
    	Z = self.addnorm2(Y, Y2)  # 残差连接+层归一化
    
    	# 步骤3:前馈网络+最终残差连接
    	return self.addnorm3(Z, self.ffn(Z)), state

class TransformerDecoder(d2l.AttentionDecoder):
    """Transformer解码器:基于编码器输出和已生成序列,生成目标序列"""
    def __init__(self, vocab_size, key_size, query_size, value_size,
                 num_hiddens, norm_shape, ffn_num_input, ffn_num_hiddens,
                 num_heads, num_layers, dropout, **kwargs):
        # 调用父类构造函数
        super(TransformerDecoder, self).__init__(** kwargs)
        self.num_hiddens = num_hiddens  
        self.num_layers = num_layers    

        self.embedding = nn.Embedding(vocab_size, num_hiddens)
        
        self.pos_encoding = d2l.PositionalEncoding(num_hiddens, dropout)     
        self.blks = nn.Sequential()
        for i in range(num_layers):
            # 每个解码器块包含:带掩码的自注意力、编码器-解码器交叉注意力、前馈网络
            self.blks.add_module("block"+str(i),
                DecoderBlock(
                    key_size=key_size,          # 注意力中key的维度
                    query_size=query_size,      # 注意力中query的维度
                    value_size=value_size,      # 注意力中value的维度
                    num_hiddens=num_hiddens,    # 隐藏层维度
                    norm_shape=norm_shape,      # 层归一化的目标维度
                    ffn_num_input=ffn_num_input, # 前馈网络的输入维度
                    ffn_num_hiddens=ffn_num_hiddens, # 前馈网络的隐藏维度
                    num_heads=num_heads,        # 多头注意力的头数
                    dropout=dropout,            # dropout概率
                    i=i                         # 当前块的索引(用于定位历史状态)
                )
            )
        self.dense = nn.Linear(num_hiddens, vocab_size)

    def init_state(self, enc_outputs, enc_valid_lens, *args):
        """初始化解码器状态(供生成过程使用)"""
        return [enc_outputs, enc_valid_lens, [None] * self.num_layers]

    def forward(self, X, state):
        """前向传播:处理目标序列,生成预测结果"""
        # 词嵌入:(batch_size, seq_len) → (batch_size, seq_len, num_hiddens)
        # 缩放:使词嵌入方差与位置编码匹配;位置编码:注入位置信息
        X = self.pos_encoding(self.embedding(X) * math.sqrt(self.num_hiddens))
        # _attention_weights[0]:解码器自注意力权重;[1]:编码器-解码器交叉注意力权重
        self._attention_weights = [[None] * len(self.blks) for _ in range(2)]
        
        for i, blk in enumerate(self.blks):
            X, state = blk(X, state)
            self._attention_weights[0][i] = blk.attention1.attention.attention_weights
            self._attention_weights[1][i] = blk.attention2.attention.attention_weights
            
         return self.dense(X), state

    @property
    def attention_weights(self):
        """属性方法:获取记录的注意力权重(供外部分析/可视化使用)"""
        return self._attention_weights

训练:

num_hiddens, num_layers, dropout, batch_size, num_steps = 32, 2, 0.1, 64, 10
lr, num_epochs, device = 0.005, 200, d2l.try_gpu()
ffn_num_input, ffn_num_hiddens, num_heads = 32, 64, 4
key_size, query_size, value_size = 32, 32, 32
norm_shape = [32]

train_iter, src_vocab, tgt_vocab = d2l.load_data_nmt(batch_size, num_steps)

encoder = TransformerEncoder(
    len(src_vocab), key_size, query_size, value_size, num_hiddens,
    norm_shape, ffn_num_input, ffn_num_hiddens, num_heads,
    num_layers, dropout)
decoder = TransformerDecoder(
    len(tgt_vocab), key_size, query_size, value_size, num_hiddens,
    norm_shape, ffn_num_input, ffn_num_hiddens, num_heads,
    num_layers, dropout)
net = d2l.EncoderDecoder(encoder, decoder)
train_seq2seq(net, train_iter, lr, num_epochs, tgt_vocab, device)

在这里插入图片描述

预测:

engs = ['go .', "i lost .", 'he\'s calm .', 'i\'m home .']
fras = ['va !', 'j\'ai perdu .', 'il est calme .', 'je suis chez moi .']
for eng, fra in zip(engs, fras):
    translation, dec_attention_weight_seq = d2l.predict_seq2seq(
        net, eng, src_vocab, tgt_vocab, num_steps, device, True)
    print(f'{eng} => {translation}, ',
          f'bleu {d2l.bleu(translation, fra, k=2):.3f}')

在这里插入图片描述

自然语言处理

NLP里的迁移学习

  • 使用预训练好的模型来抽取词、句子的特征
    • 例如 word2vec 或语言模型
    • 不更新预训练好的模型
  • 需要构建新的网络来抓取新任务需要的信息
    • Word2vec忽略了时序信息,语言模型只看了一个方向

BERT

BERT的动机

  • 基于微调的NLP模型
  • 预训练的模型抽取了足够多的信息
  • 新的任务只需要增加一个简单的输出层

对输入的修改

  • 每个样本是一个句子对
  • 加入额外的片段嵌入
  • 位置编码可学习

预训练任务1:带掩码的语言模型

  • Transfomer的编码器是双向,标准语言模型要求单向
  • 带掩码的语言模型每次随机(15%概率)将一些词元换成<mask>
  • 因为微调任务中不出现<mask>
    • 80%概率下,将选中的词元变成<mask>
    • 10%概率下换成一个随机词元
    • 10%概率下保持原有的词元

预训练任务2:下一句子预测

  • 预测一个句子对中两个句子是不是相邻
  • 训练样本中:
    • 50%概率选择相邻句子对:<cls> this movie is great <sep> i like it <sep>
    • 50%概率选择随机句子对:<cls> this movie is great <sep> hello world <sep>
  • <cls>对应的输出放到一个全连接层来预测

代码:

import torch
from torch import nn
from d2l import torch as d2l

#@save
def get_tokens_and_segments(tokens_a, tokens_b=None):
    """获取输入序列的词元及其片段索引"""
    tokens = ['<cls>'] + tokens_a + ['<sep>']
    # 0和1分别标记片段A和B
    segments = [0] * (len(tokens_a) + 2)
    if tokens_b is not None:
        tokens += tokens_b + ['<sep>']
        segments += [1] * (len(tokens_b) + 1)
    return tokens, segments
#@save
class BERTEncoder(nn.Module):
    """BERT编码器:将文本词元转换为包含语义、位置和片段信息的上下文向量"""
    def __init__(self, vocab_size, num_hiddens, norm_shape, ffn_num_input,
                 ffn_num_hiddens, num_heads, num_layers, dropout,
                 max_len=1000, key_size=768, query_size=768, value_size=768,
                 **kwargs):
        super(BERTEncoder, self).__init__(** kwargs)
        
        # 1. 词嵌入层:将词元ID映射为向量,捕捉基础语义
        self.token_embedding = nn.Embedding(vocab_size, num_hiddens)
        # 2. 片段嵌入层:区分两个句子(0表示句子A,1表示句子B)
        # 仅需2个嵌入向量(因为最多处理两个句子)
        self.segment_embedding = nn.Embedding(2, num_hiddens)
        # 3. 堆叠多个Transformer编码器块:实现深层上下文编码
        self.blks = nn.Sequential()
        for i in range(num_layers):
            self.blks.add_module(f"{i}", d2l.EncoderBlock(
                key_size, query_size, value_size, num_hiddens, norm_shape,
                ffn_num_input, ffn_num_hiddens, num_heads, dropout, True))
        
        # 4. 可学习的位置嵌入:注入词元在序列中的位置信息
        # 形状:(1, max_len, num_hiddens),max_len为最大序列长度
        # 用nn.Parameter标记为可训练参数,随模型一起更新
        self.pos_embedding = nn.Parameter(torch.randn(1, max_len,
                                                      num_hiddens))

    def forward(self, tokens, segments, valid_lens):
        """
        前向传播:将输入转化为上下文感知的编码向量
        参数:
            tokens:词元ID序列,形状(batch_size, seq_len)
            segments:片段索引(0/1),形状(batch_size, seq_len)
            valid_lens:有效长度,用于屏蔽填充的无效词元,形状(batch_size,)
        返回:
            X:编码后的向量,形状(batch_size, seq_len, num_hiddens)
        """
        # 步骤1:融合词嵌入和片段嵌入
        # token_embedding(tokens):(batch_size, seq_len, num_hiddens)
        # segment_embedding(segments):(batch_size, seq_len, num_hiddens)
        X = self.token_embedding(tokens) + self.segment_embedding(segments)
        # 步骤2:添加位置嵌入(截取与输入序列长度匹配的部分)
        # pos_embedding.data[:, :X.shape[1], :]:取前seq_len个位置的嵌入
        X = X + self.pos_embedding.data[:, :X.shape[1], :]
        for blk in self.blks:
            X = blk(X, valid_lens) 
        return X  
#@save
class MaskLM(nn.Module):
    """BERT的掩蔽语言模型(Masked Language Model, MLM)任务模块
    作用:预测被随机掩盖的词元,使模型学习上下文语义理解能力
    """
    def __init__(self, vocab_size, num_hiddens, num_inputs=768, **kwargs):
        super(MaskLM, self).__init__(**kwargs)
        self.mlp = nn.Sequential(
            nn.Linear(num_inputs, num_hiddens),  
            nn.ReLU(),                           
            nn.LayerNorm(num_hiddens),           # 层归一化,稳定训练过程
            nn.Linear(num_hiddens, vocab_size)   
        )

    def forward(self, X, pred_positions):
        """
        前向传播:从编码器输出中提取需要预测的位置,并用MLP预测原词
        参数:
            X: BERT编码器的输出,形状为 (batch_size, seq_len, num_hiddens)
            pred_positions: 需要预测的位置索引,形状为 (batch_size, num_pred_positions)
        返回:
            mlm_Y_hat: 预测的词元在词汇表上的概率分布,形状为 (batch_size, num_pred_positions, vocab_size)
        """
        # 获取每个样本中需要预测的位置数量
        num_pred_positions = pred_positions.shape[1]
        # 将预测位置的索引展平为一维(方便后续索引操作)
        pred_positions = pred_positions.reshape(-1)
        # 获取批量大小
        batch_size = X.shape[0]
        # 生成批量索引(每个样本要对应num_pred_positions个预测位置)
        batch_idx = torch.arange(0, batch_size)
        # 重复每个批量索引num_pred_positions次(例如batch_size=2,num_pred_positions=3时,变成[0,0,0,1,1,1])
        batch_idx = torch.repeat_interleave(batch_idx, num_pred_positions)
        # 根据批量索引和预测位置,从编码器输出X中提取需要预测的词元向量
        masked_X = X[batch_idx, pred_positions]
        # 调整形状为 (batch_size, num_pred_positions, num_hiddens)
        masked_X = masked_X.reshape((batch_size, num_pred_positions, -1))
        # 通过MLP预测这些位置的原词,得到词汇表上的概率分布
        mlm_Y_hat = self.mlp(masked_X)
        return mlm_Y_hat
#@save
class NextSentencePred(nn.Module):
    """BERT的下一句预测任务"""
    def __init__(self, num_inputs, **kwargs):
        super(NextSentencePred, self).__init__(**kwargs)
        self.output = nn.Linear(num_inputs, 2)

    def forward(self, X):
        # X的形状:(batchsize,num_hiddens)
        return self.output(X)
#@save
class BERTModel(nn.Module):
    """BERT模型"""
    def __init__(self, vocab_size, num_hiddens, norm_shape, ffn_num_input,
                 ffn_num_hiddens, num_heads, num_layers, dropout,
                 max_len=1000, key_size=768, query_size=768, value_size=768,
                 hid_in_features=768, mlm_in_features=768,
                 nsp_in_features=768):
        super(BERTModel, self).__init__()
        self.encoder = BERTEncoder(vocab_size, num_hiddens, norm_shape,
                    ffn_num_input, ffn_num_hiddens, num_heads, num_layers,
                    dropout, max_len=max_len, key_size=key_size,
                    query_size=query_size, value_size=value_size)
        self.hidden = nn.Sequential(nn.Linear(hid_in_features, num_hiddens),
                                    nn.Tanh())
        self.mlm = MaskLM(vocab_size, num_hiddens, mlm_in_features)
        self.nsp = NextSentencePred(nsp_in_features)

    def forward(self, tokens, segments, valid_lens=None,
                pred_positions=None):
        encoded_X = self.encoder(tokens, segments, valid_lens)
        if pred_positions is not None:
            mlm_Y_hat = self.mlm(encoded_X, pred_positions)
        else:
            mlm_Y_hat = None
        # 用于下一句预测的多层感知机分类器的隐藏层,0是“<cls>”标记的索引
        nsp_Y_hat = self.nsp(self.hidden(encoded_X[:, 0, :]))
        return encoded_X, mlm_Y_hat, nsp_Y_hat

用于预训练BERT的数据集:

import os
import random
import torch
from d2l import torch as d2l  # 导入深度学习工具库

# 注册WikiText-2数据集(指定下载链接和校验码)
d2l.DATA_HUB['wikitext-2'] = (
    'https://s3.amazonaws.com/research.metamind.io/wikitext/'
    'wikitext-2-v1.zip', '3c914d17d80b1459be871a5039ac23e752a53cbe')

# 读取并预处理WikiText-2数据集
def _read_wiki(data_dir):
    file_name = os.path.join(data_dir, 'wiki.train.tokens')  # 训练集文件路径
    with open(file_name, 'r') as f:
        lines = f.readlines()  # 逐行读取文本
    # 每行转小写、按句号分割为句子,过滤句子数≥2的行
    paragraphs = [line.strip().lower().split('. ')
                  for line in lines if len(line.split('. ')) >= 2]
    random.shuffle(paragraphs)  # 随机打乱段落顺序
    return paragraphs

生成下一次预测任务的数据:

# 生成“下一句预测”任务的样本(50%相邻,50%不相邻)
def _get_next_sentence(sentence, next_sentence, paragraphs):
    if random.random() < 0.5:  # 50%概率:选相邻句子
        is_next = True
    else:  # 50%概率:从数据中随机选一个句子(不相邻)
        next_sentence = random.choice(random.choice(paragraphs))
        is_next = False
    return sentence, next_sentence, is_next

#@save
def _get_nsp_data_from_paragraph(paragraph, paragraphs, vocab, max_len):
    """从单个段落生成“下一句预测(NSP)”任务的数据"""
    nsp_data_from_paragraph = []
    # 遍历段落中的句子对(取第i句和第i+1句作为相邻候选)
    for i in range(len(paragraph) - 1):
        # 生成句子对及“是否相邻”标签(50%概率相邻,50%不相邻)
        tokens_a, tokens_b, is_next = _get_next_sentence(
            paragraph[i], paragraph[i + 1], paragraphs)
        # 计算加入特殊标记后的总长度(<cls> + tokens_a + <sep> + tokens_b + <sep>)
        if len(tokens_a) + len(tokens_b) + 3 > max_len:
            continue  # 超过最大长度则跳过
        # 生成词元序列和片段索引(片段索引区分句子A和B)
        tokens, segments = d2l.get_tokens_and_segments(tokens_a, tokens_b)
        # 保存(词元、片段、是否相邻)的样本
        nsp_data_from_paragraph.append((tokens, segments, is_next))
    return nsp_data_from_paragraph

生成掩蔽语言模型的数据:

#@save
def _replace_mlm_tokens(tokens, candidate_pred_positions, num_mlm_preds,
                        vocab):
    # 为遮蔽语言模型的输入创建新的词元副本,其中输入可能包含替换的“<mask>”或随机词元
    mlm_input_tokens = [token for token in tokens]
    pred_positions_and_labels = []
    # 打乱后用于在遮蔽语言模型任务中获取15%的随机词元进行预测
    random.shuffle(candidate_pred_positions)
    for mlm_pred_position in candidate_pred_positions:
        if len(pred_positions_and_labels) >= num_mlm_preds:
            break
        masked_token = None
        # 80%的时间:将词替换为“<mask>”词元
        if random.random() < 0.8:
            masked_token = '<mask>'
        else:
            # 10%的时间:保持词不变
            if random.random() < 0.5:
                masked_token = tokens[mlm_pred_position]
            # 10%的时间:用随机词替换该词
            else:
                masked_token = random.choice(vocab.idx_to_token)
        mlm_input_tokens[mlm_pred_position] = masked_token
        pred_positions_and_labels.append(
            (mlm_pred_position, tokens[mlm_pred_position]))
    return mlm_input_tokens, pred_positions_and_labels

#@save
def _get_mlm_data_from_tokens(tokens, vocab):
    candidate_pred_positions = []
    # tokens是一个字符串列表
    for i, token in enumerate(tokens):
        # 在遮蔽语言模型任务中不会预测特殊词元
        if token in ['<cls>', '<sep>']:
            continue
        candidate_pred_positions.append(i)
    # 遮蔽语言模型任务中预测15%的随机词元
    num_mlm_preds = max(1, round(len(tokens) * 0.15))
    mlm_input_tokens, pred_positions_and_labels = _replace_mlm_tokens(
        tokens, candidate_pred_positions, num_mlm_preds, vocab)
    pred_positions_and_labels = sorted(pred_positions_and_labels,
                                       key=lambda x: x[0])
    pred_positions = [v[0] for v in pred_positions_and_labels]
    mlm_pred_labels = [v[1] for v in pred_positions_and_labels]
    return vocab[mlm_input_tokens], pred_positions, vocab[mlm_pred_labels]

将文本转化为预训练数据集:

#@save
def _pad_bert_inputs(examples, max_len, vocab):
    max_num_mlm_preds = round(max_len * 0.15)  # 单样本最多预测的词元数(按序列长度15%计算)
    # 初始化批量数据的存储列表
    all_token_ids, all_segments, valid_lens = [], [], []
    all_pred_positions, all_mlm_weights, all_mlm_labels = [], [], []
    nsp_labels = []
    # 遍历每个样本,进行填充以统一长度
    for (token_ids, pred_positions, mlm_pred_label_ids, segments, is_next) in examples:
        # 填充token_ids到max_len(用<pad>的索引填充,保证批量中长度一致)
        all_token_ids.append(torch.tensor(
            token_ids + [vocab['<pad>']] * (max_len - len(token_ids)), 
            dtype=torch.long))
        # 填充segments到max_len(片段索引,用0填充,0代表句子A的片段)
        all_segments.append(torch.tensor(
            segments + [0] * (max_len - len(segments)), 
            dtype=torch.long))
        # 记录有效长度(不含<pad>的部分,用于后续过滤填充的影响)
        valid_lens.append(torch.tensor(len(token_ids), dtype=torch.float32))
        # 填充MLM预测位置到max_num_mlm_preds(用0填充,后续通过权重过滤无效位置)
        all_pred_positions.append(torch.tensor(
            pred_positions + [0] * (max_num_mlm_preds - len(pred_positions)), 
            dtype=torch.long))
        # 生成MLM权重:有效预测位置权重为1.0,填充位置为0.0(损失计算时忽略填充部分)
        all_mlm_weights.append(torch.tensor(
            [1.0] * len(mlm_pred_label_ids) + [0.0] * (max_num_mlm_preds - len(pred_positions)), 
            dtype=torch.float32))
        # 填充MLM真实标签:有效标签保留,填充位置用0(权重为0时,填充的标签不影响损失)
        all_mlm_labels.append(torch.tensor(
            mlm_pred_label_ids + [0] * (max_num_mlm_preds - len(mlm_pred_label_ids)), 
            dtype=torch.long))
        # 记录NSP任务的标签(是否为相邻句子对,True/False转成张量)
        nsp_labels.append(torch.tensor(is_next, dtype=torch.long))
    # 返回批量数据:token索引、片段、有效长度、MLM预测位置、MLM权重、MLM标签、NSP标签
    return (all_token_ids, all_segments, valid_lens, all_pred_positions,
            all_mlm_weights, all_mlm_labels, nsp_labels)
#@save
class _WikiTextDataset(torch.utils.data.Dataset):
    """BERT预训练数据集类,继承自PyTorch的Dataset,处理WikiText-2数据"""
    def __init__(self, paragraphs, max_len):
        # 1. 分词:将段落的句子字符串转换为词元列表
        paragraphs = [d2l.tokenize(paragraph, token='word') for paragraph in paragraphs]
        # 2. 收集所有句子(用于构建词汇表,覆盖所有可能的词元)
        sentences = [sentence for paragraph in paragraphs for sentence in paragraph]
        # 3. 构建词汇表:过滤频率<5的低频词,保留<pad>(填充)、<mask>(掩蔽)、<cls>(分类汇总)、<sep>(句子分隔)等特殊标记
        self.vocab = d2l.Vocab(sentences, min_freq=5, reserved_tokens=['<pad>', '<mask>', '<cls>', '<sep>'])
        
        # 4. 生成“下一句预测(NSP)”任务的样本:句子对 + 是否相邻的标签
        examples = []
        for paragraph in paragraphs:
            nsp_samples = _get_nsp_data_from_paragraph(paragraph, paragraphs, self.vocab, max_len)
            examples.extend(nsp_samples)  # 将每个段落的NSP样本加入总列表
        
        # 5. 对每个NSP样本,生成“掩蔽语言模型(MLM)”任务的处理:掩蔽部分词元 + 记录真实标签
        examples = [
            (_get_mlm_data_from_tokens(tokens, self.vocab) + (segments, is_next)) 
            for tokens, segments, is_next in examples
        ]
        
        # 6. 填充所有样本到统一长度max_len,并转换为PyTorch张量(适配批量训练)
        (self.all_token_ids, self.all_segments, self.valid_lens,
         self.all_pred_positions, self.all_mlm_weights,
         self.all_mlm_labels, self.nsp_labels) = _pad_bert_inputs(examples, max_len, self.vocab)

    def __getitem__(self, idx):
        """按索引idx获取单个样本,返回BERT预训练所需的所有数据(同时支持MLM和NSP任务)"""
        return (
            self.all_token_ids[idx],      # 词元的词汇表索引
            self.all_segments[idx],       # 片段索引(区分句子A/B)
            self.valid_lens[idx],         # 有效长度(过滤填充的<pad>)
            self.all_pred_positions[idx], # MLM任务中需要预测的词元位置
            self.all_mlm_weights[idx],    # MLM损失的权重(过滤填充位置)
            self.all_mlm_labels[idx],     # MLM任务的真实标签(被掩蔽词元的索引)
            self.nsp_labels[idx]          # NSP任务的标签(是否为相邻句子对)
        )

    def __len__(self):
        """返回数据集的样本总数(即填充后总样本量)"""
        return len(self.all_token_ids)
# 加载WikiText-2数据集,生成BERT预训练的批量数据和词汇表
def load_data_wiki(batch_size, max_len):
    num_workers = d2l.get_dataloader_workers()  # 获取数据加载进程数(多进程加速)
    data_dir = d2l.download_extract('wikitext-2', 'wikitext-2')  # 下载并解压数据集
    paragraphs = _read_wiki(data_dir)  # 读取文本并预处理为段落列表
    train_set = _WikiTextDataset(paragraphs, max_len)  # 构建BERT预训练数据集
    # 生成批量数据加载器(支持批量、打乱、多进程)
    train_iter = torch.utils.data.DataLoader(
        train_set, batch_size, shuffle=True, num_workers=num_workers)
    return train_iter, train_set.vocab  # 返回数据迭代器和词汇表

# 超参数:批量大小512,序列最大长度64
batch_size, max_len = 512, 64
train_iter, vocab = load_data_wiki(batch_size, max_len)  # 加载数据
for (tokens_X, segments_X, valid_lens_x, pred_positions_X, mlm_weights_X,
     mlm_Y, nsp_y) in train_iter:
    print(tokens_X.shape, segments_X.shape, valid_lens_x.shape,
          pred_positions_X.shape, mlm_weights_X.shape, mlm_Y.shape,
          nsp_y.shape)
    break

啊啊啊,用课程给的不知道怎么下载不出来,换了别的路径下载:

d2l.DATA_HUB['wikitext-2'] = (
    # 新的国内镜像链接(需确保能下载到完整的 wikitext-2-v1.zip)
    'https://ascend-repo-modelzoo.obs.cn-east-2.myhuaweicloud.com/MindFormers/dataset/wikitext-2/wikitext-2-v1.zip',
    '3c914d17d80b1459be871a5039ac23e752a53cbe'  
)

BERT预训练:

import torch
from torch import nn
from d2l import torch as d2l

batch_size, max_len = 512, 64
train_iter, vocab = d2l.load_data_wiki(batch_size, max_len)

# 初始化BERT模型(配置模型结构参数)
net = d2l.BERTModel(len(vocab), num_hiddens=128, norm_shape=[128],
                    ffn_num_input=128, ffn_num_hiddens=256, num_heads=2,
                    num_layers=2, dropout=0.2, key_size=128, query_size=128,
                    value_size=128, hid_in_features=128, mlm_in_features=128,
                    nsp_in_features=128)

devices = d2l.try_all_gpus()
loss = nn.CrossEntropyLoss()
#@save
def _get_batch_loss_bert(net, loss, vocab_size, tokens_X,
                         segments_X, valid_lens_X,
                         pred_positions_X, mlm_weights_X,
                         mlm_Y, nsp_y):
    """计算BERT在一个批量数据上的损失(同时包含MLM和NSP任务)"""
    # 前向传播:获取模型输出(_ 为编码器输出,mlm_Y_hat为MLM预测,nsp_Y_hat为NSP预测)
    _, mlm_Y_hat, nsp_Y_hat = net(tokens_X, segments_X,
                                  valid_lens_X.reshape(-1),  # 展平有效长度为1维
                                  pred_positions_X)          # 传入MLM预测位置

    # 计算掩蔽语言模型(MLM)损失
    # 调整MLM预测和标签形状以匹配交叉熵输入,并乘以损失权重(过滤填充位置)
    mlm_l = loss(mlm_Y_hat.reshape(-1, vocab_size), mlm_Y.reshape(-1)) * \
            mlm_weights_X.reshape(-1, 1)
    # 对有效损失求和,再除以有效权重总和(+1e-8防止除零)
    mlm_l = mlm_l.sum() / (mlm_weights_X.sum() + 1e-8)
    nsp_l = loss(nsp_Y_hat, nsp_y)
    l = mlm_l + nsp_l
    return mlm_l, nsp_l, l
def train_bert(net, train_iter, vocab_size, lr, num_steps, devices):
    """BERT模型预训练主函数"""
    # 多GPU并行训练(将模型复制到多个GPU,自动分配数据和计算)
    net = nn.DataParallel(net, device_ids=devices).to(devices[0])
    trainer = torch.optim.Adam(net.parameters(), lr=lr)
    loss = nn.CrossEntropyLoss()

    timer = d2l.Timer()  
    animator = d2l.Animator(xlabel='step', ylabel='loss', 
                           xlim=[1, num_steps], legend=['mlm', 'nsp'])
    # 累加器(统计4个指标:MLM总损失、NSP总损失、总样本数、总步数)
    metric = d2l.Accumulator(4)
    num_steps_reached = False  

    while not num_steps_reached:
        for batch in train_iter:
            tokens_X, segments_X, valid_lens_X, pred_positions_X, mlm_weights_X, mlm_Y, nsp_y = [
                x.to(devices[0]) for x in batch
            ]
            trainer.zero_grad()
            timer.start()
            
            mlm_l, nsp_l, l = _get_batch_loss_bert(
                net, loss, vocab_size, tokens_X, segments_X, valid_lens_X,
                pred_positions_X, mlm_weights_X, mlm_Y, nsp_y
            )
            
            l.backward()
            trainer.step()
            
            metric.add(mlm_l, nsp_l, tokens_X.shape[0], 1)
            timer.stop()
            
            animator.add(metric[3], (metric[0]/metric[3], metric[1]/metric[3]))
            
            # 检查是否达到总训练步数
            if metric[3] == num_steps:
                num_steps_reached = True
                break

    # 4. 输出训练结果
    print(f'最终 MLM 损失: {metric[0]/metric[3]:.3f}')
    print(f'最终 NSP 损失: {metric[1]/metric[3]:.3f}')
    print(f'训练速度: {metric[2]/timer.sum():.1f} 句子对/秒(使用设备: {devices})')

用BERT表示文本:

def get_bert_encoding(net, tokens_a, tokens_b=None):
    tokens, segments = d2l.get_tokens_and_segments(tokens_a, tokens_b)
    token_ids = torch.tensor(vocab[tokens], device=devices[0]).unsqueeze(0)
    segments = torch.tensor(segments, device=devices[0]).unsqueeze(0)
    valid_len = torch.tensor(len(tokens), device=devices[0]).unsqueeze(0)
    encoded_X, _, _ = net(token_ids, segments, valid_len)
    return encoded_X

微调BERT

  • BERT 对每一个词元返回抽取了上下文信息的特征向量
  • 不同的任务使用不同的特性

句子分类:将对应的向量输入到全连接层分类
命名实体识别:识别一个词元是不是命名实体,例如人名、机构、位置,将非特殊词元放进全连接层分类
问题回答:给定一个问题,和描述文字,找出一个片段作为回答,对片段中的每个词元预测它是不是回答的开头或结束。

自然语言推断与数据集:

import os
import re
import torch
from torch import nn
from d2l import torch as d2l

d2l.DATA_HUB['SNLI'] = (
    'https://nlp.stanford.edu/projects/snli/snli_1.0.zip',
    '9fcde07509c7e87ec61c64c1b2753d9041758e4')

data_dir = d2l.download_extract('SNLI')

def read_snli(data_dir, is_train):
    """将SNLI数据集解析为“前提列表、假设列表、标签列表”"""
    def extract_text(s):
        s = re.sub('\(', '', s)  # 移除左括号
        s = re.sub('\)', '', s)  # 移除右括号
        s = re.sub('\s{2,}', ' ', s)  # 把2个及以上的空格替换为1个
        return s.strip()  # 移除首尾空格
    
    # 标签映射:文本标签 → 数字标签
    label_set = {'entailment': 0, 'contradiction': 1, 'neutral': 2}
    file_name = os.path.join(data_dir, 'snli_1.0_train.txt'
                             if is_train else 'snli_1.0_test.txt')
    
    with open(file_name, 'r') as f:
        # 按行读取文件,每行用\t分割(SNLI数据集每行是制表符分隔的列)
        # f.readlines()[1:] 跳过第一行(表头)
        rows = [row.split('\t') for row in f.readlines()[1:]]
    
    # 筛选出“标签有效”的行,并分别提取前提、假设、标签
    # 只保留标签在label_set中的行(过滤无效标签)
    premises = [extract_text(row[1]) for row in rows if row[0] in label_set]
    hypotheses = [extract_text(row[2]) for row in rows if row[0] in label_set]
    labels = [label_set[row[0]] for row in rows if row[0] in label_set]
    return premises, hypotheses, labels
class SNLIDataset(torch.utils.data.Dataset):
    """用于加载SNLI数据集的自定义数据集类,继承自PyTorch的Dataset"""
    def __init__(self, dataset, num_steps, vocab=None):
        """初始化数据集"""
        self.num_steps = num_steps
        # 对前提(premise)和假设(hypothesis)的文本进行分词
        all_premise_tokens = d2l.tokenize(dataset[0])
        all_hypothesis_tokens = d2l.tokenize(dataset[1])
        
        # 构建或使用已有词汇表
        if vocab is None:
            # 合并前提和假设的分词结果,构建词汇表(过滤低频词,保留<pad>)
            self.vocab = d2l.Vocab(all_premise_tokens + all_hypothesis_tokens, 
                                   min_freq=5, reserved_tokens=['<pad>'])
        else:
            self.vocab = vocab
        # 对前提和假设的分词结果进行填充/截断,使其长度统一为num_steps
        self.premises = self._pad(all_premise_tokens)
        self.hypotheses = self._pad(all_hypothesis_tokens)
        self.labels = torch.tensor(dataset[2])
        print('read ' + str(len(self.premises)) + ' examples')  # 打印数据样本数

    def _pad(self, lines):
        """对输入的分词序列进行填充或截断,使其长度为num_steps"""
        return torch.tensor([d2l.truncate_pad(
            self.vocab[line], self.num_steps, self.vocab['<pad>'])
            for line in lines])
    def __getitem__(self, idx):
        """支持通过索引获取样本:返回(前提张量, 假设张量), 标签"""
        return (self.premises[idx], self.hypotheses[idx]), self.labels[idx]
    def __len__(self):
        """返回数据集总样本数"""
        return len(self.premises)
def load_data_snli(batch_size, num_steps=50):
    """下载并加载SNLI数据集,返回训练/测试数据迭代器和词汇表"""
    num_workers = d2l.get_dataloader_workers()  # 获取数据加载的进程数(多进程加速)
    data_dir = d2l.download_extract('SNLI')  # 下载并解压SNLI数据集到本地
    train_data = read_snli(data_dir, True)   # 读取训练数据
    test_data = read_snli(data_dir, False)   # 读取测试数据
    
    train_set = SNLIDataset(train_data, num_steps)
    test_set = SNLIDataset(test_data, num_steps, train_set.vocab)
    
    # 生成训练数据加载器(打乱数据)
    train_iter = torch.utils.data.DataLoader(
        train_set, batch_size, shuffle=True, num_workers=num_workers)
    # 生成测试数据加载器(不打乱数据)
    test_iter = torch.utils.data.DataLoader(
        test_set, batch_size, shuffle=False, num_workers=num_workers)
    
    return train_iter, test_iter, train_set.vocab

自然语言推断:微调BERT:

import json
import multiprocessing
import os
import torch
from torch import nn
from d2l import torch as d2l
d2l.DATA_HUB['bert.base'] = (d2l.DATA_URL + 'bert.base.torch.zip',
                             '225d66f04cae318b841a13d32af3acc165f253ac')
d2l.DATA_HUB['bert.small'] = (d2l.DATA_URL + 'bert.small.torch.zip',
                              'c72329e68a732bef0452e4b96a1c341c8910f81f')
def load_pretrained_model(pretrained_model, num_hiddens, ffn_num_hiddens,
                          num_heads, num_layers, dropout, max_len, devices):
    data_dir = d2l.download_extract(pretrained_model)
    vocab = d2l.Vocab()  # 初始化d2l的Vocab类(封装词表操作)
    # 从data_dir下的'vocab.json'读取“索引→词元”的列表
    vocab.idx_to_token = json.load(open(os.path.join(data_dir, 'vocab.json')))
    # 反向生成“词元→索引”的字典(方便通过词元查索引)
    vocab.token_to_idx = {token: idx for idx, token in enumerate(vocab.idx_to_token)}
    bert = d2l.BERTModel(len(vocab), num_hiddens, norm_shape=[256],
                         ffn_num_input=256, ffn_num_hiddens=ffn_num_hiddens,
                         num_heads=4, num_layers=2, dropout=0.2,
                         max_len=max_len, key_size=256, query_size=256,
                         value_size=256, hid_in_features=256,
                         mlm_in_features=256, nsp_in_features=256)
    # torch.load 读取本地的预训练参数文件,load_state_dict将参数加载到模型
    bert.load_state_dict(torch.load(os.path.join(data_dir, 'pretrained.params'),
        weights_only=True))
    return bert, vocab

devices = d2l.try_all_gpus()
bert, vocab = load_pretrained_model(
    'bert.small', num_hiddens=256, ffn_num_hiddens=512, num_heads=4,
    num_layers=2, dropout=0.1, max_len=512, devices=devices)
class SNLIBERTDataset(Dataset):
    """适配 BERT 模型的 SNLI 数据集自定义类,处理“前提-假设”文本对为模型输入格式"""

    def __init__(self, dataset, max_len, vocab=None):
        """初始化数据集,完成文本分词与预处理
        Args:
            dataset: 原始数据集,格式为 (前提句子列表, 假设句子列表, 标签列表)
            max_len: 序列最大长度(需预留 [CLS]、[SEP] 等特殊标记的位置)
            vocab: 词汇表(若为 None,需外部提前构建或在其他逻辑中处理)
        """
        # 对“前提句子”和“假设句子”分别分词(转小写后分词)
        all_premise_hypothesis_tokens = [
            [p_tokens, h_tokens] 
            for p_tokens, h_tokens in zip(
                *[d2l.tokenize([s.lower() for s in sentences]) 
                  for sentences in dataset[:2]]
            )
        ]

        self.labels = torch.tensor(dataset[2])  # 标签转为 PyTorch 张量
        self.vocab = vocab
        self.max_len = max_len
        # 预处理生成 BERT 所需的输入:token_ids(词元索引)、segments(片段标识)、valid_lens(有效长度)
        (self.all_token_ids, self.all_segments, self.valid_lens) = self._preprocess(all_premise_hypothesis_tokens)
        print(f"读取 {len(self.all_token_ids)} 个样本")

    def _preprocess(self, all_premise_hypothesis_tokens):
        """多进程预处理所有“前提-假设”对,加速数据处理
        Args:
            all_premise_hypothesis_tokens: 所有“前提分词列表-假设分词列表”的对
        Returns:
            tuple: 处理后的 token_ids 张量、segments 张量、valid_lens 张量
        """
        pool = multiprocessing.Pool(4)  # 启动 4 个进程并行处理
        out = pool.map(self._mp_worker, all_premise_hypothesis_tokens)  # 多进程映射处理每个样本
        # 从多进程结果中拆分出各部分数据
        all_token_ids = [token_ids for token_ids, _, _ in out]
        all_segments = [segments for _, segments, _ in out]
        valid_lens = [valid_len for _, _, valid_len in out]
        # 转换为 PyTorch 张量并返回
        return (
            torch.tensor(all_token_ids, dtype=torch.long),
            torch.tensor(all_segments, dtype=torch.long),
            torch.tensor(valid_lens)
        )

    def _mp_worker(self, premise_hypothesis_tokens):
        """单进程工作函数:处理单个“前提-假设”对,生成模型输入
        Args:
            premise_hypothesis_tokens: 单个样本的 (前提分词列表, 假设分词列表)
        Returns:
            tuple: token_ids(填充后)、segments(填充后)、valid_len(有效长度)
        """
        p_tokens, h_tokens = premise_hypothesis_tokens
        # 截断前提/假设,使总长度不超过 max_len - 3(预留 [CLS] 和两个 [SEP])
        self._truncate_pair_of_tokens(p_tokens, h_tokens)
        # 生成带特殊标记的词元列表 + 片段索引(0=前提,1=假设)
        tokens, segments = d2l.get_tokens_and_segments(p_tokens, h_tokens)
        # 词元转词汇表索引 + 填充到 max_len(不足用 <pad> 的索引)
        token_ids = self.vocab[tokens] + [self.vocab['<pad>']] * (self.max_len - len(tokens))
        # 片段索引填充(不足用 0,因为填充部分属于“前提侧”的填充)
        segments = segments + [0] * (self.max_len - len(segments))
        valid_len = len(tokens)  # 有效长度为“未填充的词元数”
        return token_ids, segments, valid_len

    def _truncate_pair_of_tokens(self, p_tokens, h_tokens):
        """截断前提/假设的分词,确保总长度不超过 max_len - 3
        Args:
            p_tokens: 前提的分词列表
            h_tokens: 假设的分词列表
        """
        while len(p_tokens) + len(h_tokens) > self.max_len - 3:
            # 策略:谁更长就截断谁的最后一个词
            if len(p_tokens) > len(h_tokens):
                p_tokens.pop()
            else:
                h_tokens.pop()

    def __getitem__(self, idx):
        """支持通过索引获取样本,返回格式:(token_ids, segments, valid_lens), label
        适配 PyTorch DataLoader 的批量读取逻辑
        """
        return (self.all_token_ids[idx], self.all_segments[idx], self.valid_lens[idx]), self.labels[idx]

    def __len__(self):
        """返回数据集总样本数"""
        return len(self.all_token_ids)
batch_size, max_len, num_workers = 512, 128, d2l.get_dataloader_workers()
data_dir = d2l.download_extract('SNLI')
train_set = SNLIBERTDataset(d2l.read_snli(data_dir, True), max_len, vocab)
test_set = SNLIBERTDataset(d2l.read_snli(data_dir, False), max_len, vocab)
train_iter = torch.utils.data.DataLoader(
    train_set, batch_size, shuffle=True, num_workers=num_workers)
test_iter = torch.utils.data.DataLoader(
    test_set, batch_size, num_workers=num_workers)


class BERTClassifier(nn.Module):
    def __init__(self, bert):
        """初始化BERT分类器,基于预训练BERT做“自然语言推理”下游任务
        Args:
            bert: 预训练好的BERT模型(包含encoder、hidden等组件)
        """
        super(BERTClassifier, self).__init__()
        self.encoder = bert.encoder 
        self.hidden = bert.hidden    
        # 输出层:将BERT的特征(256维)映射到3个类别(SNLI的“蕴含/矛盾/中性”)
        self.output = nn.Linear(256, 3)

    def forward(self, inputs):
        """前向传播:输入文本对,输出分类预测
        Args:
            inputs: 元组(tokens_X, segments_X, valid_lens_X),分别对应:
                - tokens_X:词元的“词汇表索引”序列(模型输入的基本单位)
                - segments_X:“片段标识”(区分“前提句”和“假设句”的词元)
                - valid_lens_X:“有效长度”(忽略填充的<pad>部分)
        Returns:
            模型对每个样本的3分类预测(蕴含/矛盾/中性)
        """
        tokens_X, segments_X, valid_lens_X = inputs
        encoded_X = self.encoder(tokens_X, segments_X, valid_lens_X)
        # 第 1 个token是cls的特征表述,用来放进MLP做预测
        # encoded_X[:, 0, :] → 取“批量中每个样本的第0个词元的所有特征”
        return self.output(self.hidden(encoded_X[:, 0, :]))

net = BERTClassifier(bert)

lr, num_epochs = 1e-4, 5  # 学习率设为1e-4(微调预训练模型用小学习率),训练5个epoch
trainer = torch.optim.Adam(net.parameters(), lr=lr)
loss = nn.CrossEntropyLoss(reduction='none')
d2l.train_ch13(net, train_iter, test_iter, loss, trainer, num_epochs, devices)

这是关闭多线程还有batch_size改为32的运行结果:
在这里插入图片描述
(跑的好慢)

到这里就完结撒花了哈哈哈!!!

Logo

更多推荐