隐马尔可夫模型,在语音识别,NLP,生物信息,模式识别等领域被实践证明是有效的算法。

HMM是关于时序的概率模型,描述由一个隐藏的马尔科夫链生成不可观测的状态随机序列,再由各个状态生成观测随机序列的过程。

HMM随机生成的状态随机序列,称为状态序列,每个状态序列生成一个观测,由此产生的观测随机序列,称为观测序列。

HMM由初始概率分布pai,状态转移概率分布A以及观测概率分布B确定。lamda = (A,B,pai)

对于第一个概率计算问题,有两种计算方法,第一种是暴力计算法。

630

第二种算法是前向后向算法,这是解决这个问题的核心算法。

那么怎么求得alphaT呢

可以使用递推。

def calc_alpha(pi, A, B, o, alpha):
    for i in range(4):
        alpha[0][i] = pi[i] + B[i][ord(o[0])]
    T = len(o)
    temp = [0 for i in range(4)]
    del i
    for t in range(1, T):
        for i in range(4):
            for j in range(4):
                temp[j] = (alpha[t-1][j] + A[j][i])
            alpha[t][i] = log_sum(temp)
            alpha[t][i] += B[i][ord(o[t])]

举个例子:

而对于后向算法,其实是把前向算法反过来看。

前向算法是从前往后递推,后向算法是从后往前递推。

def calc_beta(pi, A, B, o, beta):
    T = len(o)
    for i in range(4):
        beta[T-1][i] = 1
    temp = [0 for i in range(4)]
    del i
    for t in range(T-2, -1, -1):
        for i in range(4):
            beta[t][i] = 0
            for j in range(4):
                temp[j] = A[i][j] + B[j][ord(o[t+1])] + beta[t+1][j]
            beta[t][i] += log_sum(temp)

def calc_gamma(alpha, beta, gamma):
    for t in range(len(alpha)):
        for i in range(4):
            gamma[t][i] = alpha[t][i] + beta[t][i]
        s = log_sum(gamma[t])
        for i in range(4):
            gamma[t][i] -= s

从这里要引出我们第三个问题:

def calc_ksi(alpha, beta, A, B, o, ksi):
    T = len(alpha)
    temp = [0 for _ in range(16)]
    for t in range(T-1):
        k = 0
        for i in range(4):
            for j in range(4):
                ksi[t][i][j] = alpha[t][i] + A[i][j] + B[j][ord(o[t+1])] + beta[t+1][j]
                temp[k] = ksi[t][i][j]
                k += 1
        s = log_sum(temp)
        for i in range(4):
            for j in range(4):
                ksi[t][i][j] -= s

这里就是第三个问题,比如我有一串文本,这就是观测序列,比如我要求第三个字他的隐状态,就是求该时刻(在这里为第几个字)所有的隐状态,选择一个概率最大的。这就是所谓的解码。

下面来看第二个问题,就是求模型的问题,要求出初始概率pai,状态转移矩阵A,状态输出矩阵B。

分为两种情况。

第一种情况是监督学习,比如我有很多文本,并且知道每个文字对应的隐状态是BMES,那么应用大数定律来计算参数。

def mle():  # 0B/1M/2E/3S
    pi = [0] * 4   # npi[i]:i状态的个数
    a = [[0] * 4 for _ in range(4)]     # na[i][j]:从i状态到j状态的转移个数
    b = [[0] * 65536 for _ in range(4)]  # nb[i][o]:从i状态到o字符的个数
    f = open(FILE_PATH_chinese, 'rb')  # 训练集
    data = f.read()[3:].decode('utf-8')
    f.close()
    tokens = data.split('  ')
    # 增加英文词训练集
    # f = open(FILE_PATH_english)
    # data = f.read().decode('utf-8')
    # f.close()
    # tokens.extend(data.split(' '))

    # 开始训练
    last_q = 2
    old_progress = 0
    print('进度:')
    for k, token in enumerate(tokens):
        progress = float(k) / float(len(tokens))
        if progress > old_progress + 0.1:
            print('%.3f%%' % (progress * 100))
            old_progress = progress
        token = token.strip()
        n = len(token)
        if n <= 0:
            continue
        if n == 1:
            pi[3] += 1
            a[last_q][3] += 1   # 上一个词的结束(last_q)到当前状态(3S)
            b[3][ord(token[0])] += 1
            last_q = 3
            continue
        # 初始向量
        pi[0] += 1
        pi[2] += 1
        pi[1] += (n-2)
        # 转移矩阵
        a[last_q][0] += 1
        last_q = 2
        if n == 2:
            a[0][2] += 1
        else:
            a[0][1] += 1
            a[1][1] += (n-3)
            a[1][2] += 1
        # 发射矩阵
        b[0][ord(token[0])] += 1
        b[2][ord(token[n-1])] += 1
        for i in range(1, n-1):
            b[1][ord(token[i])] += 1
    # 正则化
    log_normalize(pi)
    for i in range(4):
        log_normalize(a[i])
        log_normalize(b[i])
    return [pi, a, b]

 

