机器学习从零到精通:理论、实践与工业级应用完整指南
机器学习是现代人工智能的核心技术,本文系统介绍了机器学习的数学基础、核心算法和实践应用。首先阐述了机器学习的发展历程和核心价值,包括解决复杂模式识别、预测分析和个性化推荐等问题。然后详细讲解了线性代数、概率统计和优化理论等数学基础,并提供了Python代码实现。重点分析了监督学习(线性回归、逻辑回归、决策树)、集成学习(随机森林、XGBoost)和无监督学习(K-Means、DBSCAN)等核心算
一、引言:机器学习的时代意义
在人工智能浪潮席卷全球的今天,机器学习已成为推动技术革命的核心引擎。根据2025年Gartner技术成熟度曲线报告,机器学习平台已进入"生产力 plateau"阶段,成为企业数字化转型的基础设施。
1.1 机器学习的定义与发展
机器学习(Machine Learning)是人工智能的一个分支,它使计算机系统能够从数据中自动学习和改进,而无需显式编程。Arthur Samuel在1959年首次提出这个概念,定义为"让计算机在没有明确编程的情况下学习能力的研究领域"。
机器学习的发展经历了三个重要阶段:
- 符号主义时代(1950s-1980s):基于规则和逻辑推理
- 统计学习时代(1990s-2010s):基于概率统计和优化理论
- 深度学习时代(2010s-至今):基于神经网络和大数据
1.2 机器学习的核心价值
机器学习解决了传统编程无法处理的三类问题:
- 复杂模式识别:图像识别、语音识别、自然语言处理
- 预测性分析:销售预测、风险评估、股票预测
- 个性化推荐:电商推荐、内容推荐、广告投放
1.3 本文学习路线图
本文将按照以下结构展开:
- 理论基础:数学基础、核心算法原理
- 实践技能:数据预处理、模型训练、评估优化
- 框架应用:Scikit-learn、TensorFlow、PyTorch
- 工业案例:金融风控、医疗诊断、智能推荐
- 高级主题:深度学习、强化学习、AutoML
二、机器学习数学基础
2.1 线性代数
线性代数是机器学习的基石,主要用于数据表示和变换。
2.1.1 向量与矩阵运算
import numpy as np
# 向量创建与运算
vector_a = np.array([1, 2, 3])
vector_b = np.array([4, 5, 6])
# 点积(内积)
dot_product = np.dot(vector_a, vector_b) # 1*4 + 2*5 + 3*6 = 32
# 向量范数
l2_norm = np.linalg.norm(vector_a) # sqrt(1² + 2² + 3²) = sqrt(14)
# 矩阵创建与运算
matrix_A = np.array([[1, 2], [3, 4]])
matrix_B = np.array([[5, 6], [7, 8]])
# 矩阵乘法
matrix_product = np.dot(matrix_A, matrix_B)
# 矩阵转置
matrix_transpose = matrix_A.T
# 矩阵逆
matrix_inverse = np.linalg.inv(matrix_A)
# 特征值分解
eigenvalues, eigenvectors = np.linalg.eig(matrix_A)
2.1.2 特征值与特征向量
特征值分解在PCA(主成分分析)中至关重要:
def pca_from_scratch(X, n_components=2):
"""
从零实现PCA算法
:param X: 输入数据矩阵 (n_samples, n_features)
:param n_components: 保留的主成分数量
:return: 降维后的数据
"""
# 1. 数据标准化
X_mean = np.mean(X, axis=0)
X_std = np.std(X, axis=0)
X_normalized = (X - X_mean) / X_std
# 2. 计算协方差矩阵
cov_matrix = np.cov(X_normalized.T)
# 3. 特征值分解
eigenvalues, eigenvectors = np.linalg.eigh(cov_matrix)
# 4. 按特征值大小排序
idx = np.argsort(eigenvalues)[::-1]
eigenvectors = eigenvectors[:, idx]
# 5. 选择前n_components个特征向量
projection_matrix = eigenvectors[:, :n_components]
# 6. 投影到新空间
X_pca = np.dot(X_normalized, projection_matrix)
return X_pca, projection_matrix
# 使用示例
from sklearn.datasets import load_iris
iris = load_iris()
X_pca, proj_matrix = pca_from_scratch(iris.data, n_components=2)
print(f"PCA降维结果形状: {X_pca.shape}")
2.2 概率论与统计学
2.2.1 贝叶斯定理
贝叶斯定理是朴素贝叶斯分类器的理论基础:
P(A∣B)=P(B∣A)P(A)P(B)P(A∣B)=P(B)P(B∣A)P(A)
class NaiveBayesClassifier:
def __init__(self):
self.classes = None
self.class_priors = {}
self.feature_likelihoods = {}
def fit(self, X, y):
"""训练朴素贝叶斯分类器"""
self.classes = np.unique(y)
n_samples = len(y)
# 计算类先验概率
for class_val in self.classes:
class_samples = X[y == class_val]
self.class_priors[class_val] = len(class_samples) / n_samples
# 计算特征似然(假设高斯分布)
self.feature_likelihoods[class_val] = {
'mean': np.mean(class_samples, axis=0),
'std': np.std(class_samples, axis=0)
}
def _gaussian_probability(self, x, mean, std):
"""计算高斯概率密度"""
exponent = np.exp(-0.5 * ((x - mean) / std) ** 2)
return (1 / (np.sqrt(2 * np.pi) * std)) * exponent
def predict_proba(self, X):
"""预测概率"""
probabilities = []
for x in X:
class_probs = {}
for class_val in self.classes:
# 计算后验概率(忽略分母P(X))
prior = self.class_priors[class_val]
likelihood = np.prod(
self._gaussian_probability(
x,
self.feature_likelihoods[class_val]['mean'],
self.feature_likelihoods[class_val]['std']
)
)
posterior = prior * likelihood
class_probs[class_val] = posterior
# 归一化
total = sum(class_probs.values())
for class_val in class_probs:
class_probs[class_val] /= total
probabilities.append(class_probs)
return probabilities
def predict(self, X):
"""预测类别"""
probabilities = self.predict_proba(X)
predictions = []
for prob in probabilities:
predicted_class = max(prob, key=prob.get)
predictions.append(predicted_class)
return predictions
# 使用示例
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
# 生成数据
X, y = make_classification(n_samples=1000, n_features=4, n_classes=3, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# 训练模型
nb_classifier = NaiveBayesClassifier()
nb_classifier.fit(X_train, y_train)
# 预测
y_pred = nb_classifier.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
print(f"朴素贝叶斯准确率: {accuracy:.4f}")
2.2.2 最大似然估计
最大似然估计(MLE)用于参数估计:
def maximum_likelihood_estimation(data):
"""
计算正态分布的最大似然估计
:param data: 样本数据
:return: 均值和方差的MLE估计
"""
# 对于正态分布,MLE估计为样本均值和样本方差
mu_mle = np.mean(data)
sigma2_mle = np.var(data, ddof=0) # ddof=0表示总体方差
return mu_mle, sigma2_mle
# 验证MLE
np.random.seed(42)
true_mu, true_sigma = 5.0, 2.0
sample_data = np.random.normal(true_mu, true_sigma, 1000)
estimated_mu, estimated_sigma2 = maximum_likelihood_estimation(sample_data)
print(f"真实参数: μ={true_mu}, σ²={true_sigma**2}")
print(f"MLE估计: μ={estimated_mu:.4f}, σ²={estimated_sigma2:.4f}")
2.3 微积分与优化
2.3.1 梯度下降算法
梯度下降是机器学习中最核心的优化算法:
def gradient_descent(X, y, learning_rate=0.01, n_iterations=1000):
"""
实现线性回归的梯度下降算法
:param X: 特征矩阵 (n_samples, n_features)
:param y: 目标向量 (n_samples,)
:param learning_rate: 学习率
:param n_iterations: 迭代次数
:return: 训练好的权重和偏置
"""
n_samples, n_features = X.shape
# 初始化参数
weights = np.zeros(n_features)
bias = 0
# 存储损失历史
loss_history = []
for i in range(n_iterations):
# 前向传播
y_pred = np.dot(X, weights) + bias
# 计算损失(均方误差)
loss = np.mean((y_pred - y) ** 2)
loss_history.append(loss)
# 计算梯度
dw = (2 / n_samples) * np.dot(X.T, (y_pred - y))
db = (2 / n_samples) * np.sum(y_pred - y)
# 更新参数
weights -= learning_rate * dw
bias -= learning_rate * db
# 打印进度
if i % 100 == 0:
print(f"Iteration {i}, Loss: {loss:.6f}")
return weights, bias, loss_history
# 使用示例
from sklearn.datasets import make_regression
# 生成回归数据
X, y = make_regression(n_samples=1000, n_features=3, noise=10, random_state=42)
# 标准化特征
X_mean = np.mean(X, axis=0)
X_std = np.std(X, axis=0)
X_normalized = (X - X_mean) / X_std
# 训练模型
weights, bias, loss_history = gradient_descent(X_normalized, y, learning_rate=0.1, n_iterations=1000)
# 预测
y_pred = np.dot(X_normalized, weights) + bias
mse = np.mean((y_pred - y) ** 2)
print(f"最终MSE: {mse:.4f}")
print(f"学习到的权重: {weights}")
2.3.2 随机梯度下降(SGD)
def stochastic_gradient_descent(X, y, learning_rate=0.01, n_epochs=100):
"""
实现随机梯度下降
"""
n_samples, n_features = X.shape
weights = np.zeros(n_features)
bias = 0
loss_history = []
for epoch in range(n_epochs):
# 随机打乱数据
indices = np.random.permutation(n_samples)
epoch_loss = 0
for i in indices:
# 单个样本的前向传播
y_pred = np.dot(X[i], weights) + bias
# 单个样本的损失
loss = (y_pred - y[i]) ** 2
epoch_loss += loss
# 单个样本的梯度
dw = 2 * (y_pred - y[i]) * X[i]
db = 2 * (y_pred - y[i])
# 更新参数
weights -= learning_rate * dw
bias -= learning_rate * db
avg_loss = epoch_loss / n_samples
loss_history.append(avg_loss)
if epoch % 10 == 0:
print(f"Epoch {epoch}, Average Loss: {avg_loss:.6f}")
return weights, bias, loss_history
三、机器学习核心算法详解
3.1 监督学习算法
3.1.1 线性回归
线性回归是最基础的回归算法,假设特征与目标之间存在线性关系。
数学原理: y=β0+β1x1+β2x2+...+βnxn+ϵy=β0+β1x1+β2x2+...+βnxn+ϵ
正规方程解: β=(XTX)−1XTyβ=(XTX)−1XTy
class LinearRegression:
def __init__(self, fit_intercept=True):
self.fit_intercept = fit_intercept
self.weights = None
self.bias = None
def fit(self, X, y):
"""使用正规方程训练模型"""
if self.fit_intercept:
# 添加偏置列
X_with_bias = np.column_stack([np.ones(X.shape[0]), X])
# 正规方程解
beta = np.linalg.inv(X_with_bias.T @ X_with_bias) @ X_with_bias.T @ y
self.bias = beta[0]
self.weights = beta[1:]
else:
self.weights = np.linalg.inv(X.T @ X) @ X.T @ y
self.bias = 0
def predict(self, X):
"""预测"""
return X @ self.weights + self.bias
def score(self, X, y):
"""计算R²分数"""
y_pred = self.predict(X)
ss_res = np.sum((y - y_pred) ** 2)
ss_tot = np.sum((y - np.mean(y)) ** 2)
return 1 - (ss_res / ss_tot)
# 使用示例
from sklearn.datasets import load_boston
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
# 加载数据(注意:load_boston在新版本sklearn中已被弃用)
# 这里使用make_regression作为替代
X, y = make_regression(n_samples=1000, n_features=10, noise=10, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# 标准化
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
# 训练自定义线性回归
lr_custom = LinearRegression()
lr_custom.fit(X_train_scaled, y_train)
# 预测和评估
y_pred_custom = lr_custom.predict(X_test_scaled)
r2_custom = lr_custom.score(X_test_scaled, y_test)
# 与sklearn对比
from sklearn.linear_model import LinearRegression as SklearnLR
lr_sklearn = SklearnLR()
lr_sklearn.fit(X_train_scaled, y_train)
y_pred_sklearn = lr_sklearn.predict(X_test_scaled)
r2_sklearn = lr_sklearn.score(X_test_scaled, y_test)
print(f"自定义线性回归 R²: {r2_custom:.4f}")
print(f"Sklearn线性回归 R²: {r2_sklearn:.4f}")
3.1.2 逻辑回归
逻辑回归用于二分类问题,使用sigmoid函数将线性输出映射到概率。
数学原理: P(y=1∣x)=11+e−(β0+β1x1+...+βnxn)P(y=1∣x)=1+e−(β0+β1x1+...+βnxn)1
损失函数(对数损失): L(β)=−1n∑i=1n[yilog(y^i)+(1−yi)log(1−y^i)]L(β)=−n1∑i=1n[yilog(y^i)+(1−yi)log(1−y^i)]
class LogisticRegression:
def __init__(self, learning_rate=0.01, n_iterations=1000):
self.learning_rate = learning_rate
self.n_iterations = n_iterations
self.weights = None
self.bias = None
def _sigmoid(self, z):
"""sigmoid函数"""
# 防止溢出
z = np.clip(z, -500, 500)
return 1 / (1 + np.exp(-z))
def fit(self, X, y):
"""训练逻辑回归模型"""
n_samples, n_features = X.shape
# 初始化参数
self.weights = np.zeros(n_features)
self.bias = 0
# 梯度下降
for i in range(self.n_iterations):
# 线性输出
linear_output = np.dot(X, self.weights) + self.bias
# 概率预测
y_pred = self._sigmoid(linear_output)
# 计算梯度
dw = (1 / n_samples) * np.dot(X.T, (y_pred - y))
db = (1 / n_samples) * np.sum(y_pred - y)
# 更新参数
self.weights -= self.learning_rate * dw
self.bias -= self.learning_rate * db
# 打印进度
if i % 100 == 0:
loss = self._compute_loss(y, y_pred)
print(f"Iteration {i}, Loss: {loss:.6f}")
def _compute_loss(self, y_true, y_pred):
"""计算对数损失"""
# 防止log(0)
y_pred = np.clip(y_pred, 1e-15, 1 - 1e-15)
return -np.mean(y_true * np.log(y_pred) + (1 - y_true) * np.log(1 - y_pred))
def predict_proba(self, X):
"""预测概率"""
linear_output = np.dot(X, self.weights) + self.bias
return self._sigmoid(linear_output)
def predict(self, X, threshold=0.5):
"""预测类别"""
probabilities = self.predict_proba(X)
return (probabilities >= threshold).astype(int)
def score(self, X, y):
"""计算准确率"""
y_pred = self.predict(X)
return np.mean(y_pred == y)
# 使用示例
from sklearn.datasets import make_classification
# 生成二分类数据
X, y = make_classification(n_samples=1000, n_features=4, n_classes=2, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# 标准化
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
# 训练自定义逻辑回归
log_reg_custom = LogisticRegression(learning_rate=0.1, n_iterations=1000)
log_reg_custom.fit(X_train_scaled, y_train)
# 预测和评估
y_pred_custom = log_reg_custom.predict(X_test_scaled)
accuracy_custom = log_reg_custom.score(X_test_scaled, y_test)
# 与sklearn对比
from sklearn.linear_model import LogisticRegression as SklearnLogReg
log_reg_sklearn = SklearnLogReg()
log_reg_sklearn.fit(X_train_scaled, y_train)
y_pred_sklearn = log_reg_sklearn.predict(X_test_scaled)
accuracy_sklearn = log_reg_sklearn.score(X_test_scaled, y_test)
print(f"自定义逻辑回归准确率: {accuracy_custom:.4f}")
print(f"Sklearn逻辑回归准确率: {accuracy_sklearn:.4f}")
3.1.3 决策树
决策树通过递归分割特征空间来构建分类或回归模型。
信息增益(ID3算法): IG(S,A)=H(S)−∑v∈Values(A)∣Sv∣∣S∣H(Sv)IG(S,A)=H(S)−∑v∈Values(A)∣S∣∣Sv∣H(Sv)
基尼不纯度(CART算法): Gini(S)=1−∑i=1cpi2Gini(S)=1−∑i=1cpi2
class DecisionTreeNode:
def __init__(self, feature_idx=None, threshold=None, left=None, right=None, value=None):
self.feature_idx = feature_idx
self.threshold = threshold
self.left = left
self.right = right
self.value = value
class DecisionTreeClassifier:
def __init__(self, max_depth=5, min_samples_split=2):
self.max_depth = max_depth
self.min_samples_split = min_samples_split
self.root = None
def fit(self, X, y):
"""训练决策树"""
self.root = self._build_tree(X, y, depth=0)
def _build_tree(self, X, y, depth):
"""递归构建决策树"""
n_samples, n_features = X.shape
n_classes = len(np.unique(y))
# 停止条件
if (depth >= self.max_depth or
n_samples < self.min_samples_split or
n_classes == 1):
leaf_value = self._most_common_class(y)
return DecisionTreeNode(value=leaf_value)
# 寻找最佳分割
best_feature, best_threshold = self._best_split(X, y, n_features)
if best_feature is None:
leaf_value = self._most_common_class(y)
return DecisionTreeNode(value=leaf_value)
# 分割数据
left_indices = X[:, best_feature] <= best_threshold
right_indices = ~left_indices
# 递归构建子树
left_child = self._build_tree(X[left_indices], y[left_indices], depth + 1)
right_child = self._build_tree(X[right_indices], y[right_indices], depth + 1)
return DecisionTreeNode(best_feature, best_threshold, left_child, right_child)
def _best_split(self, X, y, n_features):
"""寻找最佳分割特征和阈值"""
best_gain = -1
best_feature = None
best_threshold = None
for feature_idx in range(n_features):
thresholds = np.unique(X[:, feature_idx])
for threshold in thresholds:
gain = self._information_gain(y, X[:, feature_idx], threshold)
if gain > best_gain:
best_gain = gain
best_feature = feature_idx
best_threshold = threshold
return best_feature, best_threshold
def _information_gain(self, y, X_column, threshold):
"""计算信息增益"""
parent_entropy = self._entropy(y)
left_indices = X_column <= threshold
right_indices = ~left_indices
if np.sum(left_indices) == 0 or np.sum(right_indices) == 0:
return 0
n = len(y)
n_left, n_right = np.sum(left_indices), np.sum(right_indices)
e_left, e_right = self._entropy(y[left_indices]), self._entropy(y[right_indices])
child_entropy = (n_left / n) * e_left + (n_right / n) * e_right
return parent_entropy - child_entropy
def _entropy(self, y):
"""计算熵"""
if len(y) == 0:
return 0
_, counts = np.unique(y, return_counts=True)
probabilities = counts / len(y)
return -np.sum(probabilities * np.log2(probabilities + 1e-10))
def _most_common_class(self, y):
"""返回最常见的类别"""
unique, counts = np.unique(y, return_counts=True)
return unique[np.argmax(counts)]
def predict(self, X):
"""预测"""
return np.array([self._traverse_tree(x, self.root) for x in X])
def _traverse_tree(self, x, node):
"""遍历决策树"""
if node.value is not None:
return node.value
if x[node.feature_idx] <= node.threshold:
return self._traverse_tree(x, node.left)
else:
return self._traverse_tree(x, node.right)
# 使用示例
from sklearn.datasets import load_iris
# 加载数据
iris = load_iris()
X, y = iris.data, iris.target
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# 训练自定义决策树
dt_custom = DecisionTreeClassifier(max_depth=3, min_samples_split=5)
dt_custom.fit(X_train, y_train)
# 预测和评估
y_pred_custom = dt_custom.predict(X_test)
accuracy_custom = np.mean(y_pred_custom == y_test)
# 与sklearn对比
from sklearn.tree import DecisionTreeClassifier as SklearnDT
dt_sklearn = SklearnDT(max_depth=3, min_samples_split=5)
dt_sklearn.fit(X_train, y_train)
y_pred_sklearn = dt_sklearn.predict(X_test)
accuracy_sklearn = np.mean(y_pred_sklearn == y_test)
print(f"自定义决策树准确率: {accuracy_custom:.4f}")
print(f"Sklearn决策树准确率: {accuracy_sklearn:.4f}")
3.2 集成学习算法
3.2.1 随机森林
随机森林通过集成多个决策树来提高模型的泛化能力。
class RandomForestClassifier:
def __init__(self, n_trees=100, max_depth=10, min_samples_split=2, max_features='sqrt'):
self.n_trees = n_trees
self.max_depth = max_depth
self.min_samples_split = min_samples_split
self.max_features = max_features
self.trees = []
def fit(self, X, y):
"""训练随机森林"""
self.trees = []
n_samples, n_features = X.shape
# 确定每次分割使用的特征数量
if self.max_features == 'sqrt':
n_features_to_use = int(np.sqrt(n_features))
elif self.max_features == 'log2':
n_features_to_use = int(np.log2(n_features))
else:
n_features_to_use = n_features
for _ in range(self.n_trees):
# Bootstrap采样
indices = np.random.choice(n_samples, size=n_samples, replace=True)
X_bootstrap = X[indices]
y_bootstrap = y[indices]
# 随机选择特征
feature_indices = np.random.choice(n_features, size=n_features_to_use, replace=False)
X_bootstrap_subset = X_bootstrap[:, feature_indices]
# 训练决策树
tree = DecisionTreeClassifier(
max_depth=self.max_depth,
min_samples_split=self.min_samples_split
)
tree.fit(X_bootstrap_subset, y_bootstrap)
self.trees.append((tree, feature_indices))
def predict(self, X):
"""预测"""
tree_predictions = []
for tree, feature_indices in self.trees:
X_subset = X[:, feature_indices]
tree_pred = tree.predict(X_subset)
tree_predictions.append(tree_pred)
# 投票
tree_predictions = np.array(tree_predictions).T
predictions = []
for sample_preds in tree_predictions:
unique, counts = np.unique(sample_preds, return_counts=True)
predictions.append(unique[np.argmax(counts)])
return np.array(predictions)
# 使用示例
from sklearn.datasets import make_classification
# 生成数据
X, y = make_classification(n_samples=1000, n_features=20, n_classes=3, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# 训练自定义随机森林
rf_custom = RandomForestClassifier(n_trees=50, max_depth=5)
rf_custom.fit(X_train, y_train)
# 预测和评估
y_pred_custom = rf_custom.predict(X_test)
accuracy_custom = np.mean(y_pred_custom == y_test)
# 与sklearn对比
from sklearn.ensemble import RandomForestClassifier as SklearnRF
rf_sklearn = SklearnRF(n_estimators=50, max_depth=5, random_state=42)
rf_sklearn.fit(X_train, y_train)
y_pred_sklearn = rf_sklearn.predict(X_test)
accuracy_sklearn = np.mean(y_pred_sklearn == y_test)
print(f"自定义随机森林准确率: {accuracy_custom:.4f}")
print(f"Sklearn随机森林准确率: {accuracy_sklearn:.4f}")
3.2.2 XGBoost
XGBoost是梯度提升树的高效实现,广泛应用于各种机器学习竞赛。
# 使用XGBoost的实际案例
import xgboost as xgb
from sklearn.metrics import accuracy_score, classification_report
# 生成数据
X, y = make_classification(n_samples=10000, n_features=20, n_classes=2, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# 创建DMatrix(XGBoost的数据结构)
dtrain = xgb.DMatrix(X_train, label=y_train)
dtest = xgb.DMatrix(X_test, label=y_test)
# 设置参数
params = {
'objective': 'binary:logistic',
'max_depth': 6,
'learning_rate': 0.1,
'subsample': 0.8,
'colsample_bytree': 0.8,
'eval_metric': 'logloss'
}
# 训练模型
num_rounds = 100
xgb_model = xgb.train(
params,
dtrain,
num_rounds,
evals=[(dtest, 'test')],
early_stopping_rounds=10,
verbose_eval=False
)
# 预测
y_pred_proba = xgb_model.predict(dtest)
y_pred = (y_pred_proba > 0.5).astype(int)
# 评估
accuracy = accuracy_score(y_test, y_pred)
print(f"XGBoost准确率: {accuracy:.4f}")
print("\n分类报告:")
print(classification_report(y_test, y_pred))
3.3 无监督学习算法
3.3.1 K-Means聚类
K-Means通过迭代优化聚类中心来分割数据。
class KMeans:
def __init__(self, k=3, max_iters=100, tol=1e-4):
self.k = k
self.max_iters = max_iters
self.tol = tol
self.centroids = None
self.labels = None
def fit(self, X):
"""训练K-Means模型"""
n_samples, n_features = X.shape
# 随机初始化聚类中心
self.centroids = X[np.random.choice(n_samples, self.k, replace=False)]
for i in range(self.max_iters):
# 分配样本到最近的聚类中心
distances = self._compute_distances(X)
self.labels = np.argmin(distances, axis=1)
# 更新聚类中心
new_centroids = np.array([
X[self.labels == j].mean(axis=0) for j in range(self.k)
])
# 检查收敛
if np.all(np.abs(new_centroids - self.centroids) < self.tol):
print(f"K-Means在第{i+1}次迭代收敛")
break
self.centroids = new_centroids
def _compute_distances(self, X):
"""计算样本到聚类中心的距离"""
distances = np.zeros((X.shape[0], self.k))
for i, centroid in enumerate(self.centroids):
distances[:, i] = np.linalg.norm(X - centroid, axis=1)
return distances
def predict(self, X):
"""预测聚类标签"""
distances = self._compute_distances(X)
return np.argmin(distances, axis=1)
def inertia(self, X):
"""计算簇内平方和"""
total_inertia = 0
for i in range(self.k):
cluster_points = X[self.labels == i]
if len(cluster_points) > 0:
total_inertia += np.sum((cluster_points - self.centroids[i]) ** 2)
return total_inertia
# 使用示例
from sklearn.datasets import make_blobs
import matplotlib.pyplot as plt
# 生成聚类数据
X, y_true = make_blobs(n_samples=300, centers=4, cluster_std=0.60, random_state=42)
# 训练K-Means
kmeans = KMeans(k=4)
kmeans.fit(X)
# 可视化结果
plt.figure(figsize=(12, 5))
plt.subplot(1, 2, 1)
plt.scatter(X[:, 0], X[:, 1], c=y_true, cmap='viridis')
plt.title('真实聚类')
plt.xlabel('Feature 1')
plt.ylabel('Feature 2')
plt.subplot(1, 2, 2)
plt.scatter(X[:, 0], X[:, 1], c=kmeans.labels, cmap='viridis')
plt.scatter(kmeans.centroids[:, 0], kmeans.centroids[:, 1],
c='red', marker='x', s=200, linewidths=3)
plt.title('K-Means聚类结果')
plt.xlabel('Feature 1')
plt.ylabel('Feature 2')
plt.tight_layout()
plt.show()
print(f"K-Means簇内平方和: {kmeans.inertia(X):.2f}")
3.3.2 DBSCAN聚类
DBSCAN基于密度的聚类算法,能够发现任意形状的簇。
from sklearn.neighbors import NearestNeighbors
class DBSCAN:
def __init__(self, eps=0.5, min_samples=5):
self.eps = eps
self.min_samples = min_samples
self.labels = None
def fit(self, X):
"""训练DBSCAN模型"""
n_samples = X.shape[0]
self.labels = np.full(n_samples, -1) # -1表示噪声点
# 找到每个点的邻居
neighbors = self._find_neighbors(X)
cluster_id = 0
for i in range(n_samples):
if self.labels[i] != -1: # 已经被访问过
continue
if len(neighbors[i]) < self.min_samples:
self.labels[i] = -1 # 噪声点
continue
# 扩展簇
self._expand_cluster(X, i, neighbors, cluster_id)
cluster_id += 1
def _find_neighbors(self, X):
"""找到每个点的邻居"""
neighbors = []
for i in range(X.shape[0]):
distances = np.linalg.norm(X - X[i], axis=1)
neighbor_indices = np.where(distances <= self.eps)[0]
neighbors.append(neighbor_indices)
return neighbors
def _expand_cluster(self, X, point_idx, neighbors, cluster_id):
"""扩展簇"""
self.labels[point_idx] = cluster_id
seeds = set(neighbors[point_idx])
while seeds:
current_point = seeds.pop()
if self.labels[current_point] == -1: # 噪声点
self.labels[current_point] = cluster_id
if self.labels[current_point] != -1: # 已经被分配
continue
self.labels[current_point] = cluster_id
if len(neighbors[current_point]) >= self.min_samples:
seeds.update(neighbors[current_point])
# 使用示例
from sklearn.datasets import make_moons
# 生成月牙形数据
X, y_true = make_moons(n_samples=300, noise=0.05, random_state=42)
# 训练DBSCAN
dbscan = DBSCAN
更多推荐
所有评论(0)