别再死记硬背HMM了!用Python手搓一个中文分词器,从BMES标注到Viterbi解码全流程
·
从零构建HMM中文分词器:BMES标注与Viterbi解码实战指南
当处理中文文本时,分词是第一个关键步骤。与英文不同,中文没有天然的空格分隔,这使得分词成为NLP任务中的基础挑战。本文将带你用Python实现一个基于隐马尔可夫模型(HMM)的中文分词器,从理论到代码,一步步揭开统计分词的神秘面纱。
1. 理解BMES标注体系
在HMM分词中,我们采用BMES标注方案对汉字进行标记:
- B (Begin) :表示词的首字
- M (Middle) :表示词的中间字
- E (End) :表示词的尾字
- S (Single) :表示单字词
例如,"人工智能技术"会被标注为:
人/B 工/M 能/E 技/B 术/E
这种标注方案的优势在于:
- 能够清晰表示词的边界
- 适用于不同长度的词语
- 简化了分词任务的建模过程
为什么选择BMES而不是其他标注方案?
- 相比BIOS标注,BMES更简洁
- 四类标签足以覆盖所有中文分词场景
- 实践证明在中文分词任务中效果良好
2. HMM模型的三要素
要实现HMM分词器,我们需要计算三个核心概率矩阵:
2.1 初始状态概率(π)
表示句子第一个字处于各状态的概率:
start_p = {
'B': 0.6, # 句子开头是词首的概率
'M': 0.1,
'E': 0.1,
'S': 0.2 # 句子开头是单字词的概率
}
2.2 状态转移概率(A)
表示从一个状态转移到另一个状态的概率:
trans_p = {
'B': {'B': 0.1, 'M': 0.6, 'E': 0.2, 'S': 0.1},
'M': {'B': 0.1, 'M': 0.5, 'E': 0.3, 'S': 0.1},
# ...其他状态转移概率
}
2.3 发射概率(B)
表示在某个状态下观察到特定汉字的概率:
emit_p = {
'B': {'中': 0.05, '文': 0.03, ...},
'M': {'中': 0.01, '文': 0.02, ...},
# ...其他状态的发射概率
}
3. 从语料库训练HMM参数
我们需要一个已分词的中文语料库来统计这些概率。以下是训练过程的关键步骤:
- 预处理语料 :将分好词的文本转换为BMES标注序列
- 统计频数 :
- 统计每个状态的初始出现次数
- 统计状态间的转移次数
- 统计每个状态下各汉字的出现次数
- 计算概率 :将频数转换为概率,使用加一平滑处理未登录词
def train_hmm(corpus_path):
# 初始化统计字典
start_count = {'B':0, 'M':0, 'E':0, 'S':0}
trans_count = {s: {t:0 for t in 'BMES'} for s in 'BMES'}
emit_count = {s: {} for s in 'BMES'}
state_count = {'B':0, 'M':0, 'E':0, 'S':0}
with open(corpus_path, 'r', encoding='utf-8') as f:
for line in f:
words = line.strip().split()
# 生成BMES标签序列
tags = []
for word in words:
if len(word) == 1:
tags.append('S')
else:
tags.append('B')
tags.extend(['M']*(len(word)-2))
tags.append('E')
# 统计频数
for i, tag in enumerate(tags):
state_count[tag] += 1
if i == 0:
start_count[tag] += 1
else:
trans_count[tags[i-1]][tag] += 1
# 统计发射频数
char = line[i]
emit_count[tag][char] = emit_count[tag].get(char, 0) + 1
# 计算概率(加一平滑)
total_lines = sum(start_count.values())
start_p = {s: (start_count[s]+1)/(total_lines+4) for s in 'BMES'}
trans_p = {}
for s in 'BMES':
trans_p[s] = {}
total = sum(trans_count[s].values())
for t in 'BMES':
trans_p[s][t] = (trans_count[s][t]+1)/(total+4)
emit_p = {}
for s in 'BMES':
emit_p[s] = {}
total = state_count[s]
for char in emit_count[s]:
emit_p[s][char] = (emit_count[s][char]+1)/(total+len(emit_count[s]))
return start_p, trans_p, emit_p
4. 实现Viterbi算法解码
Viterbi算法是HMM分词的核心,它能在O(TN²)时间内找到最可能的状态序列(T为序列长度,N为状态数)。
4.1 Viterbi算法步骤
- 初始化 :计算第一个字在各状态的概率
- 递推 :对于每个后续的字,计算到达每个状态的最大概率路径
- 终止 :找到最后一个字的最可能状态
- 回溯 :从最后一个字回溯,得到完整的状态序列
def viterbi(obs, states, start_p, trans_p, emit_p):
V = [{}] # 存储每个时间步的概率
path = {}
# 初始化
for y in states:
V[0][y] = start_p[y] * emit_p[y].get(obs[0], 1e-10)
path[y] = [y]
# 递推
for t in range(1, len(obs)):
V.append({})
newpath = {}
for y in states:
(prob, state) = max(
(V[t-1][y0] * trans_p[y0].get(y, 1e-10) * emit_p[y].get(obs[t], 1e-10), y0)
for y0 in states
)
V[t][y] = prob
newpath[y] = path[state] + [y]
path = newpath
# 终止
(prob, state) = max((V[-1][y], y) for y in states)
return (prob, path[state])
4.2 处理未登录词
当遇到训练集中未出现的汉字时,我们的模型可能会失效。解决方法包括:
- 使用加一平滑(已经在训练代码中实现)
- 引入汉字部首或笔画数等特征
- 对低频词进行特殊处理
5. 完整分词器实现
现在我们将所有组件整合成一个完整的HMM分词器类:
import pickle
from collections import defaultdict
class HMMSegmenter:
def __init__(self):
self.states = ['B', 'M', 'E', 'S']
self.start_p = {} # 初始概率
self.trans_p = {} # 转移概率
self.emit_p = {} # 发射概率
self.trained = False
def train(self, corpus_path, model_path='hmm_model.pkl'):
# 初始化统计字典
start_count = defaultdict(int)
trans_count = {s: defaultdict(int) for s in self.states}
emit_count = {s: defaultdict(int) for s in self.states}
state_count = defaultdict(int)
# 统计频数
with open(corpus_path, 'r', encoding='utf-8') as f:
line_num = 0
for line in f:
line = line.strip()
if not line:
continue
line_num += 1
words = line.split()
# 生成BMES标签序列
tags = []
for word in words:
if len(word) == 1:
tags.append('S')
else:
tags.append('B')
tags.extend(['M']*(len(word)-2))
tags.append('E')
# 确保标签序列与字符序列长度一致
assert len(tags) == len(''.join(words))
# 统计频数
for i, (char, tag) in enumerate(zip(''.join(words), tags)):
state_count[tag] += 1
if i == 0:
start_count[tag] += 1
else:
trans_count[tags[i-1]][tag] += 1
emit_count[tag][char] += 1
# 计算概率(加一平滑)
self.start_p = {
s: (start_count.get(s, 0)+1)/(line_num+4)
for s in self.states
}
self.trans_p = {
s: {
t: (trans_count[s].get(t, 0)+1)/(sum(trans_count[s].values())+4)
for t in self.states
}
for s in self.states
}
self.emit_p = {
s: {
char: (emit_count[s].get(char, 0)+1)/(state_count.get(s, 1)+len(emit_count[s]))
for char in emit_count[s]
}
for s in self.states
}
# 保存模型
with open(model_path, 'wb') as f:
pickle.dump({
'start_p': self.start_p,
'trans_p': self.trans_p,
'emit_p': self.emit_p
}, f)
self.trained = True
print(f'模型训练完成,已保存到 {model_path}')
def load_model(self, model_path):
with open(model_path, 'rb') as f:
model = pickle.load(f)
self.start_p = model['start_p']
self.trans_p = model['trans_p']
self.emit_p = model['emit_p']
self.trained = True
print('模型加载完成')
def __viterbi(self, text):
V = [{}]
path = {}
# 初始化
for y in self.states:
V[0][y] = self.start_p[y] * self.emit_p[y].get(text[0], 1e-10)
path[y] = [y]
# 递推
for t in range(1, len(text)):
V.append({})
new_path = {}
for y in self.states:
(prob, state) = max(
(V[t-1][y0] * self.trans_p[y0].get(y, 1e-10) * self.emit_p[y].get(text[t], 1e-10), y0)
for y0 in self.states
)
V[t][y] = prob
new_path[y] = path[state] + [y]
path = new_path
# 终止
(prob, state) = max((V[-1][y], y) for y in self.states)
return path[state]
def segment(self, text):
if not self.trained:
raise RuntimeError('请先训练或加载模型')
if not text:
return []
tags = self.__viterbi(text)
# 根据标签序列进行分词
result = []
start = 0
for i, tag in enumerate(tags):
if tag == 'B':
start = i
elif tag == 'E':
result.append(text[start:i+1])
elif tag == 'S':
result.append(text[i])
return result
6. 使用示例与性能优化
6.1 基本使用方法
# 训练模型
segmenter = HMMSegmenter()
segmenter.train('pku_training.utf8', 'my_hmm_model.pkl')
# 或者加载已有模型
# segmenter.load_model('my_hmm_model.pkl')
# 分词
text = "自然语言处理是人工智能的重要方向"
result = segmenter.segment(text)
print('/'.join(result)) # 输出:自然/语言/处理/是/人工/智能/的/重要/方向
6.2 性能优化技巧
- 对数概率 :避免浮点数下溢,使用对数概率计算
- 剪枝 :在Viterbi算法中保留前N个最优路径,而不是全部
- 并行化 :对长文本分段处理
- 模型压缩 :只保留高频字的发射概率
优化后的Viterbi实现(对数版本) :
import math
def viterbi_log(obs, states, start_p, trans_p, emit_p):
V = [{}]
path = {}
# 初始化(取对数)
for y in states:
V[0][y] = math.log(start_p.get(y, 1e-10)) + math.log(emit_p[y].get(obs[0], 1e-10))
path[y] = [y]
# 递推
for t in range(1, len(obs)):
V.append({})
new_path = {}
for y in states:
(prob, state) = max(
(V[t-1][y0] + math.log(trans_p[y0].get(y, 1e-10)) + math.log(emit_p[y].get(obs[t], 1e-10)), y0)
for y0 in states
)
V[t][y] = prob
new_path[y] = path[state] + [y]
path = new_path
# 终止
(prob, state) = max((V[-1][y], y) for y in states)
return path[state]
7. 评估与改进方向
7.1 评估指标
- 准确率(Precision) :正确分出的词数/分词器分出的总词数
- 召回率(Recall) :正确分出的词数/标准答案中的总词数
- F1值 :准确率和召回率的调和平均
7.2 可能的改进方向
- 结合词典 :将基于规则的方法与统计方法结合
- 特征扩展 :考虑汉字的结构特征(偏旁、笔画数等)
- 模型融合 :结合CRF、神经网络等更强大的模型
- 领域适应 :针对特定领域(如医疗、法律)训练专用模型
在实际项目中,HMM分词器虽然简单,但仍然是许多工业级分词系统的基础组件。理解它的原理和实现,能为学习更复杂的NLP模型打下坚实基础。
更多推荐

所有评论(0)