第二中方法就是只有观测序列的情况下,参数该怎么确定呢,这里使用EM算法。

 

def bw(pi, A, B, alpha, beta, gamma, ksi, o):
    T = len(alpha)
    for i in range(4):
        pi[i] = gamma[0][i]
    s1 = [0 for _ in range(T-1)]
    s2 = [0 for _ in range(T-1)]
    for i in range(4):
        for j in range(4):
            for t in range(T-1):
                s1[t] = ksi[t][i][j]
                s2[t] = gamma[t][i]
            A[i][j] = log_sum(s1) - log_sum(s2)
    s1 = [0 for _ in range(T)]
    s2 = [0 for _ in range(T)]
    for i in range(4):
        for k in range(65536):
            if k % 5000 == 0:
                print(i, k)
            valid = 0
            for t in range(T):
                if ord(o[t]) == k:
                    s1[valid] = gamma[t][i]
                    valid += 1
                s2[t] = gamma[t][i]
            if valid == 0:
                B[i][k] = -log_sum(s2)  # 平滑
            else:
                B[i][k] = log_sum(s1[:valid]) - log_sum(s2)


def baum_welch(pi, A, B):
    f = open(".\\res\\novel.txt")
    sentence = f.read()[3:].decode('utf-8')  # 跳过文件头
    f.close()
    T = len(sentence)   # 观测序列
    alpha = np.zeros((T, 4))
    beta = np.zeros((T, 4))
    gamma = np.zeros((T, 4))
    ksi = np.zeros((4, 4))
    # alpha = [[0 for i in range(4)] for t in range(T)]
    # beta = [[0 for i in range(4)] for t in range(T)]
    # gamma = [[0 for i in range(4)] for t in range(T)]
    # ksi = [[[0 for j in range(4)] for i in range(4)] for t in range(T-1)]
    for time in range(100):
        print("time:", time)
        calc_alpha(pi, A, B, sentence, alpha)    # alpha(t,i):给定lamda,在时刻t的状态为i且观测到o(1),o(2)...o(t)的概率
        calc_beta(pi, A, B, sentence, beta)      # beta(t,i):给定lamda和时刻t的状态i,观测到o(t+1),o(t+2)...oT的概率
        calc_gamma(alpha, beta, gamma)           # gamma(t,i):给定lamda和O,在时刻t状态位于i的概率
        calc_ksi(alpha, beta, A, B, sentence, ksi)    # ksi(t,i,j):给定lamda和O,在时刻t状态位于i且在时刻i+1,状态位于j的概率
        bw(pi, A, B, alpha, beta, gamma, ksi, sentence)  # baum_welch算法
        save_parameter(pi, A, B, time)

以上便是使用EM算法估计初始概率,状态转移概率,观测转移概率。

下面来具体讨论我们的第三个问题。预测问题

第一种办法是近似算法:

第二种方法是维特比算法,在学习这个算法之前。先来抛砖引玉。

viterbi算法的逻辑和上述算法逻辑大致相同。

