动手学深度学习 PyTorch版 学习笔记 (三)
学习笔记
学习深度学习关键是动手
- 深度学习是人工智能最热的领域
- 核心是神经网络
- 神经网络是一门语言
- 应该像学习Python/C++一样学习深度学习
参考视频:跟李沐学AI
参考书目:动手学深度学习 PyTorch版
循环神经网络
序列模型
- 序列数据
- 实际中很多数据是有时序结构的
- 电影的评价随时间变化而变化
- 拿奖后评分上升,直到奖项被忘记
- 看了很多好电影后,人们的期望变高
- 季节性:贺岁片、暑期档
- 导演、演员的负面报道导致评分变低
统计工具
- 在时间 $t$ 观察到 $x_t$,那么得到 $T$ 个不独立的随机变量 $(x_1, \dots, x_T) \sim p(\mathbf{x})$
- 使用条件概率展开:$p(a, b) = p(a)p(b \mid a) = p(b)p(a \mid b)$
序列模型
- 联合概率的链式法则:
$$p(\mathbf{x}) = p(x_1) \cdot p(x_2 \mid x_1) \cdot p(x_3 \mid x_1, x_2) \cdots p(x_T \mid x_1, \dots, x_{T-1})$$
- 对条件概率建模:
$$p(x_t \mid x_1, \dots, x_{t-1}) = p(x_t \mid f(x_1, \dots, x_{t-1}))$$
(对见过的数据建模,自回归模型)
方案A,马尔科夫假设:假设当前数据只跟 $\tau$ 个过去数据点相关
方案B,潜变量模型:
引入潜变量 $h_t$ 来表示过去信息:$h_t = f(x_1, \dots, x_{t-1})$
这样 $x_t \sim p(x_t \mid h_t)$
代码实现(尝试使用马尔科夫假设):
%matplotlib inline
import torch
from torch import nn
from d2l import torch as d2l
T = 1000
time = torch.arange(1, T + 1, dtype=torch.float32)
# 生成带噪声的正弦时间序列:
x = torch.sin(0.01 * time) + torch.normal(0, 0.2, (T,))
d2l.plot(time, [x], 'time', 'x', xlim=[1, 1000], figsize=(6, 3))
将数据映射为数据对 $y_t = x_t$ 和 $\mathbf{x}_t = [x_{t-\tau}, \ldots, x_{t-1}]$:
tau = 4
features = torch.zeros((T - tau, tau))
# 填充特征矩阵:每一列对应不同延迟的时间序列数据
for i in range(tau): # i从0到3(共4次循环)
# 第i列填充x中从索引i开始、长度为(T-tau)的数据
# 例如:
# i=0时,取x[0:996] → 对应"当前时间点前4步"的数据
# i=1时,取x[1:997] → 对应"当前时间点前3步"的数据
# ...以此类推,最终每行features[i] = [x[i], x[i+1], x[i+2], x[i+3]]
features[:, i] = x[i: T - tau + i]
# 定义标签:取原始序列中从第tau个时间点(索引4)开始到末尾的数据
labels = x[tau:].reshape((-1, 1))
# 从features和labels中取前600个样本作为训练数据
batch_size, n_train = 16, 600
train_iter = d2l.load_array((features[:n_train], labels[:n_train]),
batch_size, is_train=True)
使用一个简单的多层感知机:
def init_weights(m):
if type(m) == nn.Linear:
nn.init.xavier_uniform_(m.weight)
def get_net():
net = nn.Sequential(nn.Linear(4, 10),
nn.ReLU(),
nn.Linear(10, 1))
net.apply(init_weights)
return net
loss = nn.MSELoss(reduction='none')
def train(net, train_iter, loss, epochs, lr):
trainer = torch.optim.Adam(net.parameters(), lr)
for epoch in range(epochs):
for X, y in train_iter:
trainer.zero_grad()
l = loss(net(X), y)
l.sum().backward()
trainer.step()
print(f'epoch {epoch + 1}, '
f'loss: {d2l.evaluate_loss(net, train_iter, loss):f}')
net = get_net()
train(net, train_iter, loss, 5, 0.01)
onestep_preds = net(features)
d2l.plot([time, time[tau:]],
[x.detach().numpy(), onestep_preds.detach().numpy()], 'time',
'x', legend=['data', '1-step preds'], xlim=[1, 1000],
figsize=(6, 3))
进行多步预测(也就是使用模型自己的预测值而不是原始观测来进行多步预测):
multistep_preds = torch.zeros(T)
multistep_preds[: n_train + tau] = x[: n_train + tau]
for i in range(n_train + tau, T): # i从604循环到999(共396个时间点)
# 用“过去tau=4个时间点的结果”作为输入,预测第i个时间点的值
multistep_preds[i] = net(
multistep_preds[i - tau:i].reshape((1, -1)) # 取[i-4, i-3, i-2, i-1]这4个点,转成模型需要的形状(1,4)
)
d2l.plot([time, time[tau:], time[n_train + tau:]],
[x.detach().numpy(), onestep_preds.detach().numpy(),
multistep_preds[n_train + tau:].detach().numpy()], 'time',
'x', legend=['data', '1-step preds', 'multistep preds'],
xlim=[1, 1000], figsize=(6, 3))

(由于误差累积,多步预测的效果不是很好)
更仔细地看一下 $k$ 步预测:
max_steps = 64
# 创建特征矩阵features,用于存储"历史数据+未来多步预测结果"
# 形状为 (N, tau + max_steps),其中:
# - N = T - tau - max_steps + 1:有效样本数(能完整容纳"tau个历史+max_steps个预测"的窗口数量)
# - tau + max_steps:列数=历史窗口长度+预测步数(前tau列存历史,后max_steps列存预测)
features = torch.zeros((T - tau - max_steps + 1, tau + max_steps))
# 填充前tau列:存入真实的历史观测数据(滑动窗口方式)
# 目的:为每个样本准备"过去tau个时间点的真实数据",作为预测的初始输入
for i in range(tau):
# 对第i列,从原始序列x中截取长度为N的数据填充
# 例如i=0时取x[0:N],i=1时取x[1:N+1]...形成连续的tau个历史数据窗口
features[:, i] = x[i: i + T - tau - max_steps + 1]
# 填充后max_steps列:用模型net递归预测未来多步,并存储预测结果
# 核心逻辑:每一步预测都依赖"最近的tau个数据"(可能是历史真实值或之前的预测结果)
for i in range(tau, tau + max_steps):
# 输入:取当前列前tau个数据(features[:, i-tau:i])
# 例如i=tau时,输入是前tau列的历史数据;i=tau+1时,输入是第1列到第tau列+第tau列的预测结果
# 输出:模型预测的第i-tau步结果,填入第i列
features[:, i] = net(features[:, i - tau:i]).reshape(-1)
steps = (1, 4, 16, 64)
d2l.plot([time[tau + i - 1: T - max_steps + i] for i in steps],
[features[:, (tau + i - 1)].detach().numpy() for i in steps], 'time', 'x',
legend=[f'{i}-step preds' for i in steps], xlim=[5, 1000],
figsize=(6, 3))

文本预处理
文本预处理的核心思想是如何把词变成我们要训练的东西
代码实现:
import collections
import re
from d2l import torch as d2l
# 简单且暴力的预处理hh
def read_time_machine():
"""将时间机器数据集加载到文本行的列表中"""
with open(d2l.download('time_machine'), 'r') as f:
lines = f.readlines()
# 文本清洗与标准化:
# re.sub('[^A-Za-z]+', ' ', line):将非字母的字符(如数字、标点)替换为空格;
# [^A-Za-z]+ 的含义是:匹配 “一个或多个连续的非字母字符”
# strip():去除每行首尾的空白;lower():将所有字母转为小写;
return [re.sub('[^A-Za-z]+', ' ', line).strip().lower() for line in lines]
def tokenize(lines, token='word'):
"""将文本行拆分为单词或字符词元"""
if token == 'word':
# 按空格分割每行,得到“单词列表的列表”(每行对应一个单词列表)
return [line.split() for line in lines]
elif token == 'char':
# 将每行的每个字符拆分为独立元素,得到“字符列表的列表”
return [list(line) for line in lines]
else:
print('错误:未知词元类型:' + token)
接下来另一个重要的概念:
构建一个字典,通常也叫做词表(vocabulary),用来将字符串类型的词元映射到从 $0$ 开始的数字索引中:
class Vocab:
"""文本词表:建立词元与索引的双向映射"""
def __init__(self, tokens=None, min_freq=0, reserved_tokens=None):
if tokens is None:
tokens = []
if reserved_tokens is None:
reserved_tokens = []
# 统计词频并按频率从高到低排序
counter = count_corpus(tokens)
self._token_freqs = sorted(counter.items(), key=lambda x: x[1], reverse=True)
# 初始化词表(先加入未知词<unk>和预留词元)
self.idx_to_token = ['<unk>'] + reserved_tokens # 索引0固定为未知词
self.token_to_idx = {token: idx for idx, token in enumerate(self.idx_to_token)}
# 加入高频词元(过滤低频词)
for token, freq in self._token_freqs:
if freq < min_freq:
break # 频率低于阈值,后续词元频率更低,直接停止
if token not in self.token_to_idx: # 避免重复添加
self.idx_to_token.append(token)
self.token_to_idx[token] = len(self.idx_to_token) - 1 # 记录当前索引
def __len__(self):
"""返回词表大小"""
return len(self.idx_to_token)
def __getitem__(self, tokens):
"""将词元(单个或列表)转换为索引"""
if not isinstance(tokens, (list, tuple)):
# 单个词元:若不在词表中,返回<unk>的索引(0)
return self.token_to_idx.get(tokens, self.unk)
# 多个词元:递归转换每个词元
return [self.__getitem__(token) for token in tokens]
def to_tokens(self, indices):
"""将索引(单个或列表)转换为词元"""
if not isinstance(indices, (list, tuple)):
return self.idx_to_token[indices]
# 多个索引:转换每个索引
return [self.idx_to_token[index] for index in indices]
@property
def unk(self):
"""返回未知词<unk>的索引(固定为0)"""
return 0
@property
def token_freqs(self):
"""返回词元频率列表(词元, 频率)"""
return self._token_freqs
def count_corpus(tokens):
"""统计词元的频率"""
# 若输入是嵌套列表(如[[词元1, 词元2], [词元3, ...]]),先展平为一维列表
if len(tokens) == 0 or isinstance(tokens[0], list):
tokens = [token for line in tokens for token in line]
# 用Counter统计每个词元的出现次数,返回{词元: 频率}字典
return collections.Counter(tokens)
将所有功能打包到load_corpus_time_machine函数中:
def load_corpus_time_machine(max_tokens=-1):
"""返回时光机器数据集的词元索引列表和词表"""
lines = read_time_machine()
tokens = tokenize(lines, 'char')
vocab = Vocab(tokens)
corpus = [vocab[token] for line in tokens for token in line]
if max_tokens > 0:
corpus = corpus[:max_tokens]
return corpus, vocab
corpus, vocab = load_corpus_time_machine()
len(corpus), len(vocab)
语言模型
- 给定文本序列 $x_1, \dots, x_T$,语言模型的目标是估计联合概率 $p(x_1, \dots, x_T)$。
- 它的应用包括:
- 作为预训练模型(例如 BERT、GPT-3 等,通过语言模型任务学习文本的通用表示);
- 生成文本:给定前面若干词,不断利用 $x_t \sim p(x_t \mid x_1, \dots, x_{t-1})$(下一个词基于前面所有词的条件概率)来生成后续文本;
- 判断多个序列中哪个更常见(例如对比 “to recognize speech” vs “to wreck a nice beach”,两者发音相近,但前者是自然常见的表达,后者是罕见/错误序列,语言模型可通过概率区分)。
使用计数来建模
- 假设序列长度为2,我们预测
$$p(x, x') = p(x)p(x' \mid x) = \frac{n(x)}{n} \cdot \frac{n(x, x')}{n(x)}$$
这里 $n$ 是总词数,$n(x)$、$n(x, x')$ 是单个单词和连续单词对的出现次数
- 很容易拓展到长为3的情况:
$$p(x, x', x'') = p(x)p(x' \mid x)p(x'' \mid x, x') = \frac{n(x)}{n} \cdot \frac{n(x, x')}{n(x)} \cdot \frac{n(x, x', x'')}{n(x, x')}$$
- 当序列很长时,因为文本量不够大,很可能 $n(x_1, \dots, x_T) \leq 1$
- 使用马尔科夫假设可以缓解这个问题:
- 一元语法(Unigram):假设词之间完全独立,联合概率拆分为每个词单独出现概率的乘积(一元语法实际使用得不多):
$$p(x_1, x_2, x_3, x_4) = p(x_1)p(x_2)p(x_3)p(x_4) = \frac{n(x_1)}{n} \cdot \frac{n(x_2)}{n} \cdot \frac{n(x_3)}{n} \cdot \frac{n(x_4)}{n}$$
- 二元语法(Bigram):假设当前词仅依赖前1个词,联合概率拆分为“首词概率”与“后续词依赖前1个词的条件概率”的乘积:
$$p(x_1, x_2, x_3, x_4) = p(x_1)p(x_2 \mid x_1)p(x_3 \mid x_2)p(x_4 \mid x_3) = \frac{n(x_1)}{n} \cdot \frac{n(x_1, x_2)}{n(x_1)} \cdot \frac{n(x_2, x_3)}{n(x_2)} \cdot \frac{n(x_3, x_4)}{n(x_3)}$$
- 三元语法(Trigram):假设当前词仅依赖前2个词,联合概率拆分为“首词概率”“次词依赖首词的概率”与“后续词依赖前2个词的条件概率”的乘积:
$$p(x_1, x_2, x_3, x_4) = p(x_1)p(x_2 \mid x_1)p(x_3 \mid x_1, x_2)p(x_4 \mid x_2, x_3) = \frac{n(x_1)}{n} \cdot \frac{n(x_1, x_2)}{n(x_1)} \cdot \frac{n(x_1, x_2, x_3)}{n(x_1, x_2)} \cdot \frac{n(x_2, x_3, x_4)}{n(x_2, x_3)}$$
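在看完整代码前,先用一个小例子感受“用计数估计二元语法条件概率”(语料是随手写的演示句子,非原书代码):
import collections

corpus = 'the time traveller smiled the time machine'.split()
unigram = collections.Counter(corpus)                       # n(x)
bigram = collections.Counter(zip(corpus[:-1], corpus[1:]))  # n(x, x')
# p(time | the) = n(the, time) / n(the) = 2 / 2 = 1.0
print(bigram[('the', 'time')] / unigram['the'])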
代码实现:
import random
import torch
from d2l import torch as d2l
# 单词语料库构建与词频统计
tokens = d2l.tokenize(d2l.read_time_machine())
corpus = [token for line in tokens for token in line]
vocab = d2l.Vocab(corpus)
vocab.token_freqs[:10]
# 绘制词频的双对数图
# 最流行的词被称为*停用词*(通常对文本的主题内容贡献不大)
freqs = [freq for token, freq in vocab.token_freqs]
d2l.plot(freqs, xlabel='token: x', ylabel='frequency: n(x)', xscale='log', yscale='log')
其他的词元组合:
# 二元语法
bigram_tokens = [pair for pair in zip(corpus[:-1], corpus[1:])]
bigram_vocab = d2l.Vocab(bigram_tokens)
bigram_vocab.token_freqs[:10]
# 三元语法
trigram_tokens = [triple for triple in zip(
corpus[:-2], corpus[1:-1], corpus[2:])]
trigram_vocab = d2l.Vocab(trigram_tokens)
trigram_vocab.token_freqs[:10]
# 直观对比三种模型不同词元出现的频率
bigram_freqs = [freq for token, freq in bigram_vocab.token_freqs]
trigram_freqs = [freq for token, freq in trigram_vocab.token_freqs]
d2l.plot([freqs, bigram_freqs, trigram_freqs], xlabel='token: x',
ylabel='frequency: n(x)', xscale='log', yscale='log',
legend=['unigram', 'bigram', 'trigram'])

随机生成一个小批量数据的特征和标签以供读取:
def seq_data_iter_random(corpus, batch_size, num_steps):
"""使用随机抽样生成一个小批量子序列"""
# 随机偏移语料库,避免固定起始位置
corpus = corpus[random.randint(0, num_steps - 1):]
# 计算可提取的完整子序列数量(每个子序列长度为num_steps)
num_subseqs = (len(corpus) - 1) // num_steps
# 生成所有子序列的起始索引(不重叠)
initial_indices = list(range(0, num_subseqs * num_steps, num_steps))
# 打乱起始索引,实现随机抽样
random.shuffle(initial_indices)
def data(pos):
return corpus[pos: pos + num_steps]
# 计算总批次数
num_batches = num_subseqs // batch_size
# 按批次生成数据
for i in range(0, batch_size * num_batches, batch_size):
# 当前批次的起始索引
initial_indices_per_batch = initial_indices[i: i + batch_size]
# 输入序列X:每个元素是长度为num_steps的子序列
X = [data(j) for j in initial_indices_per_batch]
# 目标序列Y:每个元素是X的下一时间步(滞后1步)
Y = [data(j + 1) for j in initial_indices_per_batch]
# 返回当前批次的X和Y(转为张量)
yield torch.tensor(X), torch.tensor(Y)
如果想保证两个相邻的小批量中的子序列在原始序列上也是相邻的,可以用:
def seq_data_iter_sequential(corpus, batch_size, num_steps):
"""使用顺序分区生成一个小批量子序列"""
# 随机偏移起始位置,避免固定模式
offset = random.randint(0, num_steps)
# 计算可用token总数(确保能被batch_size整除,便于后续reshape)
num_tokens = ((len(corpus) - offset - 1) // batch_size) * batch_size
# 提取输入序列X和目标序列Y(Y比X滞后1步)
Xs = torch.tensor(corpus[offset: offset + num_tokens])
Ys = torch.tensor(corpus[offset + 1: offset + 1 + num_tokens])
# 重塑为batch_size行的并行长序列(每行一个子序列)
Xs, Ys = Xs.reshape(batch_size, -1), Ys.reshape(batch_size, -1)
# 计算总批次数(每个并行序列可生成多少个num_steps长度的子序列)
num_batches = Xs.shape[1] // num_steps
# 按顺序截取子序列,生成批量数据
for i in range(0, num_steps * num_batches, num_steps):
X = Xs[:, i: i + num_steps] # 输入:每个并行序列的[i, i+num_steps)片段
Y = Ys[:, i: i + num_steps] # 目标:对应下一时间步的片段
yield X, Y
包装采样函数和定义函数load_data_time_machine:
class SeqDataLoader:
"""加载序列数据的迭代器"""
def __init__(self, batch_size, num_steps, use_random_iter, max_tokens):
if use_random_iter:
self.data_iter_fn = d2l.seq_data_iter_random
else:
self.data_iter_fn = d2l.seq_data_iter_sequential
self.corpus, self.vocab = d2l.load_corpus_time_machine(max_tokens)
self.batch_size, self.num_steps = batch_size, num_steps
def __iter__(self):
return self.data_iter_fn(self.corpus, self.batch_size, self.num_steps)
def load_data_time_machine(batch_size, num_steps,
use_random_iter=False, max_tokens=10000):
"""返回时光机器数据集的迭代器和词表"""
data_iter = SeqDataLoader(
batch_size, num_steps, use_random_iter, max_tokens)
return data_iter, data_iter.vocab
循环神经网络 RNN

- 更新隐藏状态:$\mathbf{h}_t = \phi(\mathbf{W}_{hh}\mathbf{h}_{t-1} + \mathbf{W}_{hx}\mathbf{x}_{t-1} + \mathbf{b}_h)$
- 输出:$\mathbf{o}_t = \phi(\mathbf{W}_{ho}\mathbf{h}_t + \mathbf{b}_o)$

困惑度(perplexity)
- 衡量一个语言模型的好坏可以用平均交叉熵:
$$\pi = \frac{1}{n} \sum_{t=1}^{n} -\log p(x_t \mid x_{t-1}, \dots)$$
其中 $p$ 是语言模型的预测概率,$x_t$ 是真实词。
- 由于历史原因,NLP 领域使用困惑度 $\exp(\pi)$ 来衡量模型表现,它表示“平均每次预测时的可选选项数量”:
  - 困惑度为 1 表示模型完美(每次都能精准预测正确词);
  - 困惑度为无穷大表示模型最差(完全无法预测正确词)。
- 二者都是“数值越小,模型预测越准”,下面用一个最小示例直观感受。
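(示例中的预测概率是随手设的假数据,仅演示“困惑度 = exp(平均交叉熵)”的计算):
import math
import torch

probs = torch.tensor([0.5, 0.25, 0.8, 0.1])  # 模型对4个真实词元给出的预测概率(假数据)
avg_ce = float((-probs.log()).mean())        # 平均交叉熵 pi
print(f'平均交叉熵 {avg_ce:.3f}, 困惑度 {math.exp(avg_ce):.3f}')
# 若每步都完美预测(概率全为1),平均交叉熵为0,困惑度为1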
梯度裁剪
- 迭代中计算这 $T$ 个时间步上的梯度,在反向传播过程中产生长度为 $O(T)$ 的矩阵乘法链,导致数值不稳定。
- 梯度裁剪能有效预防梯度爆炸:
- 如果梯度长度超过 $\theta$,那么将其裁剪回长度 $\theta$,公式为:
$$\mathbf{g} \leftarrow \min\left(1,\ \frac{\theta}{\|\mathbf{g}\|}\right) \mathbf{g}$$
从零开始实现
%matplotlib inline
import math
import torch
from torch import nn
from torch.nn import functional as F
from d2l import torch as d2l
batch_size, num_steps = 32, 35
train_iter, vocab = d2l.load_data_time_machine(batch_size, num_steps)
# 初始化循环神经网络模型参数
def get_params(vocab_size, num_hiddens, device):
# 输入和输出维度均等于词汇表大小(输入为独热编码,输出为下一个词的概率分布)
num_inputs = num_outputs = vocab_size
# 定义参数初始化函数:生成小随机数(标准正态分布缩放0.01倍)
def normal(shape):
return torch.randn(size=shape, device=device) * 0.01
W_xh = normal((num_inputs, num_hiddens))
W_hh = normal((num_hiddens, num_hiddens))
b_h = torch.zeros(num_hiddens, device=device)
W_hq = normal((num_hiddens, num_outputs))
b_q = torch.zeros(num_outputs, device=device)
params = [W_xh, W_hh, b_h, W_hq, b_q]
for param in params:
param.requires_grad_(True)
return params
# 初始化隐藏状态
def init_rnn_state(batch_size, num_hiddens, device):
return (torch.zeros((batch_size, num_hiddens), device=device), )
# RNN前向计算:按时间步迭代处理序列
def rnn(inputs, state, params):
# 解包参数:输入→隐藏、隐藏→隐藏、隐藏偏置、隐藏→输出、输出偏置
W_xh, W_hh, b_h, W_hq, b_q = params
H, = state # 解包初始隐藏状态
outputs = [] # 存储每个时间步的输出
for X in inputs:
H = torch.tanh(torch.mm(X, W_xh) + torch.mm(H, W_hh) + b_h)
Y = torch.mm(H, W_hq) + b_q
outputs.append(Y)
# 拼接所有时间步的输出,返回输出和最终隐藏状态
return torch.cat(outputs, dim=0), (H,)
# 创建一个类来包装一下这些函数
class RNNModelScratch:
"""从零实现的循环神经网络模型类,封装RNN核心逻辑并支持灵活扩展"""
def __init__(self, vocab_size, num_hiddens, device, get_params, init_state, forward_fn):
# 初始化模型基本配置:词汇表大小、隐藏层维度、计算设备
self.vocab_size, self.num_hiddens, self.device = vocab_size, num_hiddens, device
# 注入核心函数:参数生成、状态初始化、前向计算(支持替换为LSTM等变种)
self.params = get_params(vocab_size, num_hiddens, device) # 模型参数(权重/偏置)
self.init_state, self.forward_fn = init_state, forward_fn # 状态初始化和前向计算函数
def __call__(self, X, state):
"""前向计算接口:输入序列→输出+新状态"""
# 输入转为独热编码(适配RNN输入格式),调用注入的前向函数计算
X = F.one_hot(X.T, self.vocab_size).type(torch.float32) # 转置+独热编码
return self.forward_fn(X, state, self.params) # 返回输出和更新后的状态
def begin_state(self, batch_size, device):
"""初始化隐藏状态(如全0张量),封装状态初始化逻辑"""
return self.init_state(batch_size, self.num_hiddens, device)
def predict_ch8(prefix, num_preds, net, vocab, device):
"""在prefix后面生成新字符"""
# 初始化隐藏状态:批量大小为1(一次生成一个序列)
state = net.begin_state(batch_size=1, device=device)
# 处理前缀的第一个字符:转换为索引并存入输出列表
# outputs用于记录所有字符(前缀+生成字符)的索引
outputs = [vocab[prefix[0]]] # vocab[char]获取字符对应的整数索引
# 定义输入函数:获取上一个字符的索引作为下一次输入
get_input = lambda: torch.tensor([outputs[-1]], device=device).reshape((1, 1))
# 逐字处理前缀剩余字符,更新隐藏状态(积累上下文信息)
for y in prefix[1:]: # 遍历前缀中除第一个字符外的所有字符
_, state = net(get_input(), state)
outputs.append(vocab[y])
# 生成指定数量的新字符
for _ in range(num_preds):
y, state = net(get_input(), state)
# 取概率最大的字符索引(贪心解码策略),加入输出列表
# argmax(dim=1):在词汇表维度取最大值索引
outputs.append(int(y.argmax(dim=1).reshape(1)))
# 将所有索引转换为字符,拼接成完整字符串返回
# vocab.idx_to_token[i]:通过索引获取对应的字符
return ''.join([vocab.idx_to_token[i] for i in outputs])
梯度裁剪实现:
$$\mathbf{g} \leftarrow \min\left(1,\ \frac{\theta}{\|\mathbf{g}\|}\right) \mathbf{g}$$
def grad_clipping(net, theta):
"""裁剪梯度"""
if isinstance(net, nn.Module):
params = [p for p in net.parameters() if p.requires_grad]
else:
params = net.params
norm = torch.sqrt(sum(torch.sum((p.grad ** 2)) for p in params))
if norm > theta:
for param in params:
param.grad[:] *= theta / norm
训练:
def train_epoch_ch8(net, train_iter, loss, updater, device, use_random_iter):
"""训练网络一个迭代周期"""
state, timer = None, d2l.Timer() # 初始化隐藏状态和计时器
metric = d2l.Accumulator(2) # 累计[总损失, 总样本数]
for X, Y in train_iter:
# 初始化或重置隐藏状态
if state is None or use_random_iter:
state = net.begin_state(batch_size=X.shape[0], device=device)
else:
# 顺序迭代时截断梯度(避免历史依赖导致内存爆炸)
if isinstance(net, torch.nn.Module) and not isinstance(state, tuple):
state.detach_()
else:
for s in state:
s.detach_()
# 处理输入和目标
y = Y.T.reshape(-1) # 调整目标形状以匹配输出
X, y = X.to(device), y.to(device)
# 前向计算与损失
y_hat, state = net(X, state)
l = loss(y_hat, y.long()).mean()
# 反向传播与参数更新
if isinstance(updater, torch.optim.Optimizer):
updater.zero_grad()
l.backward()
grad_clipping(net, 1) # 梯度裁剪
updater.step()
else:
l.backward()
grad_clipping(net, 1)
updater(batch_size=1)
metric.add(l * y.numel(), y.numel()) # 累计指标
# 返回困惑度(exp(平均损失))和训练速度
return math.exp(metric[0] / metric[1]), metric[1] / timer.stop()
def train_ch8(net, train_iter, vocab, lr, num_epochs, device,
use_random_iter=False):
"""
循环神经网络(RNN)的完整训练控制函数,负责多轮训练、性能监控和结果展示
参数:
net: 待训练的RNN模型(可是nn.Module子类或自定义模型)
train_iter: 训练数据迭代器,生成(输入序列X, 目标序列Y)批量
vocab: 词汇表,包含字符与索引的映射关系
lr: 学习率,控制参数更新幅度
num_epochs: 总训练轮数(完整遍历训练数据的次数)
device: 计算设备(CPU或GPU)
use_random_iter: 是否使用随机迭代模式(False为顺序迭代,保持序列连续性)
"""
loss = nn.CrossEntropyLoss()
animator = d2l.Animator(
xlabel='epoch', # x轴标签:训练轮数
ylabel='perplexity', # y轴标签:困惑度(越低模型越好)
legend=['train'], # 图例:训练集
xlim=[10, num_epochs] # x轴范围:从第10轮到总轮数
)
if isinstance(net, nn.Module):
# 若为PyTorch原生nn.Module模型(如nn.RNN),使用内置SGD优化器
updater = torch.optim.SGD(net.parameters(), lr)
else:
# 若为自定义模型(如RNNModelScratch),使用自定义SGD更新函数
updater = lambda batch_size: d2l.sgd(net.params, lr, batch_size)
predict = lambda prefix: predict_ch8(prefix, 50, net, vocab, device)
for epoch in range(num_epochs):
ppl, speed = train_epoch_ch8(
net, train_iter, loss, updater, device, use_random_iter
)
# 每10轮输出一次生成结果并更新可视化图表
if (epoch + 1) % 10 == 0:
# 打印以'time traveller'为前缀的生成文本,观察模型生成质量
print(predict('time traveller'))
animator.add(epoch + 1, [ppl])
# 打印最终困惑度、训练速度和使用的设备
print(f'困惑度 {ppl:.1f}, {speed:.1f} 词元/秒 {str(device)}')
# 打印模型对两个不同前缀的最终生成结果,评估模型泛化能力
print(predict('time traveller'))
print(predict('traveller'))
简洁实现
import torch
from torch import nn
from torch.nn import functional as F
from d2l import torch as d2l
batch_size, num_steps = 32, 35
train_iter, vocab = d2l.load_data_time_machine(batch_size, num_steps)
# 定义模型
num_hiddens = 256
rnn_layer = nn.RNN(len(vocab), num_hiddens)
state = torch.zeros((1, batch_size, num_hiddens))
# 注意:模块式 nn.RNN 只负责 “提取序列的隐藏特征”,而 “特征如何映射到任务输出”(如词汇表)需用户通过 Linear 层自定义
class RNNModel(nn.Module):
"""循环神经网络模型(支持基础RNN、LSTM,单向/双向模式)"""
def __init__(self, rnn_layer, vocab_size, **kwargs):
        super(RNNModel, self).__init__(**kwargs)
self.rnn = rnn_layer # 内置RNN层(如nn.RNN、nn.LSTM)
self.vocab_size = vocab_size # 词汇表大小(输出类别数)
self.num_hiddens = self.rnn.hidden_size # 隐藏层维度
# 根据RNN是否双向,初始化线性层(将隐藏状态映射到词汇表)
if not self.rnn.bidirectional:
self.num_directions = 1 # 单向RNN
self.linear = nn.Linear(self.num_hiddens, self.vocab_size)
else:
self.num_directions = 2 # 双向RNN(需拼接两个方向的隐藏状态)
self.linear = nn.Linear(self.num_hiddens * 2, self.vocab_size)
def forward(self, inputs, state):
"""前向计算:输入序列→预测输出+更新后的状态"""
# 输入处理:字符索引→独热编码(适配RNN输入格式)
X = F.one_hot(inputs.T.long(), self.vocab_size)
X = X.to(torch.float32) # 转换为浮点型(RNN层要求的输入类型)
# RNN特征提取:输入序列→所有时间步的隐藏状态+最终状态
Y, state = self.rnn(X, state) # Y形状:(num_steps, batch_size, 隐藏层维度×方向数)
# 输出预测:隐藏状态→词汇表空间(预测下一个字符)
# 合并时间步和批量维度:(num_steps×batch_size, 隐藏层维度×方向数)
output = self.linear(Y.reshape((-1, Y.shape[-1])))
return output, state # output形状:(num_steps×batch_size, vocab_size)
def begin_state(self, device, batch_size=1):
"""初始化隐藏状态(支持RNN和LSTM)"""
# 隐藏状态形状:(方向数×层数, batch_size, 隐藏层维度)
state_shape = (self.num_directions * self.rnn.num_layers,
batch_size, self.num_hiddens)
if not isinstance(self.rnn, nn.LSTM):
# 基础RNN:仅需一个隐藏状态张量
return torch.zeros(state_shape, device=device)
else:
# LSTM:需要两个状态(隐藏状态h和细胞状态c)
return (torch.zeros(state_shape, device=device),
torch.zeros(state_shape, device=device))
训练:
device = d2l.try_gpu()
net = RNNModel(rnn_layer, vocab_size=len(vocab))
net = net.to(device)
d2l.predict_ch8('time traveller', 10, net, vocab, device)
num_epochs, lr = 500, 1
d2l.train_ch8(net, train_iter, vocab, lr, num_epochs, device)
门控循环单元 GRU
关注一个序列不是每个观察值都是同等重要
想只记住相关的观察需要:能关注的机制(更新门) and 能遗忘的机制(重置门)
门:
- 更新门 $Z_t$、重置门 $R_t$ 与隐藏状态 $H_t$ 的维度完全一致:
$$Z_t = \sigma(X_t W_{xz} + H_{t-1} W_{hz} + b_z), \quad R_t = \sigma(X_t W_{xr} + H_{t-1} W_{hr} + b_r)$$
候选隐状态:
$$\tilde{H}_t = \tanh(X_t W_{xh} + (R_t \odot H_{t-1}) W_{hh} + b_h)$$
隐状态:
$$H_t = Z_t \odot H_{t-1} + (1 - Z_t) \odot \tilde{H}_t$$
GRU代码实现:
从零开始实现:
import torch
from torch import nn
from d2l import torch as d2l
batch_size, num_steps = 32, 35
train_iter, vocab = d2l.load_data_time_machine(batch_size, num_steps)
# 初始化模型参数
def get_params(vocab_size, num_hiddens, device):
num_inputs = num_outputs = vocab_size
def normal(shape):
return torch.randn(size=shape, device=device)*0.01
def three():
return (normal((num_inputs, num_hiddens)),
normal((num_hiddens, num_hiddens)),
torch.zeros(num_hiddens, device=device))
W_xz, W_hz, b_z = three() # 更新门参数
W_xr, W_hr, b_r = three() # 重置门参数
    W_xh, W_hh, b_h = three()  # 候选隐藏状态参数
# 输出层参数
W_hq = normal((num_hiddens, num_outputs))
b_q = torch.zeros(num_outputs, device=device)
# 附加梯度
params = [W_xz, W_hz, b_z, W_xr, W_hr, b_r, W_xh, W_hh, b_h, W_hq, b_q]
for param in params:
param.requires_grad_(True)
return params
# 定义隐状态的初始化函数
def init_gru_state(batch_size, num_hiddens, device):
return (torch.zeros((batch_size, num_hiddens), device=device), )
# 定义GRU模型
def gru(inputs, state, params):
W_xz, W_hz, b_z, W_xr, W_hr, b_r, W_xh, W_hh, b_h, W_hq, b_q = params
H, = state
outputs = []
for X in inputs:
Z = torch.sigmoid((X @ W_xz) + (H @ W_hz) + b_z)
R = torch.sigmoid((X @ W_xr) + (H @ W_hr) + b_r)
H_tilda = torch.tanh((X @ W_xh) + ((R * H) @ W_hh) + b_h)
H = Z * H + (1 - Z) * H_tilda
Y = H @ W_hq + b_q
outputs.append(Y)
return torch.cat(outputs, dim=0), (H,)
# 训练
vocab_size, num_hiddens, device = len(vocab), 256, d2l.try_gpu()
num_epochs, lr = 500, 1
model = d2l.RNNModelScratch(len(vocab), num_hiddens, device, get_params,
init_gru_state, gru)
d2l.train_ch8(model, train_iter, vocab, lr, num_epochs, device)
简洁实现:
num_inputs = vocab_size
gru_layer = nn.GRU(num_inputs, num_hiddens)
model = d2l.RNNModel(gru_layer, len(vocab))
model = model.to(device)
d2l.train_ch8(model, train_iter, vocab, lr, num_epochs, device)
长短期记忆网络 LSTM
- 忘记门:将值朝0减少
- 输入门:决定是不是忽略掉输入数据
- 输出门:决定是不是使用隐状态
三个门均由 sigmoid 计算:$I_t = \sigma(X_t W_{xi} + H_{t-1} W_{hi} + b_i)$,$F_t$、$O_t$ 同理
候选记忆单元:
$$\tilde{C}_t = \tanh(X_t W_{xc} + H_{t-1} W_{hc} + b_c)$$
记忆单元与隐状态:
$$C_t = F_t \odot C_{t-1} + I_t \odot \tilde{C}_t, \quad H_t = O_t \odot \tanh(C_t)$$
代码实现(和之前的挺相似的):
import torch
from torch import nn
from d2l import torch as d2l
batch_size, num_steps = 32, 35
train_iter, vocab = d2l.load_data_time_machine(batch_size, num_steps)
# 初始化模型参数
def get_lstm_params(vocab_size, num_hiddens, device):
num_inputs = num_outputs = vocab_size
def normal(shape):
return torch.randn(size=shape, device=device)*0.01
def three():
return (normal((num_inputs, num_hiddens)),
normal((num_hiddens, num_hiddens)),
torch.zeros(num_hiddens, device=device))
W_xi, W_hi, b_i = three()
W_xf, W_hf, b_f = three()
W_xo, W_ho, b_o = three()
W_xc, W_hc, b_c = three()
W_hq = normal((num_hiddens, num_outputs))
b_q = torch.zeros(num_outputs, device=device)
params = [W_xi, W_hi, b_i, W_xf, W_hf, b_f, W_xo, W_ho, b_o, W_xc, W_hc,
b_c, W_hq, b_q]
for param in params:
param.requires_grad_(True)
return params
# 定义模型
def init_lstm_state(batch_size, num_hiddens, device):
return (torch.zeros((batch_size, num_hiddens), device=device),
torch.zeros((batch_size, num_hiddens), device=device))
def lstm(inputs, state, params):
[W_xi, W_hi, b_i, W_xf, W_hf, b_f, W_xo, W_ho, b_o, W_xc, W_hc, b_c,
W_hq, b_q] = params
(H, C) = state
outputs = []
for X in inputs:
I = torch.sigmoid((X @ W_xi) + (H @ W_hi) + b_i)
F = torch.sigmoid((X @ W_xf) + (H @ W_hf) + b_f)
O = torch.sigmoid((X @ W_xo) + (H @ W_ho) + b_o)
C_tilda = torch.tanh((X @ W_xc) + (H @ W_hc) + b_c)
C = F * C + I * C_tilda
H = O * torch.tanh(C)
Y = (H @ W_hq) + b_q
outputs.append(Y)
return torch.cat(outputs, dim=0), (H, C)
# 训练
vocab_size, num_hiddens, device = len(vocab), 256, d2l.try_gpu()
num_epochs, lr = 500, 1
model = d2l.RNNModelScratch(len(vocab), num_hiddens, device, get_lstm_params,
init_lstm_state, lstm)
d2l.train_ch8(model, train_iter, vocab, lr, num_epochs, device)
简洁实现:
num_inputs = vocab_size
lstm_layer = nn.LSTM(num_inputs, num_hiddens)
model = d2l.RNNModel(lstm_layer, len(vocab))
model = model.to(device)
d2l.train_ch8(model, train_iter, vocab, lr, num_epochs, device)
深度循环神经网络
思想就是多加几个隐藏层
代码实现:
import torch
from torch import nn
from d2l import torch as d2l
batch_size, num_steps = 32, 35
train_iter, vocab = d2l.load_data_time_machine(batch_size, num_steps)
vocab_size, num_hiddens, num_layers = len(vocab), 256, 2
num_inputs = vocab_size
device = d2l.try_gpu()
lstm_layer = nn.LSTM(num_inputs, num_hiddens, num_layers)
model = d2l.RNNModel(lstm_layer, len(vocab))
model = model.to(device)
num_epochs, lr = 500, 2
d2l.train_ch8(model, train_iter, vocab, lr*1.0, num_epochs, device)
双向循环神经网络
未来很重要:
- 取决于过去和未来的上下文,可以填很不一样的词
- 目前为止RNN只看过去
- 在填空的时候,我们也可以看未来
双向RNN
- 一个前向RNN隐层
- 一个反向RNN隐层
- 合并两个隐状态得到输出

双向RNN 不适合做推理
主要的作用是对序列做特征提取、填空,而不是预测未来
代码实现:
(这是错误应用,这类任务中 “未来信息” 在推理时不可得,而双向 RNN 训练时会错误利用未来信息,导致应用场景不匹配。)
import torch
from torch import nn
from d2l import torch as d2l
batch_size, num_steps, device = 32, 35, d2l.try_gpu()
train_iter, vocab = d2l.load_data_time_machine(batch_size, num_steps)
vocab_size, num_hiddens, num_layers = len(vocab), 256, 2
num_inputs = vocab_size
lstm_layer = nn.LSTM(num_inputs, num_hiddens, num_layers, bidirectional=True)
model = d2l.RNNModel(lstm_layer, len(vocab))
model = model.to(device)
num_epochs, lr = 500, 1
d2l.train_ch8(model, train_iter, vocab, lr, num_epochs, device)
机器翻译与数据集
import os
import torch
from d2l import torch as d2l
下载预处理数据集:
d2l.DATA_HUB['fra-eng'] = (d2l.DATA_URL + 'fra-eng.zip',
'94646ad1522d915e7b0f9296181140edcf86a4f5')
def read_data_nmt():
"""载入“英语-法语”数据集"""
data_dir = d2l.download_extract('fra-eng') # 下载并解压数据集
with open(os.path.join(data_dir, 'fra.txt'), 'r',
encoding='utf-8') as f:
return f.read()
raw_text = read_data_nmt()
print(raw_text[:75])
预处理:
def preprocess_nmt(text):
"""预处理“英语-法语”数据集"""
def no_space(char, prev_char):
# 判断标点前是否需要加空格:若字符是`,.!?`且前一个字符不是空格,则需要加
return char in set(',.!?') and prev_char != ' '
# 替换特殊空格、转小写
text = text.replace('\u202f', ' ').replace('\xa0', ' ').lower()
# 规范标点格式:在需要的标点前加空格
out = [' ' + char if i > 0 and no_space(char, text[i-1]) else char
for i, char in enumerate(text)]
return ''.join(out)
text = preprocess_nmt(raw_text)
print(text[:80])
词元化:
def tokenize_nmt(text, num_examples=None):
"""词元化“英语-法语”数据数据集"""
source, target = [], [] # 初始化源语言(英语)和目标语言(法语)的词元列表
# 按行分割文本(每行是一对翻译句),并遍历每一行
for i, line in enumerate(text.split('\n')):
# 若指定了最大样本数,且当前行数超过该数量,则停止处理
if num_examples and i > num_examples:
break
# 按制表符\t分割当前行,得到英语部分和法语部分
parts = line.split('\t')
# 确保分割后有且仅有两部分(有效翻译对)
if len(parts) == 2:
# 英语句子按空格分割为词元(如"go ." → ["go", "."]),加入source列表
source.append(parts[0].split(' '))
# 法语句子按空格分割为词元(如"va !" → ["va", "!"]),加入target列表
target.append(parts[1].split(' '))
return source, target
source, target = tokenize_nmt(text)
print("英语词元序列前6个样本:", source[:6])
print("法语词元序列前6个样本:", target[:6])
绘制每个文本序列所包含的词元数量的直方图,如图:
def show_list_len_pair_hist(legend, xlabel, ylabel, xlist, ylist):
"""绘制列表长度对的直方图"""
d2l.set_figsize() # 设置图表大小(调用d2l库工具函数)
# 统计并绘制“源语言/目标语言句子的词元数量”的直方图
_, _, patches = d2l.plt.hist(
[[len(l) for l in xlist], [len(l) for l in ylist]])
d2l.plt.xlabel(xlabel) # 设置x轴标签
d2l.plt.ylabel(ylabel) # 设置y轴标签
# 为“目标语言”的直方图柱形添加斜线纹理(方便视觉区分)
for patch in patches[1].patches:
patch.set_hatch('/')
d2l.plt.legend(legend) # 设置图例(区分source和target)
show_list_len_pair_hist(['source', 'target'], '# tokens per sequence',
'count', source, target);

词表:
src_vocab = d2l.Vocab(source, min_freq=2,
reserved_tokens=['<pad>', '<bos>', '<eos>'])
len(src_vocab)
序列样本都有一个固定的长度,可以通过截断或填充文本序列来实现:
def truncate_pad(line, num_steps, padding_token):
"""截断或填充文本序列"""
if len(line) > num_steps:
return line[:num_steps]
return line + [padding_token] * (num_steps - len(line))
truncate_pad(src_vocab[source[0]], 10, src_vocab['<pad>'])
# [47, 4, 1, 1, 1, 1, 1, 1, 1, 1]
转换成小批量数据集用于训练:
def build_array_nmt(lines, vocab, num_steps):
"""将文本序列转为模型输入的批量数据"""
# 词元转索引(词汇表映射)
lines = [vocab[l] for l in lines]
# 加句尾标记<eos>
lines = [l + [vocab['<eos>']] for l in lines]
# 统一长度(截断/填充)并转张量
array = torch.tensor([truncate_pad(l, num_steps, vocab['<pad>']) for l in lines])
# 计算有效长度(非<pad>元素数量)
valid_len = (array != vocab['<pad>']).type(torch.int32).sum(1)
return array, valid_len
训练模型:
def load_data_nmt(batch_size, num_steps, num_examples=600):
"""加载神经机器翻译数据集,返回批量迭代器和词汇表"""
# 读取原始文本并预处理(规范格式、大小写等)
text = preprocess_nmt(read_data_nmt())
# 词元化:将文本拆分为源语言(如英语)和目标语言(如法语)的词元序列
source, target = tokenize_nmt(text, num_examples)
# 构建源语言和目标语言的词汇表(词元→索引映射)
# 过滤低频词(出现次数≥2),添加填充、句首、句尾标记
src_vocab = d2l.Vocab(source, min_freq=2,
reserved_tokens=['<pad>', '<bos>', '<eos>'])
tgt_vocab = d2l.Vocab(target, min_freq=2,
reserved_tokens=['<pad>', '<bos>', '<eos>'])
# 将词元序列转为张量数组,统一长度并计算有效长度(排除填充部分)
src_array, src_valid_len = build_array_nmt(source, src_vocab, num_steps)
tgt_array, tgt_valid_len = build_array_nmt(target, tgt_vocab, num_steps)
# 打包所有数据数组,生成批量迭代器
data_arrays = (src_array, src_valid_len, tgt_array, tgt_valid_len)
data_iter = d2l.load_array(data_arrays, batch_size) # 按batch_size划分批量
return data_iter, src_vocab, tgt_vocab
# 读出“英语-法语”数据集中的第一个小批量数据
train_iter, src_vocab, tgt_vocab = load_data_nmt(batch_size=2, num_steps=8)
for X, X_valid_len, Y, Y_valid_len in train_iter:
print('X:', X.type(torch.int32))
print('X的有效长度:', X_valid_len)
print('Y:', Y.type(torch.int32))
print('Y的有效长度:', Y_valid_len)
break
编码器—解码器架构
编码器:将输入编码成中间表达形式(特征)
解码器:将中间表示解码成输出
可以通过CNN来理解:
重新考察RNN
编码器: 将文本表示成向量
解码器: 向量表示成输出
编码器-解码器架构
- 一个模型被分为两块:
- 编码器处理输入
- 解码器生成输出
from torch import nn
class Encoder(nn.Module):
"""编码器-解码器架构的基本编码器接口"""
def __init__(self, **kwargs):
super(Encoder, self).__init__(**kwargs)
def forward(self, X, *args):
raise NotImplementedError
class Decoder(nn.Module):
"""编码器-解码器架构的基本解码器接口"""
def __init__(self, **kwargs):
super(Decoder, self).__init__(**kwargs)
def init_state(self, enc_outputs, *args):
raise NotImplementedError
def forward(self, X, state):
raise NotImplementedError
class EncoderDecoder(nn.Module):
"""编码器-解码器架构的基类"""
def __init__(self, encoder, decoder, **kwargs):
super(EncoderDecoder, self).__init__(**kwargs)
self.encoder = encoder
self.decoder = decoder
def forward(self, enc_X, dec_X, *args):
enc_outputs = self.encoder(enc_X, *args)
dec_state = self.decoder.init_state(enc_outputs, *args)
return self.decoder(dec_X, dec_state)

(代码不理解可以配合图片理解)
序列到序列学习(seq2seq)
机器翻译:给定一个源语言的句子,自动翻译成目标语言,这两个句子可以有不同的长度
Seq2seq:
编码器是一个RNN,读取输入句子,可以是双向
解码器使用另外一个RNN来输出
细节:
编码器是没有输出的RNN
编码器最后时间步的隐状态用作解码器的初始隐状态
训练时解码器使用目标句子作为输入
衡量生成序列好坏的BLEU指标
- n-gram 精度 $p_n$:预测序列中所有 n-gram(n元组)与参考序列的匹配比例。
例子:
参考序列(标签序列):A B C D E F
预测序列:A B B C D
- $p_1 = 4/5$:预测的1-gram(A、B、B、C、D)中,4个在参考中出现。
- $p_2 = 3/4$:预测的2-gram(AB、BB、BC、CD)中,3个在参考中出现。
- $p_3 = 1/3$:预测的3-gram(ABB、BBC、BCD)中,1个在参考中出现。
- $p_4 = 0$:预测的4-gram(ABBC、BBCD)均不在参考中出现。
- BLEU 定义
BLEU 综合多阶n-gram精度与长度惩罚,公式为:
$$\text{BLEU} = \exp\left( \min\left( 0,\ 1 - \frac{\text{len}_{\text{label}}}{\text{len}_{\text{pred}}} \right) \right) \cdot \prod_{n=1}^{k} p_n^{1/2^n}$$
其中:
- $\text{len}_{\text{label}}$:参考序列(标签)的长度;
- $\text{len}_{\text{pred}}$:预测序列的长度;
- $\min\left(0,\ 1 - \frac{\text{len}_{\text{label}}}{\text{len}_{\text{pred}}}\right)$:长度惩罚项,防止生成序列过短;
- $\prod_{n=1}^{k} p_n^{1/2^n}$:各阶n-gram精度的加权几何平均(n越大,指数 $1/2^n$ 越小,长n-gram的匹配被赋予更大权重)。
代码实现:
import collections
import math
import torch
from torch import nn
from d2l import torch as d2l
# 实现编码器和解码器
class Seq2SeqEncoder(d2l.Encoder):
"""用于序列到序列学习的循环神经网络编码器"""
def __init__(self, vocab_size, embed_size, num_hiddens, num_layers,
dropout=0, **kwargs):
        super(Seq2SeqEncoder, self).__init__(**kwargs)
# 词嵌入层:将离散的词索引转换为连续的低维向量
# vocab_size:源语言词汇表大小;embed_size:词向量维度
self.embedding = nn.Embedding(vocab_size, embed_size)
# 输入维度=词向量维度(embed_size);隐藏层维度=num_hiddens
self.rnn = nn.GRU(embed_size, num_hiddens, num_layers,
dropout=dropout)
def forward(self, X, *args):
# X:输入的源序列词索引,形状为 (batch_size, seq_len)
# 例如:(32, 10) 表示32个样本,每个样本是长度为10的词索引序列
# 输出形状:(batch_size, seq_len, embed_size)
X = self.embedding(X) # 例:(32, 10) → (32, 10, 128)(假设embed_size=128)
# 调整维度顺序:GRU要求输入格式为 (seq_len, batch_size, feature)
X = X.permute(1, 0, 2) # 例:(32, 10, 128) → (10, 32, 128)
# output:所有时间步的隐藏状态,形状 (seq_len, batch_size, num_hiddens)
# state:最后一个时间步的隐藏状态,形状 (num_layers, batch_size, num_hiddens)
output, state = self.rnn(X)
return output, state
class Seq2SeqDecoder(d2l.Decoder):
"""用于序列到序列学习的循环神经网络解码器"""
def __init__(self, vocab_size, embed_size, num_hiddens, num_layers,
dropout=0, **kwargs):
        super(Seq2SeqDecoder, self).__init__(**kwargs)
self.embedding = nn.Embedding(vocab_size, embed_size)
self.rnn = nn.GRU(embed_size + num_hiddens, num_hiddens, num_layers,
dropout=dropout)
self.dense = nn.Linear(num_hiddens, vocab_size)
def init_state(self, enc_outputs, *args):
# 用编码器的输出初始化解码器状态
# enc_outputs[1]是编码器的最终隐藏状态(形状:num_layers, batch_size, num_hiddens)
return enc_outputs[1]
def forward(self, X, state):
# X:目标序列的前缀词索引,形状 (batch_size, seq_len_dec)
# 例如:(32, 8) 表示32个样本,每个样本是长度为8的目标前缀
# 输出形状:(batch_size, seq_len_dec, embed_size)
X = self.embedding(X).permute(1, 0, 2)
# 生成上下文信息:将编码器最后一层的状态复制到每个时间步
# state[-1]:取编码器最后一层的隐藏状态(形状:batch_size, num_hiddens)
# repeat(X.shape[0], 1, 1):复制seq_len_dec次,让每个解码步骤都能参考源序列
# 输出形状:(seq_len_dec, batch_size, num_hiddens)
context = state[-1].repeat(X.shape[0], 1, 1) # 例:(32, 256) → (8, 32, 256)(假设num_hiddens=256)
# 拼接目标词向量和上下文:让GRU同时关注“已生成的词”和“源序列语义”
# 输出形状:(seq_len_dec, batch_size, embed_size + num_hiddens)
X_and_context = torch.cat((X, context), 2) # 例:(8, 32, 128+256) = (8, 32, 384)
output, state = self.rnn(X_and_context, state)
# 先通过全连接层,再调整维度为(batch_size, seq_len_dec, vocab_size)
output = self.dense(output).permute(1, 0, 2) # 例:(8, 32, 256) → (32, 8, vocab_size)
return output, state
def sequence_mask(X, valid_len, value=0):
"""在序列中屏蔽不相关的项"""
maxlen = X.size(1)
mask = torch.arange((maxlen), dtype=torch.float32,
device=X.device)[None, :] < valid_len[:, None]
X[~mask] = value
return X
class MaskedSoftmaxCELoss(nn.CrossEntropyLoss):
"""带遮蔽的softmax交叉熵损失函数"""
def forward(self, pred, label, valid_len):
# 创建与label同形状的权重张量(初始全1)
weights = torch.ones_like(label) # 形状:(batch_size, seq_len)
weights = sequence_mask(weights, valid_len) # 形状不变,无效位置变0
# 设置交叉熵损失的计算模式:不自动求平均/求和(保留每个位置的损失)
self.reduction = 'none' # 父类nn.CrossEntropyLoss的参数
# 计算原始交叉熵损失(未加权)
# pred形状通常是(batch_size, seq_len, vocab_size),需要转置为(batch_size, vocab_size, seq_len)
# 因为nn.CrossEntropyLoss要求输入形状为(batch_size, num_classes, seq_len)
unweighted_loss = super(MaskedSoftmaxCELoss, self).forward(
pred.permute(0, 2, 1), label) # 形状:(batch_size, seq_len)
# 用权重屏蔽无效位置的损失(无效位置损失 *= 0),再求每个样本的平均损失
weighted_loss = (unweighted_loss * weights).mean(dim=1) # 形状:(batch_size,)
return weighted_loss
训练:
def train_seq2seq(net, data_iter, lr, num_epochs, tgt_vocab, device):
"""训练序列到序列模型"""
# 参数初始化
def xavier_init_weights(m):
# 对线性层权重进行Xavier初始化
if type(m) == nn.Linear:
nn.init.xavier_uniform_(m.weight)
# 对GRU层的权重进行Xavier初始化
if type(m) == nn.GRU:
# 遍历GRU的所有参数,只初始化权重(忽略偏置)
for param in m._flat_weights_names:
if "weight" in param:
nn.init.xavier_uniform_(m._parameters[param])
net.apply(xavier_init_weights)
net.to(device)
optimizer = torch.optim.Adam(net.parameters(), lr=lr)
loss = MaskedSoftmaxCELoss()
net.train()
animator = d2l.Animator(xlabel='epoch', ylabel='loss',
xlim=[10, num_epochs])
for epoch in range(num_epochs):
timer = d2l.Timer()
metric = d2l.Accumulator(2)
for batch in data_iter:
optimizer.zero_grad()
X, X_valid_len, Y, Y_valid_len = [x.to(device) for x in batch]
# 构造解码器输入(Teacher Forcing策略)
# 生成句首标记<bos>:每个样本前添加一个<bos>,形状为(batch_size, 1)
bos = torch.tensor([tgt_vocab['<bos>']] * Y.shape[0],
device=device).reshape(-1, 1)
# 解码器输入 = <bos> + 目标序列的前n-1个词(因为要预测第n个词)
# 形状为(batch_size, tgt_seq_len),与目标序列长度一致
dec_input = torch.cat([bos, Y[:, :-1]], 1)
Y_hat, _ = net(X, dec_input, X_valid_len)
l = loss(Y_hat, Y, Y_valid_len) # 形状:(batch_size,),每个样本的平均损失
l.sum().backward() # 损失求和后反向传播(计算梯度)
d2l.grad_clipping(net, 1) # 梯度裁剪:限制梯度范数≤1,防止梯度爆炸
num_tokens = Y_valid_len.sum() # 当前batch的有效token总数(用于计算平均损失)
optimizer.step() # 根据梯度更新模型参数
# 累计损失和有效token数(不记录梯度,节省内存)
with torch.no_grad():
metric.add(l.sum(), num_tokens)
if (epoch + 1) % 10 == 0:
animator.add(epoch + 1, (metric[0] / metric[1],))
print(f'最终损失 {metric[0] / metric[1]:.3f}, '
f'训练速度 {metric[1] / timer.stop():.1f} tokens/sec '
f'(设备:{str(device)})')
embed_size, num_hiddens, num_layers, dropout = 32, 32, 2, 0.1
batch_size, num_steps = 64, 10
lr, num_epochs, device = 0.005, 300, d2l.try_gpu()
train_iter, src_vocab, tgt_vocab = d2l.load_data_nmt(batch_size, num_steps)
encoder = Seq2SeqEncoder(len(src_vocab), embed_size, num_hiddens, num_layers,
dropout)
decoder = Seq2SeqDecoder(len(tgt_vocab), embed_size, num_hiddens, num_layers,
dropout)
net = d2l.EncoderDecoder(encoder, decoder)
train_seq2seq(net, train_iter, lr, num_epochs, tgt_vocab, device)
预测:
def predict_seq2seq(net, src_sentence, src_vocab, tgt_vocab, num_steps,
device, save_attention_weights=False):
"""
序列到序列(Seq2Seq)模型的推理函数:根据源句子生成目标句子
参数说明:
- net: 训练好的Seq2Seq模型(包含编码器和解码器)
- src_sentence: 输入的源语言句子(如英文句子字符串)
- src_vocab: 源语言词汇表(用于将源句子转换为token索引)
- tgt_vocab: 目标语言词汇表(用于将生成的token索引转换为目标词)
- num_steps: 最大生成长度(防止无限循环生成)
- device: 运行设备(GPU/CPU)
- save_attention_weights: 是否保存注意力权重(用于可视化)
返回值:
- 生成的目标语言句子(字符串)
- 注意力权重序列(可选,用于可视化)
"""
net.eval() # 确保推理时模型行为稳定,结果可复现
# 分词→转为小写→转换为token索引→添加句尾标记<eos>
src_tokens = src_vocab[src_sentence.lower().split(' ')] + [src_vocab['<eos>']]
# 记录源句子的有效长度(用于编码器屏蔽无效的Padding项)
enc_valid_len = torch.tensor([len(src_tokens)], device=device)
src_tokens = d2l.truncate_pad(src_tokens, num_steps, src_vocab['<pad>'])
# 转换为张量并增加batch维度(模型要求输入形状为(batch_size, seq_len),此处batch_size=1)
enc_X = torch.unsqueeze(
torch.tensor(src_tokens, dtype=torch.long, device=device), dim=0) # 形状:(1, num_steps)
# 编码器处理源句子,生成语义编码
enc_outputs = net.encoder(enc_X, enc_valid_len)
# 初始化解码器状态
dec_state = net.decoder.init_state(enc_outputs, enc_valid_len)
# 解码器初始输入:句首标记<bos>(目标序列的起始信号)
# 形状:(1, 1) → (batch_size=1, seq_len=1),每次只输入一个词
dec_X = torch.unsqueeze(torch.tensor(
[tgt_vocab['<bos>']], dtype=torch.long, device=device), dim=0)
# 初始化结果存储:生成的目标序列token索引和注意力权重(可选)
output_seq, attention_weight_seq = [], []
# 自回归生成目标句子(核心逻辑)
for _ in range(num_steps):
# 解码器前向传播:输入当前词(dec_X)和当前状态(dec_state),输出预测和新状态
# Y: 形状(1, 1, vocab_size) → 当前步对目标词汇表的概率分布
# dec_state: 更新后的解码器状态(用于下一步生成)
Y, dec_state = net.decoder(dec_X, dec_state)
dec_X = Y.argmax(dim=2) # 取vocab_size维度的最大值索引,形状(1, 1)
pred = dec_X.squeeze(dim=0).type(torch.int32).item() # 转换为单个token索引
# 保存注意力权重(如果需要可视化解码器关注的源句子位置)
if save_attention_weights:
attention_weight_seq.append(net.decoder.attention_weights)
# 如果预测到句尾标记<eos>,停止生成(句子结束)
if pred == tgt_vocab['<eos>']:
break
output_seq.append(pred)
# 将生成的token索引转换为目标语言句子(字符串)
return ' '.join(tgt_vocab.to_tokens(output_seq)), attention_weight_seq
BLEU的代码实现:
def bleu(pred_seq, label_seq, k):
"""计算BLEU"""
# 预处理:将生成序列和参考序列分割为词列表
pred_tokens = pred_seq.split(' ')
label_tokens = label_seq.split(' ')
len_pred = len(pred_tokens)
len_label = len(label_tokens)
# 长度惩罚(Length Penalty):惩罚过短的生成序列
# 公式:exp(min(0, 1 - 参考长度/生成长度))
score = math.exp(min(0, 1 - len_label / len_pred))
# 计算n-gram(1到k)的匹配度,累加到分数中
for n in range(1, k + 1): # 遍历1-gram, 2-gram, ..., k-gram
num_matches = 0 # 记录生成序列与参考序列匹配的n-gram数量
# 用字典统计参考序列中所有n-gram的出现次数(避免重复匹配)
label_subs = collections.defaultdict(int)
# 统计参考序列中所有n-gram的出现次数
# 遍历参考序列中所有可能的n-gram起始位置
for i in range(len_label - n + 1):
# 提取从i开始的n个词,组成n-gram字符串(如n=2时,"cat sits")
label_sub = ' '.join(label_tokens[i: i + n])
label_subs[label_sub] += 1 # 计数+1
# 统计生成序列中与参考序列匹配的n-gram数量
# 遍历生成序列中所有可能的n-gram起始位置
for i in range(len_pred - n + 1):
# 提取生成序列的n-gram
pred_sub = ' '.join(pred_tokens[i: i + n])
# 若该n-gram在参考序列中存在且未被匹配完,则计数+1,并减少参考中的可用次数
if label_subs[pred_sub] > 0:
num_matches += 1
label_subs[pred_sub] -= 1 # 避免重复匹配同一n-gram
# 计算当前n-gram的精确度,并加权到总分数中
# 生成序列中可能的n-gram总数(分母)
total_pred_subs = len_pred - n + 1 if len_pred >= n else 0
# 精确度 = 匹配的n-gram数量 / 生成序列的n-gram总数(避免除零)
precision = num_matches / total_pred_subs if total_pred_subs > 0 else 0
# 对精确度取权重:n越大,权重越小(0.5^n),但长短语匹配更有价值
score *= math.pow(precision, math.pow(0.5, n))
return score
翻译:
engs = ['go .', "i lost .", 'he\'s calm .', 'i\'m home .']
fras = ['va !', 'j\'ai perdu .', 'il est calme .', 'je suis chez moi .']
for eng, fra in zip(engs, fras):
translation, attention_weight_seq = predict_seq2seq(
net, eng, src_vocab, tgt_vocab, num_steps, device)
print(f'{eng} => {translation}, bleu {bleu(translation, fra, k=2):.3f}')
束搜索
贪心搜索
- 在seq2seq中我们使用了贪心搜索来预测序列
  - 将当前时刻预测概率最大的词输出
- 但贪心很可能不是最优的
穷举搜索
- 原理:枚举所有可能的输出序列,计算每个序列的概率 $P(y_1, y_2, \dots, y_T \mid x)$,选择概率最高的序列。
- 计算量:若输出字典大小为 $n$,序列最大长度为 $T$,则需考察的序列总数为 $\sum_{t=1}^{T} n^t \approx n^T$(当 $T$ 较大时)。
- 例:若 $n=10000$,$T=10$,则序列总数约为 $10000^{10} = 10^{40}$,计算上完全不可行。
束搜索
- 保存最好的k个候选
- 在每个时刻,对每个候选新加一项(n种可能),在kn个选项中选出最好的k个
- 时间复杂度为 $O(knT)$
  - $k = 5$,$n = 10000$,$T = 10$ 时:$knT = 5 \times 10^5$
- 每个候选的最终分数是:
$$\frac{1}{L^\alpha} \log p(y_1, \dots, y_L) = \frac{1}{L^\alpha} \sum_{t'=1}^{L} \log p(y_{t'} \mid y_1, \dots, y_{t'-1})$$
- 通常取 $\alpha = 0.75$
- 束搜索在每次搜索时保存k个最好的候选(见下面的示意代码)
  - k=1时是贪心搜索
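下面是一个最小的束搜索示意(非原书代码;next_probs、bos、eos 均为假设的接口,用随机分布代替真实语言模型,仅演示搜索与长度归一化的逻辑):
import torch

def beam_search(next_probs, bos, eos, k=2, max_len=10, alpha=0.75):
    beams = [([bos], 0.0)]  # 候选列表:(词元序列, 累计log概率)
    finished = []
    for _ in range(max_len):
        candidates = []
        for seq, score in beams:
            probs = next_probs(seq)            # 形状为(vocab_size,)的下一词概率
            topk = torch.topk(probs.log(), k)  # 每个候选只需展开各自的top-k
            for logp, idx in zip(topk.values, topk.indices):
                candidates.append((seq + [int(idx)], score + float(logp)))
        # 从k*k个扩展中保留总分最高的k个(与在全部kn个中选top-k等价)
        candidates.sort(key=lambda c: c[1], reverse=True)
        beams = []
        for seq, score in candidates[:k]:
            (finished if seq[-1] == eos else beams).append((seq, score))
        if not beams:
            break
    finished.extend(beams)
    # 最终分数做长度归一化:1/L^alpha * log p
    return max(finished, key=lambda c: c[1] / (len(c[0]) ** alpha))

# 用随机softmax分布假装是语言模型,演示调用方式
vocab_size = 6
fake_model = lambda seq: torch.softmax(torch.randn(vocab_size), dim=0)
print(beam_search(fake_model, bos=0, eos=1, k=3))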
注意力机制
- 心理学:
  - 动物需要在复杂环境下有效关注值得注意的点
  - 心理学框架:人类根据随意线索和不随意线索选择注意点
- 注意力机制:
  - 卷积、全连接、池化层都只考虑不随意线索
  - 注意力机制则显式地考虑随意线索
    - 随意线索被称之为查询(query)
    - 每个输入是一个值(value)和不随意线索(key)的对
      - key:“用于匹配的特征”,解决“要不要关注”
      - value:“用于提供信息的特征”,解决“关注后取什么”
  - 通过注意力池化层来有偏向性地选择某些输入
非参注意力池化层
- 给定数据 $(x_i, y_i),\ i = 1, \dots, n$
- 平均池化是最简单的方案:
$$f(x) = \frac{1}{n} \sum_{i} y_i$$
- 更好的方案是60年代提出的Nadaraya-Watson核回归:
$$f(x) = \sum_{i=1}^{n} \frac{K(x - x_i)}{\sum_{j=1}^{n} K(x - x_j)} y_i$$
使用高斯核 $K(u) = \frac{1}{\sqrt{2\pi}} \exp\left(-\frac{u^2}{2}\right)$,那么
$$f(x) = \sum_{i=1}^{n} \frac{\exp\left(-\frac{1}{2}(x - x_i)^2\right)}{\sum_{j=1}^{n} \exp\left(-\frac{1}{2}(x - x_j)^2\right)} y_i = \sum_{i=1}^{n} \text{softmax}\left(-\frac{1}{2}(x - x_i)^2\right) y_i$$
注意力汇聚:Nadaraya-Watson 核回归 代码实现:
import torch
from torch import nn
from d2l import torch as d2l
n_train = 50
x_train, _ = torch.sort(torch.rand(n_train) * 5)
def f(x):
return 2 * torch.sin(x) + x**0.8
y_train = f(x_train) + torch.normal(0.0, 0.5, (n_train,))
x_test = torch.arange(0, 5, 0.1)
y_truth = f(x_test)
n_test = len(x_test)
def plot_kernel_reg(y_hat):
d2l.plot(x_test, [y_truth, y_hat], 'x', 'y', legend=['Truth', 'Pred'],
xlim=[0, 5], ylim=[-1, 5])
d2l.plt.plot(x_train, y_train, 'o', alpha=0.5);
# 生成 “平均池化” 的预测结果 y_hat
y_hat = torch.repeat_interleave(y_train.mean(), n_test)
plot_kernel_reg(y_hat)

非参数注意力汇聚:
# reshape((-1, n_train)):重塑为形状为(n_test, n_train)的矩阵(50×50)
# 矩阵中每行对应一个测试样本,每行的n_train个元素都是该测试样本的值(如第1行全为0.0,第2行全为0.1)
X_repeat = x_test.repeat_interleave(n_train).reshape((-1, n_train))
attention_weights = nn.functional.softmax(-(X_repeat - x_train)** 2 / 2, dim=1)
y_hat = torch.matmul(attention_weights, y_train)
plot_kernel_reg(y_hat)
d2l.show_heatmaps(attention_weights.unsqueeze(0).unsqueeze(0),
xlabel='Sorted training inputs',
ylabel='Sorted testing inputs')

带参数注意力汇聚 :
这里先引入批量矩阵乘法:假定两个张量的形状分别是 $(n,a,b)$ 和 $(n,b,c)$,它们的批量矩阵乘法输出的形状为 $(n,a,c)$
X = torch.ones((2, 1, 4))
Y = torch.ones((2, 4, 6))
torch.bmm(X, Y).shape
# torch.Size([2, 1, 6])
# 使用小批量矩阵乘法来计算小批量数据中的加权平均值
# weights:注意力权重(每个样本10个均匀权重0.1);values:两组0~19的值
weights = torch.ones((2, 10)) * 0.1
values = torch.arange(20.0).reshape((2, 10))
torch.bmm(weights.unsqueeze(1), values.unsqueeze(-1))
# tensor([[[ 4.5000]],
#         [[14.5000]]])
接下来是带参数注意力汇聚实现:
class NWKernelRegression(nn.Module):
def __init__(self, **kwargs):
super().__init__(**kwargs)
# 通过学习一个w来控制曲线平滑一点还是不平滑一点
self.w = nn.Parameter(torch.rand((1,), requires_grad=True))
def forward(self, queries, keys, values):
queries = queries.repeat_interleave(keys.shape[1]).reshape((-1, keys.shape[1]))
self.attention_weights = nn.functional.softmax(
-((queries - keys) * self.w)**2 / 2, dim=1)
return torch.bmm(self.attention_weights.unsqueeze(1),
values.unsqueeze(-1)).reshape(-1)
X_tile = x_train.repeat((n_train, 1))
Y_tile = y_train.repeat((n_train, 1))
# keys: 从X_tile中筛选非对角线元素,重塑为 (n_train, n_train-1)
# values: 从Y_tile中筛选非对角线元素,重塑为 (n_train, n_train-1)
# 筛掉对角线的核心目的是防止模型 “作弊”,强制它学习 “不同样本之间的真实关联”,而不是简单记住 “自己等于自己”,从而避免过拟合
keys = X_tile[(1 - torch.eye(n_train)).type(torch.bool)].reshape((n_train, -1))
values = Y_tile[(1 - torch.eye(n_train)).type(torch.bool)].reshape((n_train, -1))
net = NWKernelRegression()
loss = nn.MSELoss(reduction='none')
trainer = torch.optim.SGD(net.parameters(), lr=0.5)
animator = d2l.Animator(xlabel='epoch', ylabel='loss', xlim=[1, 5])
for epoch in range(5):
trainer.zero_grad()
l = loss(net(x_train, keys, values), y_train)
l.sum().backward()
trainer.step()
print(f'epoch {epoch + 1}, loss {float(l.sum()):.6f}')
animator.add(epoch + 1, float(l.sum()))
keys = x_train.repeat((n_test, 1))
values = y_train.repeat((n_test, 1))
y_hat = net(x_test, keys, values).unsqueeze(1).detach()
plot_kernel_reg(y_hat)
d2l.show_heatmaps(net.attention_weights.unsqueeze(0).unsqueeze(0),
xlabel='Sorted training inputs',
ylabel='Sorted testing inputs')

注意力分数
怎样把 key 和 value 扩展到高维度:
- 假设query $\mathbf{q} \in \mathbb{R}^q$,$m$ 对 key-value $(\mathbf{k}_1, \mathbf{v}_1), \dots, (\mathbf{k}_m, \mathbf{v}_m)$,这里 $\mathbf{k}_i \in \mathbb{R}^k$,$\mathbf{v}_i \in \mathbb{R}^v$
- 注意力池化层:
$$f(\mathbf{q}, (\mathbf{k}_1, \mathbf{v}_1), \dots, (\mathbf{k}_m, \mathbf{v}_m)) = \sum_{i=1}^{m} \alpha(\mathbf{q}, \mathbf{k}_i)\mathbf{v}_i \in \mathbb{R}^v$$
$$\alpha(\mathbf{q}, \mathbf{k}_i) = \text{softmax}(a(\mathbf{q}, \mathbf{k}_i)) = \frac{\exp(a(\mathbf{q}, \mathbf{k}_i))}{\sum_{j=1}^{m} \exp(a(\mathbf{q}, \mathbf{k}_j))} \in \mathbb{R}$$
Additive Attention:
- 可学参数:$\mathbf{W}_k \in \mathbb{R}^{h \times k}$,$\mathbf{W}_q \in \mathbb{R}^{h \times q}$,$\mathbf{v} \in \mathbb{R}^h$
$$a(\mathbf{k}, \mathbf{q}) = \mathbf{v}^T \tanh(\mathbf{W}_k \mathbf{k} + \mathbf{W}_q \mathbf{q})$$
Scaled Dot-Product Attention:
- 如果query和key都是同样的长度 $\mathbf{q}, \mathbf{k}_i \in \mathbb{R}^d$,那么可以用
$$a(\mathbf{q}, \mathbf{k}_i) = \langle \mathbf{q}, \mathbf{k}_i \rangle / \sqrt{d}$$
- 向量化版本:
  - $\mathbf{Q} \in \mathbb{R}^{n \times d}$,$\mathbf{K} \in \mathbb{R}^{m \times d}$,$\mathbf{V} \in \mathbb{R}^{m \times v}$
  - 注意力分数:$a(\mathbf{Q}, \mathbf{K}) = \mathbf{Q}\mathbf{K}^T / \sqrt{d} \in \mathbb{R}^{n \times m}$
  - 注意力池化:$f = \text{softmax}(a(\mathbf{Q}, \mathbf{K}))\mathbf{V} \in \mathbb{R}^{n \times v}$
注意力打分函数代码实现:
import math
import torch
from torch import nn
from d2l import torch as d2l
def masked_softmax(X, valid_lens):
"""通过在最后一个轴上掩蔽元素来执行softmax操作"""
if valid_lens is None:
return nn.functional.softmax(X, dim=-1)
else:
shape = X.shape
if valid_lens.dim() == 1:
# 若valid_lens是1维张量(如(batch_size,)),则按X的第1维长度重复
# 例如:X形状为(batch_size, seq_len, dim),则重复seq_len次,使valid_lens长度为batch_size*seq_len
valid_lens = torch.repeat_interleave(valid_lens, shape[1])
else:
# 若valid_lens是多维张量,展平为1维(总长度需与X中"样本数"匹配)
valid_lens = valid_lens.reshape(-1)
X = d2l.sequence_mask(
X.reshape(-1, shape[-1]), # 展平为二维:(N, shape[-1]),N=批量大小*其他维度乘积
valid_lens, # 1维有效长度列表,每个元素对应一个样本
value=-1e6 # 掩蔽值设为-1e6(适合softmax场景)
)
return nn.functional.softmax(X.reshape(shape), dim=-1)
class AdditiveAttention(nn.Module):
"""加性注意力"""
def __init__(self, key_size, query_size, num_hiddens, dropout, **kwargs):
super(AdditiveAttention, self).__init__(**kwargs)
self.W_k = nn.Linear(key_size, num_hiddens, bias=False)
self.W_q = nn.Linear(query_size, num_hiddens, bias=False)
self.w_v = nn.Linear(num_hiddens, 1, bias=False)
self.dropout = nn.Dropout(dropout)
def forward(self, queries, keys, values, valid_lens):
queries, keys = self.W_q(queries), self.W_k(keys)
features = queries.unsqueeze(2) + keys.unsqueeze(1)
features = torch.tanh(features)
scores = self.w_v(features).squeeze(-1)
self.attention_weights = masked_softmax(scores, valid_lens)
return torch.bmm(self.dropout(self.attention_weights), values)
class DotProductAttention(nn.Module):
"""缩放点积注意力"""
def __init__(self, dropout, **kwargs):
super(DotProductAttention, self).__init__(**kwargs)
self.dropout = nn.Dropout(dropout)
def forward(self, queries, keys, values, valid_lens=None):
d = queries.shape[-1]
scores = torch.bmm(queries, keys.transpose(1,2)) / math.sqrt(d)
self.attention_weights = masked_softmax(scores, valid_lens)
return torch.bmm(self.dropout(self.attention_weights), values)
使用注意力机制的seq2seq
动机:
- 机器翻译中,每个生成的词可能相关于源句子中不同的词
- seq2seq模型中不能对此直接建模
加入注意力:
- 编码器对每个词的输出作为 key 和 value(它们是相同的)
- 解码器 RNN 对上一个词的输出是 query
- 注意力的输出和下一个词的词嵌入合并进入解码器 RNN
代码实现(Bahdanau 注意力):
import torch
from torch import nn
from d2l import torch as d2l
class AttentionDecoder(d2l.Decoder):
"""带有注意力机制解码器的基本接口"""
def __init__(self, **kwargs):
super(AttentionDecoder, self).__init__(**kwargs)
@property
def attention_weights(self):
raise NotImplementedError
class Seq2SeqAttentionDecoder(AttentionDecoder):
# 初始化带注意力机制的Seq2Seq解码器
def __init__(self, vocab_size, embed_size, num_hiddens, num_layers,
dropout=0, **kwargs):
        super(Seq2SeqAttentionDecoder, self).__init__(**kwargs)
self.attention = d2l.AdditiveAttention( # 加性注意力机制
num_hiddens, num_hiddens, num_hiddens, dropout)
self.embedding = nn.Embedding(vocab_size, embed_size) # 词嵌入层
self.rnn = nn.GRU(
embed_size + num_hiddens, num_hiddens, num_layers,
dropout=dropout)
self.dense = nn.Linear(num_hiddens, vocab_size)
# 初始化解码器状态(使用编码器输出)
def init_state(self, enc_outputs, enc_valid_lens, *args):
outputs, hidden_state = enc_outputs # 编码器输出和隐藏状态
# 调整维度并返回状态(编码器输出、隐藏状态、有效长度)
return (outputs.permute(1, 0, 2), hidden_state, enc_valid_lens)
# 前向传播:输入X和状态,输出预测和新状态
def forward(self, X, state):
enc_outputs, hidden_state, enc_valid_lens = state # 解码器初始状态
X = self.embedding(X).permute(1, 0, 2) # 词嵌入并调整维度(时间步, 批量, 嵌入维度)
outputs, self._attention_weights = [], [] # 存储输出和注意力权重
# 逐时间步处理输入
for x in X:
# 用GRU最后一层的隐藏状态作为查询,新增的维度(第 1 维)代表 “查询的数量”
query = torch.unsqueeze(hidden_state[-1], dim=1)
# 调用加性注意力:query(解码器隐藏态)与enc_outputs(编码器输出,作为key和value)计算关联
context = self.attention(
query, enc_outputs, enc_outputs, enc_valid_lens)# context形状:(batch_size, 1, num_hiddens)(每个查询对应的注意力加权结果)
x = torch.cat((context, torch.unsqueeze(x, dim=1)), dim=-1)
# (GRU要求输入形状为(seq_len, batch_size, input_size),这里seq_len=1)
out, hidden_state = self.rnn(x.permute(1, 0, 2), hidden_state)
outputs.append(out) # 收集输出
self._attention_weights.append(self.attention.attention_weights) # 收集注意力权重
# 拼接所有时间步输出,通过全连接层预测词汇
outputs = self.dense(torch.cat(outputs, dim=0))
# 调整输出维度并返回(批量, 时间步, 词汇表大小),以及新状态
return outputs.permute(1, 0, 2), [enc_outputs, hidden_state, enc_valid_lens]
# 提供注意力权重的访问接口(实现基类要求)
@property
def attention_weights(self):
return self._attention_weights
训练:
embed_size, num_hiddens, num_layers, dropout = 32, 32, 2, 0.1
batch_size, num_steps = 64, 10
lr, num_epochs, device = 0.005, 250, d2l.try_gpu()
train_iter, src_vocab, tgt_vocab = d2l.load_data_nmt(batch_size, num_steps)
encoder = d2l.Seq2SeqEncoder(
len(src_vocab), embed_size, num_hiddens, num_layers, dropout)
decoder = Seq2SeqAttentionDecoder(
len(tgt_vocab), embed_size, num_hiddens, num_layers, dropout)
net = d2l.EncoderDecoder(encoder, decoder)
d2l.train_seq2seq(net, train_iter, lr, num_epochs, tgt_vocab, device)
engs = ['go .', "i lost .", 'he\'s calm .', 'i\'m home .']
fras = ['va !', 'j\'ai perdu .', 'il est calme .', 'je suis chez moi .']
for eng, fra in zip(engs, fras):
translation, dec_attention_weight_seq = d2l.predict_seq2seq(
net, eng, src_vocab, tgt_vocab, num_steps, device, True)
print(f'{eng} => {translation}, ',
f'bleu {d2l.bleu(translation, fra, k=2):.3f}')
自注意力
给定序列 $\mathbf{x}_1, \dots, \mathbf{x}_n$,$\forall \mathbf{x}_i \in \mathbb{R}^d$;自注意力池化层将 $\mathbf{x}_i$ 同时当做key、value、query,对序列抽取特征得到 $\mathbf{y}_1, \dots, \mathbf{y}_n$,这里
$$\mathbf{y}_i = f(\mathbf{x}_i, (\mathbf{x}_1, \mathbf{x}_1), \dots, (\mathbf{x}_n, \mathbf{x}_n)) \in \mathbb{R}^d$$
| | CNN | RNN | 自注意力 |
|---|---|---|---|
| 计算复杂度 | $O(knd^2)$ | $O(nd^2)$ | $O(n^2d)$ |
| 并行度 | $O(n)$ | $O(1)$ | $O(n)$ |
| 最长路径 | $O(n/k)$ | $O(n)$ | $O(1)$ |
位置编码
- 跟CNN/RNN不同,自注意力并没有记录位置信息
- 位置编码将位置信息注入到输入里
- 假设长度为 $n$ 的序列是 $\mathbf{X} \in \mathbb{R}^{n \times d}$,那么使用位置编码矩阵 $\mathbf{P} \in \mathbb{R}^{n \times d}$ 来输出 $\mathbf{X} + \mathbf{P}$ 作为自编码输入
- $\mathbf{P}$ 的元素如下计算:
$$p_{i,2j} = \sin\left( \frac{i}{10000^{2j/d}} \right), \quad p_{i,2j+1} = \cos\left( \frac{i}{10000^{2j/d}} \right)$$
Relative positional information
- The positional encoding at position $i+\delta$ can be expressed as a linear projection of the encoding at position $i$
- Let $\omega_j = 1/10000^{2j/d}$; then
$\begin{bmatrix} \cos(\delta \omega_j) & \sin(\delta \omega_j) \\ -\sin(\delta \omega_j) & \cos(\delta \omega_j) \end{bmatrix} \begin{bmatrix} p_{i,2j} \\ p_{i,2j+1} \end{bmatrix} = \begin{bmatrix} p_{i+\delta,2j} \\ p_{i+\delta,2j+1} \end{bmatrix}$
The projection matrix does not depend on $i$.
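This rotation identity is easy to verify numerically. Below is a minimal sketch (my own check, not from the book); the values of d, i, delta, and j are arbitrary illustrative choices:
import math
import torch

d, i, delta, j = 32, 5, 3, 2  # hypothetical example values
omega = 1 / 10000 ** (2 * j / d)
p_i = torch.tensor([math.sin(i * omega), math.cos(i * omega)])
rot = torch.tensor([[math.cos(delta * omega), math.sin(delta * omega)],
                    [-math.sin(delta * omega), math.cos(delta * omega)]])
p_i_delta = torch.tensor([math.sin((i + delta) * omega),
                          math.cos((i + delta) * omega)])
# Rotating the encoding at position i reproduces the encoding at i + delta
print(torch.allclose(rot @ p_i, p_i_delta, atol=1e-6))  # True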
Code implementation:
import math
import torch
from torch import nn
from d2l import torch as d2l
# Self-attention
num_hiddens, num_heads = 100, 5
attention = d2l.MultiHeadAttention(num_hiddens, num_hiddens, num_hiddens,
num_hiddens, num_heads, 0.5)
attention.eval()
batch_size, num_queries, valid_lens = 2, 4, torch.tensor([3, 2])
X = torch.ones((batch_size, num_queries, num_hiddens))
attention(X, X, X, valid_lens).shape
class PositionalEncoding(nn.Module):
    """Positional encoding"""
    def __init__(self, num_hiddens, dropout, max_len=1000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(dropout)
        # Create a long-enough P, then fill even columns with sin and odd columns with cos
        self.P = torch.zeros((1, max_len, num_hiddens))
        X = torch.arange(max_len, dtype=torch.float32).reshape(
            -1, 1) / torch.pow(10000, torch.arange(
            0, num_hiddens, 2, dtype=torch.float32) / num_hiddens)
        self.P[:, :, 0::2] = torch.sin(X)
        self.P[:, :, 1::2] = torch.cos(X)

    def forward(self, X):
        X = X + self.P[:, :X.shape[1], :].to(X.device)
        return self.dropout(X)
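A quick usage check of the class above (a sketch; only the shape is being claimed): adding the encoding to a zero input simply reads out P.
import torch

encoding_dim, num_steps = 32, 60
pos_encoding = PositionalEncoding(encoding_dim, 0)
pos_encoding.eval()
X = pos_encoding(torch.zeros((1, num_steps, encoding_dim)))
print(X.shape)  # torch.Size([1, 60, 32]); columns alternate sin/cos curves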
Transformer
The Transformer architecture
- Based on the encoder-decoder architecture to process sequence pairs
- Unlike seq2seq with attention, the Transformer is purely attention-based
Multi-head attention
- For the same keys, values, and queries, we want to extract different kinds of information
  - For example, short-range vs. long-range dependencies
- Multi-head attention uses h independent attention pooling operations
  - The outputs of the individual heads are merged to produce the final output
Multi-head attention
- query $\mathbf{q} \in \mathbb{R}^{d_q}$, key $\mathbf{k} \in \mathbb{R}^{d_k}$, value $\mathbf{v} \in \mathbb{R}^{d_v}$
- Learnable parameters of head $i$: $\mathbf{W}_i^{(q)} \in \mathbb{R}^{p_q \times d_q}$, $\mathbf{W}_i^{(k)} \in \mathbb{R}^{p_k \times d_k}$, $\mathbf{W}_i^{(v)} \in \mathbb{R}^{p_v \times d_v}$
- Output of head $i$: $\mathbf{h}_i = f(\mathbf{W}_i^{(q)}\mathbf{q}, \mathbf{W}_i^{(k)}\mathbf{k}, \mathbf{W}_i^{(v)}\mathbf{v}) \in \mathbb{R}^{p_v}$
- Learnable output parameters: $\mathbf{W}_o \in \mathbb{R}^{p_o \times h p_v}$
- Output of multi-head attention: $\mathbf{h} = \mathbf{W}_o \begin{bmatrix} \mathbf{h}_1 \\ \mathbf{h}_2 \\ \vdots \\ \mathbf{h}_h \end{bmatrix} \in \mathbb{R}^{p_o}$
Masked multi-head attention
- When the decoder produces the output for an element of the sequence, it must not look at elements that come after it
- This can be implemented with a mask
  - That is, when computing the output for $x_i$, we pretend the current sequence length is $i$ (see the small sketch below)
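A small sketch of the masking idea (assuming d2l's masked_softmax helper): giving row t a valid length of t zeroes out the attention weights on every position after t, just like the decoder's causal mask.
import torch
from d2l import torch as d2l

scores = torch.rand(1, 4, 4)  # (batch, num_queries, num_keys)
# Row t may only attend to keys 1..t
dec_valid_lens = torch.arange(1, 5).reshape(1, 4)
print(d2l.masked_softmax(scores, dec_valid_lens))  # upper triangle is 0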
Position-wise feed-forward network
- Reshape the input from (b, n, d) to (bn, d)
- Apply two fully connected layers
- Reshape the output from (bn, d) back to (b, n, d)
- Equivalent to two one-dimensional convolution layers with kernel size 1 (see the equivalence sketch below)
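The equivalence with kernel-size-1 convolutions can be checked directly. This is my own sketch (one linear layer instead of two, with its weights copied into an nn.Conv1d):
import torch
from torch import nn

b, n, d, h = 2, 5, 8, 16
X = torch.randn(b, n, d)
linear = nn.Linear(d, h)
conv = nn.Conv1d(d, h, kernel_size=1)
conv.weight.data = linear.weight.data.reshape(h, d, 1)  # share the weights
conv.bias.data = linear.bias.data
# nn.Linear acts on the last dim; nn.Conv1d expects (batch, channels, length)
print(torch.allclose(linear(X),
                     conv(X.permute(0, 2, 1)).permute(0, 2, 1), atol=1e-6))  # True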
Layer normalization
- Batch normalization normalizes the elements of each feature/channel
  - Not suitable for NLP, where sequence lengths vary
- Layer normalization normalizes the elements within each sample (see the comparison sketch below)
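The difference is easy to see on a tiny example (a sketch following the book's illustration): LayerNorm normalizes each row, BatchNorm1d each column.
import torch
from torch import nn

ln = nn.LayerNorm(2)
bn = nn.BatchNorm1d(2)
X = torch.tensor([[1, 2], [2, 3]], dtype=torch.float32)
# LayerNorm: per sample (each row becomes [-1, 1])
# BatchNorm: per feature (each column becomes [-1, 1])
print('layer norm:', ln(X), '\nbatch norm:', bn(X))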
Passing information from the encoder to the decoder
- The encoder produces outputs $\mathbf{y}_1, \dots, \mathbf{y}_n$
- They serve as the keys and values in the multi-head attention of the i-th Transformer block of the decoder
  - Its queries come from the target sequence
- This means the blocks in the encoder and decoder have the same output dimension
Prediction
- To predict output t+1
  - The decoder is fed the first t predictions
  - In self-attention, the first t predictions serve as keys and values, and the t-th prediction also serves as the query
Code implementation:
Multi-head attention:
import math
import torch
from torch import nn
from d2l import torch as d2l
class MultiHeadAttention(nn.Module):
    """Multi-head attention: split attention into several heads computed in
    parallel, capturing relations in different representation subspaces"""
    def __init__(self, key_size, query_size, value_size, num_hiddens,
                 num_heads, dropout, bias=False, **kwargs):
        super(MultiHeadAttention, self).__init__(**kwargs)
        self.num_heads = num_heads  # number of attention heads
        self.attention = d2l.DotProductAttention(dropout)  # single-head scaled dot-product attention
        # Linear projections mapping queries/keys/values to the hidden dimension
        self.W_q = nn.Linear(query_size, num_hiddens, bias=bias)
        self.W_k = nn.Linear(key_size, num_hiddens, bias=bias)
        self.W_v = nn.Linear(value_size, num_hiddens, bias=bias)
        # Output projection applied after the heads are merged
        self.W_o = nn.Linear(num_hiddens, num_hiddens, bias=bias)

    def forward(self, queries, keys, values, valid_lens):
        # 1. Linear projection + reshaping (to enable parallel computation across heads)
        queries = transpose_qkv(self.W_q(queries), self.num_heads)
        keys = transpose_qkv(self.W_k(keys), self.num_heads)
        values = transpose_qkv(self.W_v(values), self.num_heads)
        # 2. Handle valid lengths: repeat them for each head (each head masks independently)
        if valid_lens is not None:
            valid_lens = torch.repeat_interleave(
                valid_lens, repeats=self.num_heads, dim=0)
        # 3. Core attention computation (all heads run in parallel)
        output = self.attention(queries, keys, values, valid_lens)
        # 4. Restore the shape and project (merge the heads, then a final linear map)
        output_concat = transpose_output(output, self.num_heads)
        return self.W_o(output_concat)
# Helper: reshape Q/K/V to support parallel computation across heads
def transpose_qkv(X, num_heads):
    """Transform the input from (batch_size, seq_len, num_hiddens) to
    (batch_size * num_heads, seq_len, num_hiddens // num_heads) so that
    all attention heads can be computed in parallel"""
    X = X.reshape(X.shape[0], X.shape[1], num_heads, -1)  # split the hidden dim into (heads, hidden_per_head)
    X = X.permute(0, 2, 1, 3)  # reorder to (batch, heads, seq, hidden_per_head)
    # Merge the batch and head dimensions so one matrix operation covers all heads
    return X.reshape(-1, X.shape[2], X.shape[3])

# Helper: invert transpose_qkv and merge the per-head results
def transpose_output(X, num_heads):
    """Transform the input from (batch_size * num_heads, seq_len,
    num_hiddens // num_heads) back to (batch_size, seq_len, num_hiddens)"""
    X = X.reshape(-1, num_heads, X.shape[1], X.shape[2])  # split the batch and head dimensions
    X = X.permute(0, 2, 1, 3)  # reorder to (batch, seq, heads, hidden_per_head)
    return X.reshape(X.shape[0], X.shape[1], -1)  # merge heads back into the hidden dimension
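A quick sanity check of these helpers and the class above (my own sketch; the numbers are arbitrary): transpose_output inverts transpose_qkv, and attention keeps the (batch, seq, num_hiddens) shape.
import torch

num_hiddens, num_heads = 100, 5
attention = MultiHeadAttention(num_hiddens, num_hiddens, num_hiddens,
                               num_hiddens, num_heads, 0.5)
attention.eval()
X = torch.ones((2, 4, num_hiddens))
Y = transpose_qkv(X, num_heads)
print(Y.shape)  # torch.Size([10, 4, 20])
print(torch.allclose(transpose_output(Y, num_heads), X))  # True
print(attention(X, X, X, torch.tensor([3, 2])).shape)  # torch.Size([2, 4, 100])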
Transformer implementation:
import math
import pandas as pd
import torch
from torch import nn
from d2l import torch as d2l
class PositionWiseFFN(nn.Module):
    """Position-wise feed-forward network"""
    def __init__(self, ffn_num_input, ffn_num_hiddens, ffn_num_outputs,
                 **kwargs):
        super(PositionWiseFFN, self).__init__(**kwargs)
        self.dense1 = nn.Linear(ffn_num_input, ffn_num_hiddens)
        self.relu = nn.ReLU()
        self.dense2 = nn.Linear(ffn_num_hiddens, ffn_num_outputs)

    def forward(self, X):
        return self.dense2(self.relu(self.dense1(X)))
class AddNorm(nn.Module):
    """Residual connection followed by layer normalization"""
    def __init__(self, normalized_shape, dropout, **kwargs):
        super(AddNorm, self).__init__(**kwargs)
        self.dropout = nn.Dropout(dropout)
        self.ln = nn.LayerNorm(normalized_shape)

    def forward(self, X, Y):
        return self.ln(self.dropout(Y) + X)
class EncoderBlock(nn.Module):
    """Transformer encoder block"""
    def __init__(self, key_size, query_size, value_size, num_hiddens,
                 norm_shape, ffn_num_input, ffn_num_hiddens, num_heads,
                 dropout, use_bias=False, **kwargs):
        super(EncoderBlock, self).__init__(**kwargs)
        self.attention = d2l.MultiHeadAttention(
            key_size, query_size, value_size, num_hiddens, num_heads, dropout,
            use_bias)
        self.addnorm1 = AddNorm(norm_shape, dropout)
        self.ffn = PositionWiseFFN(
            ffn_num_input, ffn_num_hiddens, num_hiddens)
        self.addnorm2 = AddNorm(norm_shape, dropout)

    def forward(self, X, valid_lens):
        Y = self.addnorm1(X, self.attention(X, X, X, valid_lens))
        return self.addnorm2(Y, self.ffn(Y))
class TransformerEncoder(d2l.Encoder):
    """Transformer encoder"""
    def __init__(self, vocab_size, key_size, query_size, value_size,
                 num_hiddens, norm_shape, ffn_num_input, ffn_num_hiddens,
                 num_heads, num_layers, dropout, use_bias=False, **kwargs):
        super(TransformerEncoder, self).__init__(**kwargs)
        # Core parameters and components
        self.num_hiddens = num_hiddens  # hidden dimension used throughout the encoder
        self.embedding = nn.Embedding(vocab_size, num_hiddens)  # token embedding layer
        self.pos_encoding = d2l.PositionalEncoding(num_hiddens, dropout)  # positional encoding layer
        self.blks = nn.Sequential()  # stack of encoder blocks
        # Add num_layers encoder blocks (EncoderBlock)
        for i in range(num_layers):
            self.blks.add_module("block" + str(i),
                EncoderBlock(
                    key_size=key_size,                # feature dimension of the keys
                    query_size=query_size,            # feature dimension of the queries
                    value_size=value_size,            # feature dimension of the values
                    num_hiddens=num_hiddens,          # hidden dimension (same as the embedding dimension)
                    norm_shape=norm_shape,            # target shape for layer normalization
                    ffn_num_input=ffn_num_input,      # input dimension of the feed-forward network
                    ffn_num_hiddens=ffn_num_hiddens,  # hidden dimension of the feed-forward network
                    num_heads=num_heads,              # number of attention heads
                    dropout=dropout,                  # dropout probability
                    use_bias=use_bias                 # whether the linear layers use a bias
                )
            )

    def forward(self, X, valid_lens, *args):
        # Scale the embeddings (to match the magnitude of the positional encodings), then add positions
        X = self.pos_encoding(self.embedding(X) * math.sqrt(self.num_hiddens))
        self.attention_weights = [None] * len(self.blks)
        for i, blk in enumerate(self.blks):
            X = blk(X, valid_lens)  # each block refines the output of the previous one
            # Record this layer's attention weights (for later visualization or analysis)
            self.attention_weights[i] = blk.attention.attention.attention_weights
        return X  # final encoded features, shape (batch_size, seq_len, num_hiddens)
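A shape check with tiny hyperparameters (following the book's example):
import torch

encoder = TransformerEncoder(
    200, 24, 24, 24, 24, [100, 24], 24, 48, 8, 2, 0.5)
encoder.eval()
valid_lens = torch.tensor([3, 2])
print(encoder(torch.ones((2, 100), dtype=torch.long), valid_lens).shape)
# torch.Size([2, 100, 24])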
class DecoderBlock(nn.Module):
    """The i-th block of the decoder"""
    def __init__(self, key_size, query_size, value_size, num_hiddens,
                 norm_shape, ffn_num_input, ffn_num_hiddens, num_heads,
                 dropout, i, **kwargs):
        super(DecoderBlock, self).__init__(**kwargs)
        self.i = i  # index of this decoder block (used to locate its cached state)
        # First multi-head attention: decoder self-attention (masked so future positions are hidden)
        self.attention1 = d2l.MultiHeadAttention(
            key_size, query_size, value_size, num_hiddens, num_heads, dropout)
        self.addnorm1 = AddNorm(norm_shape, dropout)  # residual connection + layer norm
        # Second multi-head attention: encoder-decoder attention (relates source and target sequences)
        self.attention2 = d2l.MultiHeadAttention(
            key_size, query_size, value_size, num_hiddens, num_heads, dropout)
        self.addnorm2 = AddNorm(norm_shape, dropout)  # residual connection + layer norm
        self.ffn = PositionWiseFFN(ffn_num_input, ffn_num_hiddens, num_hiddens)
        self.addnorm3 = AddNorm(norm_shape, dropout)  # residual connection + layer norm

    def forward(self, X, state):
        # Unpack the encoder outputs, encoder valid lengths, and cached states
        enc_outputs, enc_valid_lens = state[0], state[1]
        # state[2] stores each decoder block's cached keys/values (for autoregressive generation)
        if state[2][self.i] is None:
            key_values = X  # initial state: keys/values are the current input X
        else:
            # Otherwise concatenate the cache with the current input (keeping previously generated tokens)
            key_values = torch.cat((state[2][self.i], X), dim=1)
        # Update this block's cached state (used at the next generation step)
        state[2][self.i] = key_values
        # During training, build the decoder valid lengths (masking future positions)
        if self.training:
            batch_size, num_steps, _ = X.shape
            # dec_valid_lens has shape (batch_size, num_steps); each row is [1, 2, ..., num_steps],
            # so every position can only attend to itself and earlier positions
            dec_valid_lens = torch.arange(
                1, num_steps + 1, device=X.device).repeat(batch_size, 1)
        else:
            dec_valid_lens = None  # during inference, generation proceeds one token at a time
        # Decoder self-attention (masked)
        X2 = self.attention1(X, key_values, key_values, dec_valid_lens)
        Y = self.addnorm1(X, X2)  # residual connection + layer norm
        # Encoder-decoder attention (attends to the source sequence)
        Y2 = self.attention2(Y, enc_outputs, enc_outputs, enc_valid_lens)
        Z = self.addnorm2(Y, Y2)  # residual connection + layer norm
        # Feed-forward network + final residual connection
        return self.addnorm3(Z, self.ffn(Z)), state
class TransformerDecoder(d2l.AttentionDecoder):
    """Transformer decoder: generates the target sequence from the encoder
    output and the tokens generated so far"""
    def __init__(self, vocab_size, key_size, query_size, value_size,
                 num_hiddens, norm_shape, ffn_num_input, ffn_num_hiddens,
                 num_heads, num_layers, dropout, **kwargs):
        super(TransformerDecoder, self).__init__(**kwargs)
        self.num_hiddens = num_hiddens
        self.num_layers = num_layers
        self.embedding = nn.Embedding(vocab_size, num_hiddens)
        self.pos_encoding = d2l.PositionalEncoding(num_hiddens, dropout)
        self.blks = nn.Sequential()
        for i in range(num_layers):
            # Each decoder block contains masked self-attention,
            # encoder-decoder cross-attention, and a feed-forward network
            self.blks.add_module("block" + str(i),
                DecoderBlock(
                    key_size=key_size,                # key dimension in attention
                    query_size=query_size,            # query dimension in attention
                    value_size=value_size,            # value dimension in attention
                    num_hiddens=num_hiddens,          # hidden dimension
                    norm_shape=norm_shape,            # target shape for layer normalization
                    ffn_num_input=ffn_num_input,      # input dimension of the feed-forward network
                    ffn_num_hiddens=ffn_num_hiddens,  # hidden dimension of the feed-forward network
                    num_heads=num_heads,              # number of attention heads
                    dropout=dropout,                  # dropout probability
                    i=i                               # block index (used to locate the cached state)
                )
            )
        self.dense = nn.Linear(num_hiddens, vocab_size)

    def init_state(self, enc_outputs, enc_valid_lens, *args):
        """Initialize the decoder state (used during generation)"""
        return [enc_outputs, enc_valid_lens, [None] * self.num_layers]

    def forward(self, X, state):
        """Forward pass: process the target sequence and produce predictions"""
        # Embedding: (batch_size, seq_len) -> (batch_size, seq_len, num_hiddens);
        # scale so the embedding variance matches the positional encoding, then add positions
        X = self.pos_encoding(self.embedding(X) * math.sqrt(self.num_hiddens))
        # _attention_weights[0]: decoder self-attention weights;
        # _attention_weights[1]: encoder-decoder cross-attention weights
        self._attention_weights = [[None] * len(self.blks) for _ in range(2)]
        for i, blk in enumerate(self.blks):
            X, state = blk(X, state)
            self._attention_weights[0][i] = blk.attention1.attention.attention_weights
            self._attention_weights[1][i] = blk.attention2.attention.attention_weights
        return self.dense(X), state

    @property
    def attention_weights(self):
        """Expose the recorded attention weights (for analysis/visualization)"""
        return self._attention_weights
Training:
num_hiddens, num_layers, dropout, batch_size, num_steps = 32, 2, 0.1, 64, 10
lr, num_epochs, device = 0.005, 200, d2l.try_gpu()
ffn_num_input, ffn_num_hiddens, num_heads = 32, 64, 4
key_size, query_size, value_size = 32, 32, 32
norm_shape = [32]
train_iter, src_vocab, tgt_vocab = d2l.load_data_nmt(batch_size, num_steps)
encoder = TransformerEncoder(
len(src_vocab), key_size, query_size, value_size, num_hiddens,
norm_shape, ffn_num_input, ffn_num_hiddens, num_heads,
num_layers, dropout)
decoder = TransformerDecoder(
len(tgt_vocab), key_size, query_size, value_size, num_hiddens,
norm_shape, ffn_num_input, ffn_num_hiddens, num_heads,
num_layers, dropout)
net = d2l.EncoderDecoder(encoder, decoder)
d2l.train_seq2seq(net, train_iter, lr, num_epochs, tgt_vocab, device)
Prediction:
engs = ['go .', "i lost .", 'he\'s calm .', 'i\'m home .']
fras = ['va !', 'j\'ai perdu .', 'il est calme .', 'je suis chez moi .']
for eng, fra in zip(engs, fras):
translation, dec_attention_weight_seq = d2l.predict_seq2seq(
net, eng, src_vocab, tgt_vocab, num_steps, device, True)
print(f'{eng} => {translation}, ',
f'bleu {d2l.bleu(translation, fra, k=2):.3f}')
Natural language processing
Transfer learning in NLP
- Use pretrained models to extract features of words and sentences
  - e.g. word2vec or a language model
- Do not update the pretrained model
- A new network must be built to capture the information the new task needs
  - word2vec ignores temporal information, and a language model only looks in one direction
BERT
Motivation for BERT
- An NLP model based on fine-tuning
- The pretrained model extracts enough information
- A new task only needs to add a simple output layer
Changes to the input
- Each example is a sentence pair
- An extra segment embedding is added
- The positional encodings are learnable
Pretraining task 1: masked language modeling
- The Transformer encoder is bidirectional, while a standard language model must be unidirectional
- Masked language modeling randomly replaces some tokens (with 15% probability) with <mask>
- Because <mask> never appears in fine-tuning tasks, for each selected token:
  - with 80% probability, replace it with <mask>
  - with 10% probability, replace it with a random token
  - with 10% probability, keep it unchanged
Pretraining task 2: next sentence prediction
- Predict whether the two sentences in a pair are adjacent in the corpus
- In the training data:
  - 50% of the time, pick an adjacent sentence pair: <cls>this movie is great<sep>i like it<sep>
  - 50% of the time, pick a random sentence pair: <cls>this movie is great<sep>hello world<sep>
- Feed the output at <cls> into a fully connected layer for the prediction
Code:
import torch
from torch import nn
from d2l import torch as d2l
#@save
def get_tokens_and_segments(tokens_a, tokens_b=None):
    """Get the tokens of the input sequence and their segment indices"""
    tokens = ['<cls>'] + tokens_a + ['<sep>']
    # 0 and 1 mark segment A and segment B, respectively
    segments = [0] * (len(tokens_a) + 2)
    if tokens_b is not None:
        tokens += tokens_b + ['<sep>']
        segments += [1] * (len(tokens_b) + 1)
    return tokens, segments
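A usage sketch: a sentence pair picks up the <cls>/<sep> markers, and the segment ids record which sentence each token came from.
tokens, segments = get_tokens_and_segments(
    ['this', 'movie', 'is', 'great'], ['i', 'like', 'it'])
print(tokens)
# ['<cls>', 'this', 'movie', 'is', 'great', '<sep>', 'i', 'like', 'it', '<sep>']
print(segments)
# [0, 0, 0, 0, 0, 0, 1, 1, 1, 1]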
#@save
class BERTEncoder(nn.Module):
    """BERT encoder: turn text tokens into context vectors that combine
    semantic, positional, and segment information"""
    def __init__(self, vocab_size, num_hiddens, norm_shape, ffn_num_input,
                 ffn_num_hiddens, num_heads, num_layers, dropout,
                 max_len=1000, key_size=768, query_size=768, value_size=768,
                 **kwargs):
        super(BERTEncoder, self).__init__(**kwargs)
        # 1. Token embedding: map token ids to vectors capturing basic semantics
        self.token_embedding = nn.Embedding(vocab_size, num_hiddens)
        # 2. Segment embedding: distinguish the two sentences (0 = sentence A, 1 = sentence B);
        # only 2 embedding vectors are needed, since at most two sentences are processed
        self.segment_embedding = nn.Embedding(2, num_hiddens)
        # 3. A stack of Transformer encoder blocks for deep contextual encoding
        self.blks = nn.Sequential()
        for i in range(num_layers):
            self.blks.add_module(f"{i}", d2l.EncoderBlock(
                key_size, query_size, value_size, num_hiddens, norm_shape,
                ffn_num_input, ffn_num_hiddens, num_heads, dropout, True))
        # 4. Learnable positional embeddings injecting token positions;
        # shape (1, max_len, num_hiddens), where max_len is the maximum sequence length;
        # nn.Parameter marks them as trainable, so they update with the model
        self.pos_embedding = nn.Parameter(torch.randn(1, max_len,
                                                      num_hiddens))

    def forward(self, tokens, segments, valid_lens):
        """
        Forward pass: turn the inputs into context-aware encodings
        Args:
            tokens: token id sequences, shape (batch_size, seq_len)
            segments: segment indices (0/1), shape (batch_size, seq_len)
            valid_lens: valid lengths masking the padded tokens, shape (batch_size,)
        Returns:
            X: encoded vectors, shape (batch_size, seq_len, num_hiddens)
        """
        # Step 1: combine token and segment embeddings
        # token_embedding(tokens): (batch_size, seq_len, num_hiddens)
        # segment_embedding(segments): (batch_size, seq_len, num_hiddens)
        X = self.token_embedding(tokens) + self.segment_embedding(segments)
        # Step 2: add the positional embeddings, truncated to the input length
        X = X + self.pos_embedding.data[:, :X.shape[1], :]
        for blk in self.blks:
            X = blk(X, valid_lens)
        return X
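A shape check with the book's small example hyperparameters:
import torch

vocab_size, num_hiddens, ffn_num_hiddens, num_heads = 10000, 768, 1024, 4
norm_shape, ffn_num_input, num_layers, dropout = [768], 768, 2, 0.2
encoder = BERTEncoder(vocab_size, num_hiddens, norm_shape, ffn_num_input,
                      ffn_num_hiddens, num_heads, num_layers, dropout)
tokens = torch.randint(0, vocab_size, (2, 8))
segments = torch.tensor([[0, 0, 0, 0, 1, 1, 1, 1], [0, 0, 0, 1, 1, 1, 1, 1]])
encoded_X = encoder(tokens, segments, None)
print(encoded_X.shape)  # torch.Size([2, 8, 768])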
#@save
class MaskLM(nn.Module):
    """BERT's masked language model (MLM) task module:
    predicts randomly masked tokens so the model learns contextual semantics
    """
    def __init__(self, vocab_size, num_hiddens, num_inputs=768, **kwargs):
        super(MaskLM, self).__init__(**kwargs)
        self.mlp = nn.Sequential(
            nn.Linear(num_inputs, num_hiddens),
            nn.ReLU(),
            nn.LayerNorm(num_hiddens),  # layer normalization stabilizes training
            nn.Linear(num_hiddens, vocab_size)
        )

    def forward(self, X, pred_positions):
        """
        Forward pass: gather the positions to predict from the encoder output
        and predict the original tokens with the MLP
        Args:
            X: BERT encoder output, shape (batch_size, seq_len, num_hiddens)
            pred_positions: indices of the positions to predict, shape (batch_size, num_pred_positions)
        Returns:
            mlm_Y_hat: scores over the vocabulary, shape (batch_size, num_pred_positions, vocab_size)
        """
        # Number of positions to predict per example
        num_pred_positions = pred_positions.shape[1]
        # Flatten the prediction positions into 1-D for easy indexing
        pred_positions = pred_positions.reshape(-1)
        batch_size = X.shape[0]
        # Build the batch indices (each example contributes num_pred_positions positions)
        batch_idx = torch.arange(0, batch_size)
        # Repeat each batch index num_pred_positions times
        # (e.g. batch_size=2, num_pred_positions=3 gives [0, 0, 0, 1, 1, 1])
        batch_idx = torch.repeat_interleave(batch_idx, num_pred_positions)
        # Gather the vectors at the positions to predict from the encoder output X
        masked_X = X[batch_idx, pred_positions]
        # Reshape to (batch_size, num_pred_positions, num_hiddens)
        masked_X = masked_X.reshape((batch_size, num_pred_positions, -1))
        # Predict the original tokens with the MLP
        mlm_Y_hat = self.mlp(masked_X)
        return mlm_Y_hat
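Continuing the shape check above (the book's example): predict three masked positions per sample.
mlm = MaskLM(vocab_size, num_hiddens)
mlm_positions = torch.tensor([[1, 5, 2], [6, 1, 5]])
mlm_Y_hat = mlm(encoded_X, mlm_positions)
print(mlm_Y_hat.shape)  # torch.Size([2, 3, 10000])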
#@save
class NextSentencePred(nn.Module):
    """BERT's next sentence prediction task"""
    def __init__(self, num_inputs, **kwargs):
        super(NextSentencePred, self).__init__(**kwargs)
        self.output = nn.Linear(num_inputs, 2)

    def forward(self, X):
        # X has shape (batch_size, num_hiddens)
        return self.output(X)
#@save
class BERTModel(nn.Module):
"""BERT模型"""
def __init__(self, vocab_size, num_hiddens, norm_shape, ffn_num_input,
ffn_num_hiddens, num_heads, num_layers, dropout,
max_len=1000, key_size=768, query_size=768, value_size=768,
hid_in_features=768, mlm_in_features=768,
nsp_in_features=768):
super(BERTModel, self).__init__()
self.encoder = BERTEncoder(vocab_size, num_hiddens, norm_shape,
ffn_num_input, ffn_num_hiddens, num_heads, num_layers,
dropout, max_len=max_len, key_size=key_size,
query_size=query_size, value_size=value_size)
self.hidden = nn.Sequential(nn.Linear(hid_in_features, num_hiddens),
nn.Tanh())
self.mlm = MaskLM(vocab_size, num_hiddens, mlm_in_features)
self.nsp = NextSentencePred(nsp_in_features)
def forward(self, tokens, segments, valid_lens=None,
pred_positions=None):
encoded_X = self.encoder(tokens, segments, valid_lens)
if pred_positions is not None:
mlm_Y_hat = self.mlm(encoded_X, pred_positions)
else:
mlm_Y_hat = None
        # Hidden layer of the MLP classifier for next sentence prediction; 0 is the index of the '<cls>' token
nsp_Y_hat = self.nsp(self.hidden(encoded_X[:, 0, :]))
return encoded_X, mlm_Y_hat, nsp_Y_hat
The dataset for pretraining BERT:
import os
import random
import torch
from d2l import torch as d2l

# Register the WikiText-2 dataset (download URL and checksum)
d2l.DATA_HUB['wikitext-2'] = (
    'https://s3.amazonaws.com/research.metamind.io/wikitext/'
    'wikitext-2-v1.zip', '3c914d17d80b1459be871a5039ac23e752a53cbe')

# Read and preprocess the WikiText-2 dataset
def _read_wiki(data_dir):
    file_name = os.path.join(data_dir, 'wiki.train.tokens')  # path to the training split
    with open(file_name, 'r') as f:
        lines = f.readlines()  # read the text line by line
    # Lowercase each line and split it into sentences on ' . '
    # (the text is pre-tokenized, so periods have spaces around them);
    # keep only lines with at least two sentences
    paragraphs = [line.strip().lower().split(' . ')
                  for line in lines if len(line.split(' . ')) >= 2]
    random.shuffle(paragraphs)  # shuffle the paragraphs
    return paragraphs
Generating data for the next sentence prediction task:
# Generate examples for the next sentence prediction task (50% adjacent, 50% not)
def _get_next_sentence(sentence, next_sentence, paragraphs):
    if random.random() < 0.5:  # 50% of the time: keep the adjacent sentence
        is_next = True
    else:  # 50% of the time: sample a random sentence from the corpus (not adjacent)
        next_sentence = random.choice(random.choice(paragraphs))
        is_next = False
    return sentence, next_sentence, is_next

#@save
def _get_nsp_data_from_paragraph(paragraph, paragraphs, vocab, max_len):
    """Generate next sentence prediction (NSP) data from a single paragraph"""
    nsp_data_from_paragraph = []
    # Iterate over sentence pairs (sentence i with sentence i+1 as the adjacent candidate)
    for i in range(len(paragraph) - 1):
        # Build a sentence pair and its "is adjacent" label (50% adjacent, 50% not)
        tokens_a, tokens_b, is_next = _get_next_sentence(
            paragraph[i], paragraph[i + 1], paragraphs)
        # Account for the special tokens: <cls> + tokens_a + <sep> + tokens_b + <sep>
        if len(tokens_a) + len(tokens_b) + 3 > max_len:
            continue  # skip pairs that would exceed the maximum length
        # Build the token sequence and segment indices (segments distinguish sentence A from B)
        tokens, segments = d2l.get_tokens_and_segments(tokens_a, tokens_b)
        # Save the (tokens, segments, is_next) example
        nsp_data_from_paragraph.append((tokens, segments, is_next))
    return nsp_data_from_paragraph
Generating data for the masked language model task:
#@save
def _replace_mlm_tokens(tokens, candidate_pred_positions, num_mlm_preds,
                        vocab):
    # Make a copy of the tokens for the masked language model input,
    # where some tokens may be replaced by '<mask>' or a random token
    mlm_input_tokens = [token for token in tokens]
    pred_positions_and_labels = []
    # Shuffle so that a random 15% of tokens are chosen for prediction
    random.shuffle(candidate_pred_positions)
    for mlm_pred_position in candidate_pred_positions:
        if len(pred_positions_and_labels) >= num_mlm_preds:
            break
        masked_token = None
        # 80% of the time: replace the token with '<mask>'
        if random.random() < 0.8:
            masked_token = '<mask>'
        else:
            # 10% of the time: keep the token unchanged
            if random.random() < 0.5:
                masked_token = tokens[mlm_pred_position]
            # 10% of the time: replace the token with a random token
            else:
                masked_token = random.choice(vocab.idx_to_token)
        mlm_input_tokens[mlm_pred_position] = masked_token
        pred_positions_and_labels.append(
            (mlm_pred_position, tokens[mlm_pred_position]))
    return mlm_input_tokens, pred_positions_and_labels

#@save
def _get_mlm_data_from_tokens(tokens, vocab):
    candidate_pred_positions = []
    # tokens is a list of strings
    for i, token in enumerate(tokens):
        # Special tokens are never predicted in the masked language model task
        if token in ['<cls>', '<sep>']:
            continue
        candidate_pred_positions.append(i)
    # Predict 15% of the tokens in the masked language model task
    num_mlm_preds = max(1, round(len(tokens) * 0.15))
    mlm_input_tokens, pred_positions_and_labels = _replace_mlm_tokens(
        tokens, candidate_pred_positions, num_mlm_preds, vocab)
    pred_positions_and_labels = sorted(pred_positions_and_labels,
                                       key=lambda x: x[0])
    pred_positions = [v[0] for v in pred_positions_and_labels]
    mlm_pred_labels = [v[1] for v in pred_positions_and_labels]
    return vocab[mlm_input_tokens], pred_positions, vocab[mlm_pred_labels]
Converting the text into a pretraining dataset:
#@save
def _pad_bert_inputs(examples, max_len, vocab):
    # Maximum number of MLM predictions per example (15% of the sequence length)
    max_num_mlm_preds = round(max_len * 0.15)
    # Storage for the batched data
    all_token_ids, all_segments, valid_lens = [], [], []
    all_pred_positions, all_mlm_weights, all_mlm_labels = [], [], []
    nsp_labels = []
    # Pad every example so the whole batch has a uniform length
    for (token_ids, pred_positions, mlm_pred_label_ids, segments,
         is_next) in examples:
        # Pad token_ids to max_len with the <pad> index
        all_token_ids.append(torch.tensor(
            token_ids + [vocab['<pad>']] * (max_len - len(token_ids)),
            dtype=torch.long))
        # Pad the segment indices to max_len with 0 (0 marks segment A)
        all_segments.append(torch.tensor(
            segments + [0] * (max_len - len(segments)),
            dtype=torch.long))
        # Record the valid length (the part without <pad>), used to mask out padding later
        valid_lens.append(torch.tensor(len(token_ids), dtype=torch.float32))
        # Pad the MLM prediction positions with 0; the weights below filter them out
        all_pred_positions.append(torch.tensor(
            pred_positions + [0] * (max_num_mlm_preds - len(pred_positions)),
            dtype=torch.long))
        # MLM weights: 1.0 for real prediction positions, 0.0 for padding
        # (padded positions contribute nothing to the loss)
        all_mlm_weights.append(torch.tensor(
            [1.0] * len(mlm_pred_label_ids) + [0.0] * (max_num_mlm_preds - len(pred_positions)),
            dtype=torch.float32))
        # Pad the MLM labels with 0 (harmless, since their weight is 0)
        all_mlm_labels.append(torch.tensor(
            mlm_pred_label_ids + [0] * (max_num_mlm_preds - len(mlm_pred_label_ids)),
            dtype=torch.long))
        # NSP label: whether the two sentences are adjacent
        nsp_labels.append(torch.tensor(is_next, dtype=torch.long))
    # Return the batch: token ids, segments, valid lengths, MLM positions,
    # MLM weights, MLM labels, NSP labels
    return (all_token_ids, all_segments, valid_lens, all_pred_positions,
            all_mlm_weights, all_mlm_labels, nsp_labels)
#@save
class _WikiTextDataset(torch.utils.data.Dataset):
    """BERT pretraining dataset over WikiText-2, built on PyTorch's Dataset"""
    def __init__(self, paragraphs, max_len):
        # 1. Tokenize: turn each paragraph's sentence strings into token lists
        paragraphs = [d2l.tokenize(paragraph, token='word')
                      for paragraph in paragraphs]
        # 2. Collect all sentences (to build a vocabulary covering every token)
        sentences = [sentence for paragraph in paragraphs
                     for sentence in paragraph]
        # 3. Build the vocabulary: drop tokens with frequency < 5, and reserve the
        # special tokens <pad> (padding), <mask> (masking), <cls> (classification), <sep> (separator)
        self.vocab = d2l.Vocab(sentences, min_freq=5,
                               reserved_tokens=['<pad>', '<mask>', '<cls>', '<sep>'])
        # 4. Generate next sentence prediction (NSP) examples: sentence pairs + adjacency labels
        examples = []
        for paragraph in paragraphs:
            nsp_samples = _get_nsp_data_from_paragraph(
                paragraph, paragraphs, self.vocab, max_len)
            examples.extend(nsp_samples)
        # 5. For each NSP example, apply the masked language model (MLM) step:
        # mask some tokens and record the true labels
        examples = [
            (_get_mlm_data_from_tokens(tokens, self.vocab) + (segments, is_next))
            for tokens, segments, is_next in examples
        ]
        # 6. Pad every example to max_len and convert to tensors for batched training
        (self.all_token_ids, self.all_segments, self.valid_lens,
         self.all_pred_positions, self.all_mlm_weights,
         self.all_mlm_labels, self.nsp_labels) = _pad_bert_inputs(
            examples, max_len, self.vocab)

    def __getitem__(self, idx):
        """Fetch one example with everything BERT pretraining needs (both MLM and NSP)"""
        return (
            self.all_token_ids[idx],       # vocabulary indices of the tokens
            self.all_segments[idx],        # segment ids (sentence A vs. B)
            self.valid_lens[idx],          # valid length (excluding <pad>)
            self.all_pred_positions[idx],  # positions to predict for MLM
            self.all_mlm_weights[idx],     # MLM loss weights (filter padded positions)
            self.all_mlm_labels[idx],      # MLM labels (indices of the masked tokens)
            self.nsp_labels[idx]           # NSP label (adjacent or not)
        )

    def __len__(self):
        """Total number of examples after padding"""
        return len(self.all_token_ids)
# Load WikiText-2 and build the batched data and vocabulary for BERT pretraining
def load_data_wiki(batch_size, max_len):
    num_workers = d2l.get_dataloader_workers()  # number of data-loading worker processes
    data_dir = d2l.download_extract('wikitext-2', 'wikitext-2')  # download and extract the dataset
    paragraphs = _read_wiki(data_dir)  # read the text and preprocess it into paragraphs
    train_set = _WikiTextDataset(paragraphs, max_len)  # build the pretraining dataset
    # Batched data loader (shuffled, multi-process)
    train_iter = torch.utils.data.DataLoader(
        train_set, batch_size, shuffle=True, num_workers=num_workers)
    return train_iter, train_set.vocab  # return the data iterator and the vocabulary

# Hyperparameters: batch size 512, maximum sequence length 64
batch_size, max_len = 512, 64
train_iter, vocab = load_data_wiki(batch_size, max_len)
for (tokens_X, segments_X, valid_lens_x, pred_positions_X, mlm_weights_X,
     mlm_Y, nsp_y) in train_iter:
    print(tokens_X.shape, segments_X.shape, valid_lens_x.shape,
          pred_positions_X.shape, mlm_weights_X.shape, mlm_Y.shape,
          nsp_y.shape)
    break
Argh, the download from the link given in the course kept failing for me, so I switched to a different mirror:
d2l.DATA_HUB['wikitext-2'] = (
    # Mirror link (make sure it serves the complete wikitext-2-v1.zip)
    'https://ascend-repo-modelzoo.obs.cn-east-2.myhuaweicloud.com/MindFormers/dataset/wikitext-2/wikitext-2-v1.zip',
    '3c914d17d80b1459be871a5039ac23e752a53cbe'
)
Pretraining BERT:
import torch
from torch import nn
from d2l import torch as d2l
batch_size, max_len = 512, 64
train_iter, vocab = d2l.load_data_wiki(batch_size, max_len)
# Initialize the BERT model (structural hyperparameters)
net = d2l.BERTModel(len(vocab), num_hiddens=128, norm_shape=[128],
ffn_num_input=128, ffn_num_hiddens=256, num_heads=2,
num_layers=2, dropout=0.2, key_size=128, query_size=128,
value_size=128, hid_in_features=128, mlm_in_features=128,
nsp_in_features=128)
devices = d2l.try_all_gpus()
loss = nn.CrossEntropyLoss()
#@save
def _get_batch_loss_bert(net, loss, vocab_size, tokens_X,
                         segments_X, valid_lens_X,
                         pred_positions_X, mlm_weights_X,
                         mlm_Y, nsp_y):
    """Compute BERT's loss on one batch (both the MLM and NSP tasks)"""
    # Forward pass: _ is the encoder output, mlm_Y_hat the MLM predictions,
    # nsp_Y_hat the NSP predictions
    _, mlm_Y_hat, nsp_Y_hat = net(tokens_X, segments_X,
                                  valid_lens_X.reshape(-1),  # flatten the valid lengths to 1-D
                                  pred_positions_X)  # MLM prediction positions
    # Masked language model (MLM) loss: reshape predictions and labels for
    # cross entropy, then weight the loss so padded positions are ignored
    mlm_l = loss(mlm_Y_hat.reshape(-1, vocab_size), mlm_Y.reshape(-1)) * \
        mlm_weights_X.reshape(-1, 1)
    # Sum the valid losses and divide by the total weight (+1e-8 avoids division by zero)
    mlm_l = mlm_l.sum() / (mlm_weights_X.sum() + 1e-8)
    nsp_l = loss(nsp_Y_hat, nsp_y)
    l = mlm_l + nsp_l
    return mlm_l, nsp_l, l
def train_bert(net, train_iter, vocab_size, lr, num_steps, devices):
    """Main pretraining loop for BERT"""
    # Multi-GPU data parallelism (replicate the model and split each batch across GPUs)
    net = nn.DataParallel(net, device_ids=devices).to(devices[0])
    trainer = torch.optim.Adam(net.parameters(), lr=lr)
    loss = nn.CrossEntropyLoss()
    timer = d2l.Timer()
    animator = d2l.Animator(xlabel='step', ylabel='loss',
                            xlim=[1, num_steps], legend=['mlm', 'nsp'])
    # Accumulator for four statistics: total MLM loss, total NSP loss,
    # total number of sentence pairs, total number of steps
    metric = d2l.Accumulator(4)
    num_steps_reached = False
    while not num_steps_reached:
        for batch in train_iter:
            tokens_X, segments_X, valid_lens_X, pred_positions_X, \
                mlm_weights_X, mlm_Y, nsp_y = [x.to(devices[0]) for x in batch]
            trainer.zero_grad()
            timer.start()
            mlm_l, nsp_l, l = _get_batch_loss_bert(
                net, loss, vocab_size, tokens_X, segments_X, valid_lens_X,
                pred_positions_X, mlm_weights_X, mlm_Y, nsp_y)
            l.backward()
            trainer.step()
            metric.add(mlm_l, nsp_l, tokens_X.shape[0], 1)
            timer.stop()
            animator.add(metric[3], (metric[0] / metric[3], metric[1] / metric[3]))
            # Stop once the requested number of steps is reached
            if metric[3] == num_steps:
                num_steps_reached = True
                break
    # Report the final statistics
    print(f'final MLM loss: {metric[0] / metric[3]:.3f}')
    print(f'final NSP loss: {metric[1] / metric[3]:.3f}')
    print(f'throughput: {metric[2] / timer.sum():.1f} sentence pairs/sec (devices: {devices})')
Representing text with BERT:
def get_bert_encoding(net, tokens_a, tokens_b=None):
tokens, segments = d2l.get_tokens_and_segments(tokens_a, tokens_b)
token_ids = torch.tensor(vocab[tokens], device=devices[0]).unsqueeze(0)
segments = torch.tensor(segments, device=devices[0]).unsqueeze(0)
valid_len = torch.tensor(len(tokens), device=devices[0]).unsqueeze(0)
encoded_X, _, _ = net(token_ids, segments, valid_len)
return encoded_X
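Usage, following the book's example: encode one sentence and look at the <cls> vector. With num_hiddens=128 as above, the 6 tokens '<cls>', 'a', 'crane', 'is', 'flying', '<sep>' give a 1 x 6 x 128 encoding.
tokens_a = ['a', 'crane', 'is', 'flying']
encoded_text = get_bert_encoding(net, tokens_a)
encoded_text_cls = encoded_text[:, 0, :]  # the <cls> vector summarizes the whole input
print(encoded_text.shape, encoded_text_cls.shape)
# torch.Size([1, 6, 128]) torch.Size([1, 128])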
Fine-tuning BERT
- BERT returns, for every token, a feature vector that has absorbed contextual information
- Different tasks use different parts of these features
Sentence classification: feed the corresponding (<cls>) vector into a fully connected layer for classification
Named entity recognition: decide whether each token is a named entity (e.g. a person, organization, or location) by feeding every non-special token into a fully connected layer for classification
Question answering: given a question and a passage, find a span of the passage that answers it; for every token of the passage, predict whether it is the start or the end of the answer span
Natural language inference and the dataset:
import os
import re
import torch
from torch import nn
from d2l import torch as d2l
d2l.DATA_HUB['SNLI'] = (
    'https://nlp.stanford.edu/projects/snli/snli_1.0.zip',
    '9fcde07509c7e87ec61c64c1b2753d9041758e4a')
data_dir = d2l.download_extract('SNLI')
def read_snli(data_dir, is_train):
    """Parse the SNLI dataset into lists of premises, hypotheses, and labels"""
    def extract_text(s):
        s = re.sub(r'\(', '', s)  # remove left parentheses
        s = re.sub(r'\)', '', s)  # remove right parentheses
        s = re.sub(r'\s{2,}', ' ', s)  # collapse two or more spaces into one
        return s.strip()  # strip leading/trailing whitespace
    # Map text labels to integer labels
    label_set = {'entailment': 0, 'contradiction': 1, 'neutral': 2}
    file_name = os.path.join(data_dir, 'snli_1.0_train.txt'
                             if is_train else 'snli_1.0_test.txt')
    with open(file_name, 'r') as f:
        # Each line of the SNLI files is a tab-separated row;
        # f.readlines()[1:] skips the header row
        rows = [row.split('\t') for row in f.readlines()[1:]]
    # Keep only rows whose label is valid, extracting premise, hypothesis, and label
    premises = [extract_text(row[1]) for row in rows if row[0] in label_set]
    hypotheses = [extract_text(row[2]) for row in rows if row[0] in label_set]
    labels = [label_set[row[0]] for row in rows if row[0] in label_set]
    return premises, hypotheses, labels
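A quick look at the parsed data (the book's example): print the first three premise/hypothesis/label triples.
train_data = read_snli(data_dir, is_train=True)
for x0, x1, y in zip(train_data[0][:3], train_data[1][:3], train_data[2][:3]):
    print('premise:', x0)
    print('hypothesis:', x1)
    print('label:', y)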
class SNLIDataset(torch.utils.data.Dataset):
    """Custom dataset class for loading SNLI, built on PyTorch's Dataset"""
    def __init__(self, dataset, num_steps, vocab=None):
        """Initialize the dataset"""
        self.num_steps = num_steps
        # Tokenize the premise and hypothesis texts
        all_premise_tokens = d2l.tokenize(dataset[0])
        all_hypothesis_tokens = d2l.tokenize(dataset[1])
        # Build a vocabulary, or reuse a given one
        if vocab is None:
            # Build it from the premise and hypothesis tokens combined
            # (drop words with frequency < 5, reserve <pad>)
            self.vocab = d2l.Vocab(all_premise_tokens + all_hypothesis_tokens,
                                   min_freq=5, reserved_tokens=['<pad>'])
        else:
            self.vocab = vocab
        # Pad/truncate the tokenized premises and hypotheses to length num_steps
        self.premises = self._pad(all_premise_tokens)
        self.hypotheses = self._pad(all_hypothesis_tokens)
        self.labels = torch.tensor(dataset[2])
        print('read ' + str(len(self.premises)) + ' examples')  # number of examples

    def _pad(self, lines):
        """Pad or truncate each token sequence to exactly num_steps"""
        return torch.tensor([d2l.truncate_pad(
            self.vocab[line], self.num_steps, self.vocab['<pad>'])
            for line in lines])

    def __getitem__(self, idx):
        """Index into the dataset: returns (premise tensor, hypothesis tensor), label"""
        return (self.premises[idx], self.hypotheses[idx]), self.labels[idx]

    def __len__(self):
        """Total number of examples"""
        return len(self.premises)
def load_data_snli(batch_size, num_steps=50):
    """Download and load SNLI; return train/test iterators and the vocabulary"""
    num_workers = d2l.get_dataloader_workers()  # number of data-loading worker processes
    data_dir = d2l.download_extract('SNLI')  # download and extract SNLI
    train_data = read_snli(data_dir, True)   # read the training split
    test_data = read_snli(data_dir, False)   # read the test split
    train_set = SNLIDataset(train_data, num_steps)
    test_set = SNLIDataset(test_data, num_steps, train_set.vocab)
    # Training loader (shuffled)
    train_iter = torch.utils.data.DataLoader(
        train_set, batch_size, shuffle=True, num_workers=num_workers)
    # Test loader (not shuffled)
    test_iter = torch.utils.data.DataLoader(
        test_set, batch_size, shuffle=False, num_workers=num_workers)
    return train_iter, test_iter, train_set.vocab
Natural language inference: fine-tuning BERT:
import json
import multiprocessing
import os
import torch
from torch import nn
from d2l import torch as d2l
d2l.DATA_HUB['bert.base'] = (d2l.DATA_URL + 'bert.base.torch.zip',
'225d66f04cae318b841a13d32af3acc165f253ac')
d2l.DATA_HUB['bert.small'] = (d2l.DATA_URL + 'bert.small.torch.zip',
'c72329e68a732bef0452e4b96a1c341c8910f81f')
def load_pretrained_model(pretrained_model, num_hiddens, ffn_num_hiddens,
                          num_heads, num_layers, dropout, max_len, devices):
    data_dir = d2l.download_extract(pretrained_model)
    vocab = d2l.Vocab()  # an empty d2l Vocab (wraps the vocabulary operations)
    # Read the index-to-token list from 'vocab.json' under data_dir
    vocab.idx_to_token = json.load(open(os.path.join(data_dir, 'vocab.json')))
    # Build the reverse token-to-index mapping (look up indices by token)
    vocab.token_to_idx = {token: idx for idx, token in enumerate(vocab.idx_to_token)}
    bert = d2l.BERTModel(len(vocab), num_hiddens, norm_shape=[256],
                         ffn_num_input=256, ffn_num_hiddens=ffn_num_hiddens,
                         num_heads=4, num_layers=2, dropout=0.2,
                         max_len=max_len, key_size=256, query_size=256,
                         value_size=256, hid_in_features=256,
                         mlm_in_features=256, nsp_in_features=256)
    # torch.load reads the local pretrained parameter file;
    # load_state_dict copies the parameters into the model
    bert.load_state_dict(torch.load(os.path.join(data_dir, 'pretrained.params'),
                                    weights_only=True))
    return bert, vocab
devices = d2l.try_all_gpus()
bert, vocab = load_pretrained_model(
'bert.small', num_hiddens=256, ffn_num_hiddens=512, num_heads=4,
num_layers=2, dropout=0.1, max_len=512, devices=devices)
class SNLIBERTDataset(torch.utils.data.Dataset):
    """SNLI dataset adapted to BERT: turns premise-hypothesis text pairs into model inputs"""
    def __init__(self, dataset, max_len, vocab=None):
        """Initialize the dataset: tokenize and preprocess the text
        Args:
            dataset: raw data as (premise list, hypothesis list, label list)
            max_len: maximum sequence length (reserving room for [CLS] and [SEP] markers)
            vocab: vocabulary (if None, it must be built elsewhere; here the pretrained BERT vocabulary is passed in)
        """
        # Tokenize the premises and the hypotheses (lowercased)
        all_premise_hypothesis_tokens = [
            [p_tokens, h_tokens]
            for p_tokens, h_tokens in zip(
                *[d2l.tokenize([s.lower() for s in sentences])
                  for sentences in dataset[:2]]
            )
        ]
        self.labels = torch.tensor(dataset[2])  # labels as a PyTorch tensor
        self.vocab = vocab
        self.max_len = max_len
        # Preprocess into the inputs BERT needs: token_ids, segments, valid_lens
        (self.all_token_ids, self.all_segments,
         self.valid_lens) = self._preprocess(all_premise_hypothesis_tokens)
        print(f'read {len(self.all_token_ids)} examples')
    def _preprocess(self, all_premise_hypothesis_tokens):
        """Preprocess all premise-hypothesis pairs with multiple processes
        Args:
            all_premise_hypothesis_tokens: all (premise tokens, hypothesis tokens) pairs
        Returns:
            tuple: token_ids tensor, segments tensor, valid_lens tensor
        """
        pool = multiprocessing.Pool(4)  # use 4 worker processes
        out = pool.map(self._mp_worker, all_premise_hypothesis_tokens)  # map each example to a worker
        # Split the pooled results into the three parts
        all_token_ids = [token_ids for token_ids, _, _ in out]
        all_segments = [segments for _, segments, _ in out]
        valid_lens = [valid_len for _, _, valid_len in out]
        # Convert to PyTorch tensors and return
        return (
            torch.tensor(all_token_ids, dtype=torch.long),
            torch.tensor(all_segments, dtype=torch.long),
            torch.tensor(valid_lens)
        )

    def _mp_worker(self, premise_hypothesis_tokens):
        """Worker function: turn one premise-hypothesis pair into model inputs
        Args:
            premise_hypothesis_tokens: one example's (premise tokens, hypothesis tokens)
        Returns:
            tuple: padded token_ids, padded segments, valid_len (number of real tokens)
        """
        p_tokens, h_tokens = premise_hypothesis_tokens
        # Truncate so the total length fits within max_len - 3
        # (reserving room for one <cls> and two <sep> tokens)
        self._truncate_pair_of_tokens(p_tokens, h_tokens)
        # Build the token list with special markers and the segment indices (0 = premise, 1 = hypothesis)
        tokens, segments = d2l.get_tokens_and_segments(p_tokens, h_tokens)
        # Map tokens to vocabulary indices and pad to max_len with <pad>
        token_ids = self.vocab[tokens] + [self.vocab['<pad>']] * (self.max_len - len(tokens))
        # Pad the segment indices with 0
        segments = segments + [0] * (self.max_len - len(segments))
        valid_len = len(tokens)  # the number of non-padding tokens
        return token_ids, segments, valid_len

    def _truncate_pair_of_tokens(self, p_tokens, h_tokens):
        """Truncate the premise/hypothesis tokens so their total length is at most max_len - 3
        Args:
            p_tokens: premise tokens
            h_tokens: hypothesis tokens
        """
        while len(p_tokens) + len(h_tokens) > self.max_len - 3:
            # Strategy: always drop the last token of the longer sequence
            if len(p_tokens) > len(h_tokens):
                p_tokens.pop()
            else:
                h_tokens.pop()

    def __getitem__(self, idx):
        """Index into the dataset: returns ((token_ids, segments, valid_lens), label),
        matching the batching logic of PyTorch's DataLoader"""
        return (self.all_token_ids[idx], self.all_segments[idx],
                self.valid_lens[idx]), self.labels[idx]

    def __len__(self):
        """Total number of examples"""
        return len(self.all_token_ids)
batch_size, max_len, num_workers = 512, 128, d2l.get_dataloader_workers()
data_dir = d2l.download_extract('SNLI')
train_set = SNLIBERTDataset(d2l.read_snli(data_dir, True), max_len, vocab)
test_set = SNLIBERTDataset(d2l.read_snli(data_dir, False), max_len, vocab)
train_iter = torch.utils.data.DataLoader(
train_set, batch_size, shuffle=True, num_workers=num_workers)
test_iter = torch.utils.data.DataLoader(
test_set, batch_size, num_workers=num_workers)
class BERTClassifier(nn.Module):
    def __init__(self, bert):
        """A classifier on top of pretrained BERT for the natural language inference task
        Args:
            bert: a pretrained BERT model (with its encoder and hidden components)
        """
        super(BERTClassifier, self).__init__()
        self.encoder = bert.encoder
        self.hidden = bert.hidden
        # Output layer: map the 256-dim BERT feature to the 3 SNLI classes
        # (entailment / contradiction / neutral)
        self.output = nn.Linear(256, 3)

    def forward(self, inputs):
        """Forward pass: take a text pair and output class predictions
        Args:
            inputs: a tuple (tokens_X, segments_X, valid_lens_X):
                - tokens_X: vocabulary indices of the tokens
                - segments_X: segment ids distinguishing premise and hypothesis tokens
                - valid_lens_X: valid lengths (ignoring the <pad> positions)
        Returns:
            a 3-way prediction (entailment / contradiction / neutral) for each example
        """
        tokens_X, segments_X, valid_lens_X = inputs
        encoded_X = self.encoder(tokens_X, segments_X, valid_lens_X)
        # The first token is <cls>; its representation summarizes the pair,
        # so encoded_X[:, 0, :] is fed into the MLP for classification
        return self.output(self.hidden(encoded_X[:, 0, :]))
net = BERTClassifier(bert)
lr, num_epochs = 1e-4, 5  # small learning rate for fine-tuning a pretrained model; 5 epochs
trainer = torch.optim.Adam(net.parameters(), lr=lr)
loss = nn.CrossEntropyLoss(reduction='none')
d2l.train_ch13(net, train_iter, test_iter, loss, trainer, num_epochs, devices)
This is the run with multiprocessing disabled and batch_size reduced to 32:
(it runs painfully slowly)
And that's a wrap, all done, hahaha!!!