
高斯混合模型(Gaussian Mixture Model,GMM)聚类算法(python实现)
高斯混合模型(Gaussian Mixture Model,GMM)是一种基于概率模型的聚类方法,它假设数据是由多个高斯分布组成的混合体。以下是 GMM 聚类算法的基本原理和数学公式。
·
原理:
- 目标:将样本集合划分为 K 个高斯分布所表示的聚类,每个聚类对应一个高斯分布。
- 初始化:随机选择 K 个高斯分布作为初始聚类的参数。
- 迭代优化:重复以下步骤,直到收敛:
- E 步骤(Expectation):根据当前的高斯分布参数,计算每个样本属于每个高斯分布的后验概率。
- M 步骤(Maximization):基于样本的后验概率,重新估计每个高斯分布的参数(均值和协方差)。
- 收敛条件:当参数不再发生变化,或者变化很小,算法收敛。
数学公式:
- 高斯分布表示:假设第 k 个高斯分布的参数为 𝜃𝑘=(𝜇𝑘,Σ𝑘)),其中 𝜇𝑘是均值向量,Σ𝑘是协方差矩阵。
- 样本属于高斯分布的后验概率:对于样本
,它属于第 k 个高斯分布的后验概率为:
其中:
是第 k 个高斯分布的先验概率(混合系数),满足
。
是多元高斯分布的概率密度函数,表示样本
在第 k 个高斯分布下的概率密度。
- 参数更新:在 M 步骤中,重新估计每个高斯分布的参数:
- 更新均值
:根据样本的后验概率加权平均计算每个高斯分布的均值。
- 更新协方差
:根据样本的后验概率加权计算每个高斯分布的协方差矩阵。
- 更新混合系数
:根据样本属于每个高斯分布的后验概率计算混合系数。
- 更新均值
优化目标:
GMM 的优化目标是最大化观测数据的似然函数,即最大化观测数据在当前参数下的概率: argmax 其中:
- 𝜃 表示模型的参数集合,包括每个高斯分布的均值、协方差矩阵和混合系数。
是多元高斯分布的概率密度函数。
是混合系数,表示第 k 个高斯分布的先验概率。
GMM 算法通过迭代优化这个似然函数,不断更新模型参数,直到收敛到局部最优解。
实现代码:
代码结合上篇Kmeans使用。
# 文件功能:实现 GMM 算法
import numpy as np
from numpy import *
import pylab
import random,math
import matplotlib.pyplot as plt
from matplotlib.patches import Ellipse
from scipy.stats import multivariate_normal
import KMeans
class GMM(object):
def __init__(self, n_clusters, max_iter=500,tolerance = 0.00001):
self.n_clusters_ = n_clusters
self.tolerance_ = tolerance
self.max_iter_ = max_iter
# 更新W
self.posteriori_ = None
# 更新pi
self.prior_ = None
# 更新Mu
self.mu_ = None
# 更新Var
self.cov_ = None
def fit(self, data):
# Step1: 通过kmeans初始化GMM的属性
k_means = KMeans.K_Means(self.n_clusters_)
k_means.fit(data)
self.mu_ = np.asarray(k_means.centers_)
print(self.n_clusters_)
self.prior_ = np.asarray(
[1/self.n_clusters_]*self.n_clusters_).reshape(self.n_clusters_,1)
self.posteriori_ = np.zeros((self.n_clusters_,len(data)))
self.cov_ = np.asarray([eye(2,2)]*self.n_clusters_)
# Step2:迭代
Likelihood_value_before = -inf
for i in range(self.max_iter_):
# Step3:E-step 生成每个点的概率密度分布并进行归一化
print("gmm iterator:",i)
for k in range(self.n_clusters_):
self.posteriori_[k] = multivariate_normal.pdf(x=data,mean=self.mu_[k],cov=self.cov_[k])
self.posteriori_ = np.dot(diag(self.prior_.ravel()),self.posteriori_)
self.posteriori_ /= np.sum(self.posteriori_,axis=0)
# Step4:M-step 更新E-step中每个点的生成概率密度分布参数,达到阈值时停止
self.Nk_ = np.sum(self.posteriori_,axis=1)
self.mu_ = np.asarray([np.dot(self.posteriori_[k],data) / self.Nk_[k] for k in range(self.n_clusters_)])
self.cov_ = np.asarray([np.dot((data-self.mu_[k]).T,np.dot(np.diag(self.posteriori_[k].ravel()),data-self.mu_[k]))/self.Nk_[k] for k in range(self.n_clusters_)])
self.prior_ = np.asarray([self.Nk_/self.n_clusters_]).reshape(self.n_clusters_,1)
Likelihood_value_after = np.sum(np.log(self.posteriori_))
print(Likelihood_value_after - Likelihood_value_before)
if np.abs(Likelihood_value_after - Likelihood_value_before) < self.tolerance_ * self.n_clusters_:
break
Likelihood_value_before = np.copy(Likelihood_value_after)
self.fitted = True
def predict(self, data):
result = []
if not self.fitted:
print('Unfitter. ')
return result
result = np.argmax(self.posteriori_, axis=0)
return result
# 生成仿真数据
def generate_X(true_Mu, true_Var):
# 第一簇的数据
num1, mu1, var1 = 400, true_Mu[0], true_Var[0]
X1 = np.random.multivariate_normal(mu1, np.diag(var1), num1)
# 第二簇的数据
num2, mu2, var2 = 600, true_Mu[1], true_Var[1]
X2 = np.random.multivariate_normal(mu2, np.diag(var2), num2)
# 第三簇的数据
num3, mu3, var3 = 1000, true_Mu[2], true_Var[2]
X3 = np.random.multivariate_normal(mu3, np.diag(var3), num3)
# 合并在一起
X = np.vstack((X1, X2, X3))
# 显示数据
plt.figure(figsize=(10, 8))
plt.axis([-10, 15, -5, 15])
plt.scatter(X1[:, 0], X1[:, 1], s=5)
plt.scatter(X2[:, 0], X2[:, 1], s=5)
plt.scatter(X3[:, 0], X3[:, 1], s=5)
plt.show()
return X
if __name__ == '__main__':
# 生成数据
true_Mu = [[0.5, 0.5], [5.5, 2.5], [1, 7]]
true_Var = [[1, 3], [2, 2], [6, 2]]
X = generate_X(true_Mu, true_Var)
gmm = GMM(n_clusters=3)
gmm.fit(X)
cat = gmm.predict(X)
print(cat)
# 初始化
K = 3
# visualize:
color = ['red', 'blue', 'green', 'cyan', 'magenta']
labels = [f'Cluster{k:02d}' for k in range(K)]
cluster = [[] for i in range(K)] # 用于分类所有数据点
for i in range(len(X)):
if cat[i] == 0:
cluster[0].append(X[i])
elif cat[i] == 1:
cluster[1].append(X[i])
elif cat[i] == 2:
cluster[2].append(X[i])
clusters1 = np.asarray(cluster[0])
clusters2 = np.asarray(cluster[1])
clusters3 = np.asarray(cluster[2])
plt.figure(figsize=(10, 8))
plt.axis([-10, 15, -5, 15])
plt.title(u"scatter after gmm")
plt.scatter(clusters1[:, 0], clusters1[:, 1], c="r")
plt.scatter(clusters2[:, 0], clusters2[:, 1], c="b")
plt.scatter(clusters3[:, 0], clusters3[:, 1], c="y")
plt.show()
点击阅读全文
更多推荐
所有评论(0)