def viterbi(pi, A, B, o):
    T = len(o)   # 观测序列
    delta = np.zeros((T, 4))
    #  delta = [[0 for _ in range(4)] for _ in range(T)]
    #  pre = [[0 for _ in range(4)] for _ in range(T)]  # 前一个状态   # pre[t][i]:t时刻的i状态,它的前一个状态是多少
    pre = np.zeros((T, 4))
    for i in range(4):
        delta[0][i] = pi[i] + B[i][ord(o[0])]
    for t in range(1, T):
        for i in range(4):
            delta[t][i] = delta[t-1][0] + A[0][i]
            for j in range(1, 4):
                vj = delta[t-1][j] + A[j][i]
                if delta[t][i] < vj:
                    delta[t][i] = vj
                    pre[t][i] = j
            delta[t][i] += B[i][ord(o[t])]
    decode = [-1 for _ in range(T)]  # 解码:回溯查找最大路径
    q = 0
    for i in range(1, 4):
        if delta[T-1][i] > delta[T-1][q]:
            q = i
    decode[T-1] = int(q)
    for t in range(T-2, -1, -1):
        q = pre[t+1][int(q)]
        decode[t] = q
    return decode

下面再看一个例子:

还是刚刚的例子只不过这次要求解的问题不一样了。上次是求观测概率,现在我们来隐状态链。

在上边的文章中我们对HMM的原理做了详尽的解释。

下面我们来看一下HMM实际中的应用,python需要用到hmmlearn这个包。

1. hmmlearn概述


  hmmlearn安装很简单,"pip install hmmlearn"即可完成。

  hmmlearn实现了三种HMM模型类,按照观测状态是连续状态还是离散状态,可以分为两类。GaussianHMM和GMMHMM是连续观测状态的HMM模型,而MultinomialHMM是离散观测状态的模型,也是我们在HMM原理系列篇里面使用的模型。

  对于MultinomialHMM的模型,使用比较简单,"startprob_"参数对应我们的隐藏状态初始分布Π, "transmat_"对应我们的状态转移矩阵A, "emissionprob_"对应我们的观测状态概率矩阵B。

  对于连续观测状态的HMM模型,GaussianHMM类假设观测状态符合高斯分布,而GMMHMM类则假设观测状态符合混合高斯分布。一般情况下我们使用GaussianHMM即高斯分布的观测状态即可。以下对于连续观测状态的HMM模型,我们只讨论GaussianHMM类。

  在GaussianHMM类中,"startprob_"参数对应我们的隐藏状态初始分布Π, "transmat_"对应我们的状态转移矩阵A, 比较特殊的是观测状态概率的表示方法,此时由于观测状态是连续值,我们无法像MultinomialHMM一样直接给出矩阵B。而是采用给出各个隐藏状态对应的观测状态高斯分布的概率密度函数的参数。

  如果观测序列是一维的,则观测状态的概率密度函数是一维的普通高斯分布。如果观测序列是N维的,则隐藏状态对应的观测状态的概率密度函数是N维高斯分布。高斯分布的概率密度函数参数可以用μ表示高斯分布的期望向量,Σ表示高斯分布的协方差矩阵。在GaussianHMM类中,“means”用来表示各个隐藏状态对应的高斯分布期望向量μ形成的矩阵,而“covars”用来表示各个隐藏状态对应的高斯分布协方差矩阵Σ形成的三维张量。

2. MultinomialHMM实例

      我们使用上述小盒子的例子。

      首先建立hmm模型:

import numpy as np
from hmmlearn import hmm

states = ["box 1", "box 2", "box3"]
n_states = len(states)

observations = ["red", "white"]
n_observations = len(observations)

start_probability = np.array([0.2, 0.4, 0.4])

transition_probability = np.array([
  [0.5, 0.2, 0.3],
  [0.3, 0.5, 0.2],
  [0.2, 0.3, 0.5]
])

emission_probability = np.array([
  [0.5, 0.5],
  [0.4, 0.6],
  [0.7, 0.3]
])

model = hmm.MultinomialHMM(n_components=n_states)
model.startprob_=start_probability
model.transmat_=transition_probability
model.emissionprob_=emission_probability

现在我们来跑一跑HMM问题三维特比算法的解码过程,使用和原理篇一样的观测序列来解码,代码如下:

seen = np.array([[0,1,0]]).T
logprob, box = model.decode(seen, algorithm="viterbi")
print("The ball picked:", ", ".join(map(lambda x: observations[x], seen)))
print("The hidden box", ", ".join(map(lambda x: states[x], box)))

输出结果如下:

('The ball picked:', 'red, white, red')
('The hidden box', 'box3, box3, box3')

可以看出,结果和我们原理篇中的手动计算的结果是一样的。

也可以使用predict函数,结果也是一样的,代码如下:

box2 = model.predict(seen)
print("The ball picked:", ", ".join(map(lambda x: observations[x], seen)))
print("The hidden box", ", ".join(map(lambda x: states[x], box2)))

大家可以跑一下,看看结果是否和decode函数相同。

现在我们再来看看求HMM问题一的观测序列的概率的问题,代码如下:

print model.score(seen)

 

输出结果是:

-2.03854530992

要注意的是score函数返回的是以自然对数为底的对数概率值,我们在HMM问题一中手动计算的结果是未取对数的原始概率是0.13022。对比一下:

ln0.13022≈−2.0385ln0.13022≈−2.0385

现在我们再看看HMM问题二,求解模型参数的问题。由于鲍姆-韦尔奇算法是基于EM算法的近似算法,所以我们需要多跑几次,比如下面我们跑三次,选择一个比较优的模型参数,代码如下:

import numpy as np
from hmmlearn import hmm

states = ["box 1", "box 2", "box3"]
n_states = len(states)

observations = ["red", "white"]
n_observations = len(observations)
model2 = hmm.MultinomialHMM(n_components=n_states, n_iter=20, tol=0.01)
X2 = np.array([[0,1,0,1],[0,0,0,1],[1,0,1,1]])
model2.fit(X2)
print model2.startprob_
print model2.transmat_
print model2.emissionprob_
print model2.score(X2)
model2.fit(X2)
print model2.startprob_
print model2.transmat_
print model2.emissionprob_
print model2.score(X2)
model2.fit(X2)
print model2.startprob_
print model2.transmat_
print model2.emissionprob_
print model2.score(X2)

结果这里就略去了,最终我们会选择分数最高的模型参数。

以上就是用MultinomialHMM解决HMM模型三个问题的方法。

3. GaussianHMM实例

下面我们再给一个GaussianHMM的实例,这个实例中,我们的观测状态是二维的,而隐藏状态有4个。因此我们的“means”参数是4×24×2的矩阵,而“covars”参数是4×2×24×2×2的张量。

 建立模型如下:

startprob = np.array([0.6, 0.3, 0.1, 0.0])
# The transition matrix, note that there are no transitions possible
# between component 1 and 3
transmat = np.array([[0.7, 0.2, 0.0, 0.1],
                     [0.3, 0.5, 0.2, 0.0],
                     [0.0, 0.3, 0.5, 0.2],
                     [0.2, 0.0, 0.2, 0.6]])
# The means of each component
means = np.array([[0.0,  0.0],
                  [0.0, 11.0],
                  [9.0, 10.0],
                  [11.0, -1.0]])
# The covariance of each component
covars = .5 * np.tile(np.identity(2), (4, 1, 1))

# Build an HMM instance and set parameters
model3 = hmm.GaussianHMM(n_components=4, covariance_type="full")

# Instead of fitting it from the data, we directly set the estimated
# parameters, the means and covariance of the components
model3.startprob_ = startprob
model3.transmat_ = transmat
model3.means_ = means
model3.covars_ = covars

注意上面有个参数covariance_type,取值为"full"意味所有的μ,Σμ,Σ都需要指定。取值为“spherical”则ΣΣ的非对角线元素为0,对角线元素相同。取值为“diag”则ΣΣ的非对角线元素为0,对角线元素可以不同,"tied"指所有的隐藏状态对应的观测状态分布使用相同的协方差矩阵ΣΣ

我们现在跑一跑HMM问题一解码的过程,由于观测状态是二维的,我们用的三维观测序列, 所以这里的 输入是一个3×2×23×2×2的张量,代码如下:

seen = np.array([[1.1,2.0],[-1,2.0],[3,7]])
logprob, state = model.decode(seen, algorithm="viterbi")
print state

输出结果如下:

[0 0 1]

再看看HMM问题一对数概率的计算:

print model3.score(seen)

  输出如下:

-41.1211281377

代码路径:https://github.com/HanGaaaaa/MLAPractice/tree/master/HMM

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