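I. Introduction: The Significance of Machine Learning Today

As the wave of artificial intelligence sweeps the globe, machine learning has become the core engine driving the technological revolution. According to Gartner's 2025 Hype Cycle report, machine learning platforms have entered the "Plateau of Productivity" stage and have become foundational infrastructure for enterprise digital transformation.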

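1.1 Definition and History of Machine Learning

Machine learning is a branch of artificial intelligence that enables computer systems to learn and improve automatically from data without being explicitly programmed. Arthur Samuel first introduced the term in 1959, defining it as "the field of study that gives computers the ability to learn without being explicitly programmed".

The development of machine learning has gone through three major stages:

  • Symbolic era (1950s-1980s): based on rules and logical reasoning
  • Statistical learning era (1990s-2010s): based on probability, statistics, and optimization theory
  • Deep learning era (2010s-present): based on neural networks and big data

1.2 The Core Value of Machine Learning

Machine learning addresses three classes of problems that are hard to solve with hand-written rules:

  1. Complex pattern recognition: image recognition, speech recognition, natural language processing
  2. Predictive analytics: sales forecasting, risk assessment, stock prediction
  3. Personalized recommendation: e-commerce recommendation, content recommendation, ad targeting

1.3 Roadmap of This Article

This article is organized as follows:

  • Theoretical foundations: mathematical background, principles of the core algorithms
  • Practical skills: data preprocessing, model training, evaluation and tuning
  • Frameworks: Scikit-learn, TensorFlow, PyTorch
  • Industry cases: financial risk control, medical diagnosis, intelligent recommendation
  • Advanced topics: deep learning, reinforcement learning, AutoML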

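II. Mathematical Foundations of Machine Learning

2.1 Linear Algebra

Linear algebra is the cornerstone of machine learning: samples are represented as vectors, datasets as matrices, and most models as transformations of them.

2.1.1 Vector and Matrix Operations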
import numpy as np

# Create vectors and operate on them
vector_a = np.array([1, 2, 3])
vector_b = np.array([4, 5, 6])

# Dot (inner) product
dot_product = np.dot(vector_a, vector_b)  # 1*4 + 2*5 + 3*6 = 32

# Vector norm
l2_norm = np.linalg.norm(vector_a)  # sqrt(1² + 2² + 3²) = sqrt(14)

# Create matrices and operate on them
matrix_A = np.array([[1, 2], [3, 4]])
matrix_B = np.array([[5, 6], [7, 8]])

# Matrix multiplication
matrix_product = np.dot(matrix_A, matrix_B)

# Matrix transpose
matrix_transpose = matrix_A.T

# Matrix inverse
matrix_inverse = np.linalg.inv(matrix_A)

# Eigendecomposition
eigenvalues, eigenvectors = np.linalg.eig(matrix_A)
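
As a quick sanity check on the eigendecomposition, the sketch below verifies the defining relation A v = λ v for each eigenpair of the same 2×2 matrix, and the trace and determinant identities. The helper names `trace_matches` and `det_matches` are illustrative only.

```python
import numpy as np

matrix_A = np.array([[1.0, 2.0], [3.0, 4.0]])
eigenvalues, eigenvectors = np.linalg.eig(matrix_A)

# Column k of `eigenvectors` pairs with eigenvalues[k].
for k in range(len(eigenvalues)):
    v = eigenvectors[:, k]
    holds = np.allclose(matrix_A @ v, eigenvalues[k] * v)
    print(f"pair {k}: A v == lambda v ? {holds}")

# The eigenvalues sum to the trace and multiply to the determinant.
trace_matches = np.isclose(eigenvalues.sum(), np.trace(matrix_A))
det_matches = np.isclose(eigenvalues.prod(), np.linalg.det(matrix_A))
print(trace_matches, det_matches)
```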

2.1.2 Eigenvalues and Eigenvectors

Eigendecomposition is central to PCA (principal component analysis):

def pca_from_scratch(X, n_components=2):
    """
    PCA implemented from scratch
    :param X: input data matrix (n_samples, n_features)
    :param n_components: number of principal components to keep
    :return: the reduced data (n_samples, n_components) and the projection matrix
    """
    # 1. Standardize the data (assumes no feature has zero variance)
    X_mean = np.mean(X, axis=0)
    X_std = np.std(X, axis=0)
    X_normalized = (X - X_mean) / X_std

    # 2. Compute the covariance matrix
    cov_matrix = np.cov(X_normalized.T)

    # 3. Eigendecomposition (eigh is meant for symmetric matrices)
    eigenvalues, eigenvectors = np.linalg.eigh(cov_matrix)

    # 4. Sort the eigenpairs by eigenvalue, largest first
    idx = np.argsort(eigenvalues)[::-1]
    eigenvalues = eigenvalues[idx]
    eigenvectors = eigenvectors[:, idx]

    # 5. Keep the first n_components eigenvectors
    projection_matrix = eigenvectors[:, :n_components]

    # 6. Project onto the new space
    X_pca = np.dot(X_normalized, projection_matrix)

    return X_pca, projection_matrix

# Usage example
from sklearn.datasets import load_iris
iris = load_iris()
X_pca, proj_matrix = pca_from_scratch(iris.data, n_components=2)
print(f"Shape after PCA: {X_pca.shape}")

2.2 Probability and Statistics

2.2.1 Bayes' Theorem

Bayes' theorem is the theoretical basis of the naive Bayes classifier:

$$P(A \mid B) = \frac{P(B \mid A)\,P(A)}{P(B)}$$
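
To make the formula concrete, here is a small worked example with hypothetical numbers (a test for a rare condition). The probabilities are invented for illustration and do not come from any dataset; P(B) is expanded with the law of total probability.

```python
# Hypothetical inputs, chosen only to illustrate the arithmetic.
p_condition = 0.01            # P(A): prior probability of the condition
p_pos_given_condition = 0.95  # P(B|A): test is positive when the condition is present
p_pos_given_healthy = 0.05    # P(B|not A): false positive rate

# P(B) = P(B|A)P(A) + P(B|not A)P(not A)
p_pos = (p_pos_given_condition * p_condition
         + p_pos_given_healthy * (1 - p_condition))

# Bayes' theorem: P(A|B) = P(B|A)P(A) / P(B)
p_condition_given_pos = p_pos_given_condition * p_condition / p_pos
print(f"P(condition | positive) = {p_condition_given_pos:.4f}")
```

With these inputs the posterior is only about 16%, even though the test looks accurate, because the prior is so small.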

class NaiveBayesClassifier:
    def __init__(self):
        self.classes = None
        self.class_priors = {}
        self.feature_likelihoods = {}

    def fit(self, X, y):
        """Fit the naive Bayes classifier"""
        self.classes = np.unique(y)
        n_samples = len(y)

        # Estimate the class priors
        for class_val in self.classes:
            class_samples = X[y == class_val]
            self.class_priors[class_val] = len(class_samples) / n_samples

            # Estimate the feature likelihoods (Gaussian assumption);
            # the small constant avoids division by zero for constant features
            self.feature_likelihoods[class_val] = {
                'mean': np.mean(class_samples, axis=0),
                'std': np.std(class_samples, axis=0) + 1e-9
            }

    def _gaussian_probability(self, x, mean, std):
        """Gaussian probability density"""
        exponent = np.exp(-0.5 * ((x - mean) / std) ** 2)
        return (1 / (np.sqrt(2 * np.pi) * std)) * exponent

    def predict_proba(self, X):
        """Predict class probabilities"""
        probabilities = []

        for x in X:
            class_probs = {}
            for class_val in self.classes:
                # Unnormalized posterior (the denominator P(X) is dropped)
                prior = self.class_priors[class_val]
                likelihood = np.prod(
                    self._gaussian_probability(
                        x,
                        self.feature_likelihoods[class_val]['mean'],
                        self.feature_likelihoods[class_val]['std']
                    )
                )
                posterior = prior * likelihood
                class_probs[class_val] = posterior

            # Normalize so the probabilities sum to 1
            total = sum(class_probs.values())
            for class_val in class_probs:
                class_probs[class_val] /= total

            probabilities.append(class_probs)

        return probabilities

    def predict(self, X):
        """Predict class labels"""
        probabilities = self.predict_proba(X)
        predictions = []
        for prob in probabilities:
            predicted_class = max(prob, key=prob.get)
            predictions.append(predicted_class)
        return predictions

# Usage example
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

# Generate data. With 3 classes and the default 2 clusters per class,
# make_classification needs n_classes * n_clusters_per_class <= 2**n_informative,
# so n_informative is raised from its default of 2.
X, y = make_classification(n_samples=1000, n_features=4, n_informative=3,
                           n_redundant=1, n_classes=3, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Train the model
nb_classifier = NaiveBayesClassifier()
nb_classifier.fit(X_train, y_train)

# Predict
y_pred = nb_classifier.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
print(f"Naive Bayes accuracy: {accuracy:.4f}")
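
2.2.2 Maximum Likelihood Estimation

Maximum likelihood estimation (MLE) chooses the parameter values under which the observed data are most probable: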

def maximum_likelihood_estimation(data):
    """
    Maximum likelihood estimates for a normal distribution
    :param data: sample data
    :return: MLE estimates of the mean and the variance
    """
    # For a normal distribution the MLE is the sample mean and the
    # biased sample variance (divide by n, not n - 1)
    mu_mle = np.mean(data)
    sigma2_mle = np.var(data, ddof=0)  # ddof=0 divides by n

    return mu_mle, sigma2_mle

# Check the MLE against known parameters
np.random.seed(42)
true_mu, true_sigma = 5.0, 2.0
sample_data = np.random.normal(true_mu, true_sigma, 1000)

estimated_mu, estimated_sigma2 = maximum_likelihood_estimation(sample_data)
print(f"True parameters: μ={true_mu}, σ²={true_sigma**2}")
print(f"MLE estimates: μ={estimated_mu:.4f}, σ²={estimated_sigma2:.4f}")
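
For reference, the two estimates follow from maximizing the Gaussian log-likelihood. A short derivation, using the standard symbols $x_i$ for the samples and $n$ for the sample size (notation introduced here, not in the code above):

```latex
\ell(\mu, \sigma^2)
  = -\frac{n}{2}\log\!\left(2\pi\sigma^2\right)
    - \frac{1}{2\sigma^2}\sum_{i=1}^{n}(x_i - \mu)^2

\frac{\partial \ell}{\partial \mu}
  = \frac{1}{\sigma^2}\sum_{i=1}^{n}(x_i - \mu) = 0
  \;\Longrightarrow\;
  \hat{\mu} = \frac{1}{n}\sum_{i=1}^{n} x_i

\frac{\partial \ell}{\partial \sigma^2}
  = -\frac{n}{2\sigma^2} + \frac{1}{2\sigma^4}\sum_{i=1}^{n}(x_i - \mu)^2 = 0
  \;\Longrightarrow\;
  \hat{\sigma}^2 = \frac{1}{n}\sum_{i=1}^{n}(x_i - \hat{\mu})^2
```

The divisor $n$ in $\hat{\sigma}^2$ is why the code uses `ddof=0`; the unbiased estimator divides by $n-1$ instead.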

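2.3 Calculus and Optimization

2.3.1 Gradient Descent

Gradient descent is one of the most fundamental optimization algorithms in machine learning: it repeatedly moves the parameters a small step against the gradient of the loss.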

def gradient_descent(X, y, learning_rate=0.01, n_iterations=1000):
    """
    Gradient descent for linear regression
    :param X: feature matrix (n_samples, n_features)
    :param y: target vector (n_samples,)
    :param learning_rate: step size
    :param n_iterations: number of iterations
    :return: the learned weights, the bias, and the loss history
    """
    n_samples, n_features = X.shape

    # Initialize the parameters
    weights = np.zeros(n_features)
    bias = 0

    # Track the loss over time
    loss_history = []

    for i in range(n_iterations):
        # Forward pass (current predictions)
        y_pred = np.dot(X, weights) + bias

        # Loss (mean squared error)
        loss = np.mean((y_pred - y) ** 2)
        loss_history.append(loss)

        # Gradients of the MSE with respect to weights and bias
        dw = (2 / n_samples) * np.dot(X.T, (y_pred - y))
        db = (2 / n_samples) * np.sum(y_pred - y)

        # Parameter update
        weights -= learning_rate * dw
        bias -= learning_rate * db

        # Report progress
        if i % 100 == 0:
            print(f"Iteration {i}, Loss: {loss:.6f}")

    return weights, bias, loss_history

# Usage example
from sklearn.datasets import make_regression

# Generate regression data
X, y = make_regression(n_samples=1000, n_features=3, noise=10, random_state=42)

# Standardize the features
X_mean = np.mean(X, axis=0)
X_std = np.std(X, axis=0)
X_normalized = (X - X_mean) / X_std

# Train the model
weights, bias, loss_history = gradient_descent(X_normalized, y, learning_rate=0.1, n_iterations=1000)

# Predict
y_pred = np.dot(X_normalized, weights) + bias
mse = np.mean((y_pred - y) ** 2)
print(f"Final MSE: {mse:.4f}")
print(f"Learned weights: {weights}")
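
One way to check a gradient descent implementation is to compare it with the closed-form least-squares solution, which minimizes the same MSE loss. The sketch below does this on small synthetic data; the coefficients, seed, and step count are arbitrary choices made for this illustration.

```python
import numpy as np

rng = np.random.default_rng(0)
X = rng.normal(size=(200, 3))
y = X @ np.array([2.0, -1.0, 0.5]) + 3.0 + rng.normal(scale=0.1, size=200)

# Batch gradient descent on the MSE loss
w, b = np.zeros(3), 0.0
learning_rate = 0.1
for _ in range(500):
    residual = X @ w + b - y
    w -= learning_rate * (2 / len(y)) * (X.T @ residual)
    b -= learning_rate * (2 / len(y)) * residual.sum()

# Least-squares reference; the column of ones models the bias term
X_aug = np.column_stack([np.ones(len(y)), X])
beta, *_ = np.linalg.lstsq(X_aug, y, rcond=None)

max_gap = max(abs(b - beta[0]), np.max(np.abs(w - beta[1:])))
print(f"largest parameter difference: {max_gap:.2e}")
```

If the two solutions disagree noticeably, the usual suspects are a learning rate that is too large, too few iterations, or unscaled features.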

2.3.2 Stochastic Gradient Descent (SGD)

def stochastic_gradient_descent(X, y, learning_rate=0.01, n_epochs=100):
    """
    Stochastic gradient descent for linear regression:
    the parameters are updated after every single sample
    """
    n_samples, n_features = X.shape
    weights = np.zeros(n_features)
    bias = 0
    loss_history = []

    for epoch in range(n_epochs):
        # Visit the samples in a fresh random order each epoch
        indices = np.random.permutation(n_samples)

        epoch_loss = 0
        for i in indices:
            # Prediction for a single sample
            y_pred = np.dot(X[i], weights) + bias

            # Squared error for a single sample
            loss = (y_pred - y[i]) ** 2
            epoch_loss += loss

            # Gradient for a single sample
            dw = 2 * (y_pred - y[i]) * X[i]
            db = 2 * (y_pred - y[i])

            # Parameter update
            weights -= learning_rate * dw
            bias -= learning_rate * db

        avg_loss = epoch_loss / n_samples
        loss_history.append(avg_loss)

        if epoch % 10 == 0:
            print(f"Epoch {epoch}, Average Loss: {avg_loss:.6f}")

    return weights, bias, loss_history
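
The per-sample update can be exercised on a tiny noise-free problem, where it should recover the generating coefficients almost exactly. This is a self-contained sketch; the coefficients `[1.5, -2.0]`, the intercept `0.5`, and the seed are hypothetical values picked for the demonstration.

```python
import numpy as np

rng = np.random.default_rng(42)
X = rng.normal(size=(500, 2))
y = X @ np.array([1.5, -2.0]) + 0.5  # noise-free linear target

w, b = np.zeros(2), 0.0
learning_rate = 0.01
for epoch in range(20):
    # One pass over the data in random order, one update per sample
    for i in rng.permutation(len(y)):
        error = X[i] @ w + b - y[i]
        w -= learning_rate * 2 * error * X[i]
        b -= learning_rate * 2 * error

print(f"weights: {w}, bias: {b:.4f}")
```

On noisy data the iterates keep jittering around the optimum, which is why SGD is often paired with a decaying learning rate.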

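III. Core Machine Learning Algorithms in Detail

3.1 Supervised Learning Algorithms

3.1.1 Linear Regression

Linear regression is the most basic regression algorithm. It assumes a linear relationship between the features and the target.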

Model: $y = \beta_0 + \beta_1 x_1 + \beta_2 x_2 + \dots + \beta_n x_n + \epsilon$

Normal-equation solution: $\beta = (X^{T} X)^{-1} X^{T} y$
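
A small numeric instance may help here. The sketch below solves the normal equations for four made-up points and checks the property that characterizes the least-squares fit: the residual is orthogonal to every column of X. The data values are invented for illustration.

```python
import numpy as np

# First column of ones carries the intercept beta_0.
X = np.array([[1.0, 1.0],
              [1.0, 2.0],
              [1.0, 3.0],
              [1.0, 4.0]])
y = np.array([2.1, 3.9, 6.2, 7.8])

# Solve (X^T X) beta = X^T y without forming the inverse explicitly
beta = np.linalg.solve(X.T @ X, X.T @ y)
residual = y - X @ beta
orthogonality = X.T @ residual  # approximately zero at the least-squares solution

print(f"beta = {beta}")            # intercept 0.15, slope 1.94
print(f"X^T r = {orthogonality}")
```

Using `np.linalg.solve` rather than an explicit inverse is a common numerical preference; both express the same formula.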

class LinearRegression:
    def __init__(self, fit_intercept=True):
        self.fit_intercept = fit_intercept
        self.weights = None
        self.bias = None

    def fit(self, X, y):
        """Fit the model with the normal equations"""
        if self.fit_intercept:
            # Prepend a column of ones for the intercept
            X_with_bias = np.column_stack([np.ones(X.shape[0]), X])
            # Solve (X^T X) beta = X^T y; solve() is more stable than an explicit inverse
            beta = np.linalg.solve(X_with_bias.T @ X_with_bias, X_with_bias.T @ y)
            self.bias = beta[0]
            self.weights = beta[1:]
        else:
            self.weights = np.linalg.solve(X.T @ X, X.T @ y)
            self.bias = 0

    def predict(self, X):
        """Predict targets"""
        return X @ self.weights + self.bias

    def score(self, X, y):
        """Compute the R² score"""
        y_pred = self.predict(X)
        ss_res = np.sum((y - y_pred) ** 2)
        ss_tot = np.sum((y - np.mean(y)) ** 2)
        return 1 - (ss_res / ss_tot)

# Usage example
from sklearn.datasets import make_regression
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

# Load data (note: load_boston was removed in scikit-learn 1.2,
# so make_regression is used here instead)
X, y = make_regression(n_samples=1000, n_features=10, noise=10, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Standardize
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# Train the custom linear regression
lr_custom = LinearRegression()
lr_custom.fit(X_train_scaled, y_train)

# Predict and evaluate
y_pred_custom = lr_custom.predict(X_test_scaled)
r2_custom = lr_custom.score(X_test_scaled, y_test)

# Compare with sklearn
from sklearn.linear_model import LinearRegression as SklearnLR
lr_sklearn = SklearnLR()
lr_sklearn.fit(X_train_scaled, y_train)
y_pred_sklearn = lr_sklearn.predict(X_test_scaled)
r2_sklearn = lr_sklearn.score(X_test_scaled, y_test)

print(f"Custom linear regression R²: {r2_custom:.4f}")
print(f"Sklearn linear regression R²: {r2_sklearn:.4f}")
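
3.1.2 Logistic Regression

Logistic regression is used for binary classification. It maps the linear output to a probability with the sigmoid function.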

Model: $P(y=1 \mid x) = \dfrac{1}{1 + e^{-(\beta_0 + \beta_1 x_1 + \dots + \beta_n x_n)}}$

Loss function (log loss): $L(\beta) = -\dfrac{1}{n}\sum_{i=1}^{n}\left[y_i \log(\hat{y}_i) + (1 - y_i)\log(1 - \hat{y}_i)\right]$
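
The gradient of the log loss with respect to the weights has the compact form X^T(ŷ − y)/n. The sketch below checks that expression against a central finite-difference approximation on random data; the function name `log_loss`, the data, and the step `eps` are choices made for this illustration, and the bias term is omitted for brevity.

```python
import numpy as np

def log_loss(w, X, y):
    """Mean binary cross-entropy of a logistic model with weights w."""
    p = 1 / (1 + np.exp(-(X @ w)))
    return -np.mean(y * np.log(p) + (1 - y) * np.log(1 - p))

rng = np.random.default_rng(0)
X = rng.normal(size=(50, 3))
y = (rng.random(50) < 0.5).astype(float)
w = 0.1 * rng.normal(size=3)

# Analytic gradient: X^T (y_hat - y) / n
y_hat = 1 / (1 + np.exp(-(X @ w)))
analytic = X.T @ (y_hat - y) / len(y)

# Central finite differences, one coordinate direction at a time
eps = 1e-6
numeric = np.array([
    (log_loss(w + eps * e, X, y) - log_loss(w - eps * e, X, y)) / (2 * eps)
    for e in np.eye(3)
])

print(np.max(np.abs(analytic - numeric)))
```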

class LogisticRegression:
    def __init__(self, learning_rate=0.01, n_iterations=1000):
        self.learning_rate = learning_rate
        self.n_iterations = n_iterations
        self.weights = None
        self.bias = None

    def _sigmoid(self, z):
        """Sigmoid function"""
        # Clip to avoid overflow in exp
        z = np.clip(z, -500, 500)
        return 1 / (1 + np.exp(-z))

    def fit(self, X, y):
        """Fit the logistic regression model"""
        n_samples, n_features = X.shape

        # Initialize the parameters
        self.weights = np.zeros(n_features)
        self.bias = 0

        # Gradient descent
        for i in range(self.n_iterations):
            # Linear output
            linear_output = np.dot(X, self.weights) + self.bias

            # Predicted probabilities
            y_pred = self._sigmoid(linear_output)

            # Gradients of the log loss
            dw = (1 / n_samples) * np.dot(X.T, (y_pred - y))
            db = (1 / n_samples) * np.sum(y_pred - y)

            # Parameter update
            self.weights -= self.learning_rate * dw
            self.bias -= self.learning_rate * db

            # Report progress
            if i % 100 == 0:
                loss = self._compute_loss(y, y_pred)
                print(f"Iteration {i}, Loss: {loss:.6f}")

    def _compute_loss(self, y_true, y_pred):
        """Compute the log loss"""
        # Clip to avoid log(0)
        y_pred = np.clip(y_pred, 1e-15, 1 - 1e-15)
        return -np.mean(y_true * np.log(y_pred) + (1 - y_true) * np.log(1 - y_pred))

    def predict_proba(self, X):
        """Predict probabilities of the positive class"""
        linear_output = np.dot(X, self.weights) + self.bias
        return self._sigmoid(linear_output)

    def predict(self, X, threshold=0.5):
        """Predict class labels"""
        probabilities = self.predict_proba(X)
        return (probabilities >= threshold).astype(int)

    def score(self, X, y):
        """Compute the accuracy"""
        y_pred = self.predict(X)
        return np.mean(y_pred == y)

# Usage example
from sklearn.datasets import make_classification

# Generate binary classification data
X, y = make_classification(n_samples=1000, n_features=4, n_classes=2, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Standardize
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# Train the custom logistic regression
log_reg_custom = LogisticRegression(learning_rate=0.1, n_iterations=1000)
log_reg_custom.fit(X_train_scaled, y_train)

# Predict and evaluate
y_pred_custom = log_reg_custom.predict(X_test_scaled)
accuracy_custom = log_reg_custom.score(X_test_scaled, y_test)

# Compare with sklearn
from sklearn.linear_model import LogisticRegression as SklearnLogReg
log_reg_sklearn = SklearnLogReg()
log_reg_sklearn.fit(X_train_scaled, y_train)
y_pred_sklearn = log_reg_sklearn.predict(X_test_scaled)
accuracy_sklearn = log_reg_sklearn.score(X_test_scaled, y_test)

print(f"Custom logistic regression accuracy: {accuracy_custom:.4f}")
print(f"Sklearn logistic regression accuracy: {accuracy_sklearn:.4f}")

3.1.3 Decision Trees

A decision tree builds a classification or regression model by recursively splitting the feature space.

Information gain (ID3 algorithm): $IG(S, A) = H(S) - \sum_{v \in \mathrm{Values}(A)} \dfrac{|S_v|}{|S|} H(S_v)$

Gini impurity (CART algorithm): $Gini(S) = 1 - \sum_{i=1}^{c} p_i^{2}$
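
Both criteria are easy to evaluate by hand on a toy split. The sketch below computes entropy, information gain, and Gini impurity for eight hypothetical labels divided into two children; the helper names `entropy` and `gini` are defined here for the illustration.

```python
import numpy as np

def entropy(labels):
    """Shannon entropy (base 2) of a label array."""
    _, counts = np.unique(labels, return_counts=True)
    p = counts / counts.sum()
    return -np.sum(p * np.log2(p))

def gini(labels):
    """Gini impurity of a label array."""
    _, counts = np.unique(labels, return_counts=True)
    p = counts / counts.sum()
    return 1.0 - np.sum(p ** 2)

parent = np.array([0, 0, 0, 0, 1, 1, 1, 1])
left, right = np.array([0, 0, 0, 1]), np.array([0, 1, 1, 1])

# Children are weighted by their share of the parent's samples
weighted_child_entropy = (
    len(left) * entropy(left) + len(right) * entropy(right)
) / len(parent)
info_gain = entropy(parent) - weighted_child_entropy

print(f"H(parent) = {entropy(parent):.4f}, IG = {info_gain:.4f}")
print(f"Gini(parent) = {gini(parent):.4f}, Gini(left) = {gini(left):.4f}")
```

Here the parent is perfectly mixed (entropy 1 bit, Gini 0.5), and the split removes roughly 0.19 bits of uncertainty.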

class DecisionTreeNode:
    def __init__(self, feature_idx=None, threshold=None, left=None, right=None, value=None):
        self.feature_idx = feature_idx  # index of the feature to split on
        self.threshold = threshold      # split threshold
        self.left = left                # subtree for feature <= threshold
        self.right = right              # subtree for feature > threshold
        self.value = value              # predicted class (leaf nodes only)

class DecisionTreeClassifier:
    def __init__(self, max_depth=5, min_samples_split=2):
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.root = None

    def fit(self, X, y):
        """Fit the decision tree"""
        self.root = self._build_tree(X, y, depth=0)

    def _build_tree(self, X, y, depth):
        """Build the tree recursively"""
        n_samples, n_features = X.shape
        n_classes = len(np.unique(y))

        # Stopping conditions
        if (depth >= self.max_depth or
            n_samples < self.min_samples_split or
            n_classes == 1):
            leaf_value = self._most_common_class(y)
            return DecisionTreeNode(value=leaf_value)

        # Find the best split
        best_feature, best_threshold = self._best_split(X, y, n_features)

        if best_feature is None:
            leaf_value = self._most_common_class(y)
            return DecisionTreeNode(value=leaf_value)

        # Partition the data
        left_indices = X[:, best_feature] <= best_threshold
        right_indices = ~left_indices

        # Build the subtrees recursively
        left_child = self._build_tree(X[left_indices], y[left_indices], depth + 1)
        right_child = self._build_tree(X[right_indices], y[right_indices], depth + 1)

        return DecisionTreeNode(best_feature, best_threshold, left_child, right_child)
    
    def _best_split(self, X, y, n_features):
        """Find the feature and threshold with the highest information gain"""
        # Start at 0 so that only splits with strictly positive gain are accepted;
        # otherwise a zero-gain split could leave one child empty
        best_gain = 0
        best_feature = None
        best_threshold = None

        for feature_idx in range(n_features):
            thresholds = np.unique(X[:, feature_idx])
            for threshold in thresholds:
                gain = self._information_gain(y, X[:, feature_idx], threshold)
                if gain > best_gain:
                    best_gain = gain
                    best_feature = feature_idx
                    best_threshold = threshold

        return best_feature, best_threshold
    
    def _information_gain(self, y, X_column, threshold):
        """Compute the information gain of a split"""
        parent_entropy = self._entropy(y)

        left_indices = X_column <= threshold
        right_indices = ~left_indices

        # A split that leaves one side empty gains nothing
        if np.sum(left_indices) == 0 or np.sum(right_indices) == 0:
            return 0

        n = len(y)
        n_left, n_right = np.sum(left_indices), np.sum(right_indices)
        e_left, e_right = self._entropy(y[left_indices]), self._entropy(y[right_indices])
        child_entropy = (n_left / n) * e_left + (n_right / n) * e_right

        return parent_entropy - child_entropy

    def _entropy(self, y):
        """Compute the entropy of a label array"""
        if len(y) == 0:
            return 0
        _, counts = np.unique(y, return_counts=True)
        probabilities = counts / len(y)
        return -np.sum(probabilities * np.log2(probabilities + 1e-10))

    def _most_common_class(self, y):
        """Return the most frequent class"""
        unique, counts = np.unique(y, return_counts=True)
        return unique[np.argmax(counts)]

    def predict(self, X):
        """Predict class labels"""
        return np.array([self._traverse_tree(x, self.root) for x in X])

    def _traverse_tree(self, x, node):
        """Walk from the given node down to a leaf"""
        if node.value is not None:
            return node.value

        if x[node.feature_idx] <= node.threshold:
            return self._traverse_tree(x, node.left)
        else:
            return self._traverse_tree(x, node.right)

# Usage example
from sklearn.datasets import load_iris

# Load data
iris = load_iris()
X, y = iris.data, iris.target
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Train the custom decision tree
dt_custom = DecisionTreeClassifier(max_depth=3, min_samples_split=5)
dt_custom.fit(X_train, y_train)

# Predict and evaluate
y_pred_custom = dt_custom.predict(X_test)
accuracy_custom = np.mean(y_pred_custom == y_test)

# Compare with sklearn
from sklearn.tree import DecisionTreeClassifier as SklearnDT
dt_sklearn = SklearnDT(max_depth=3, min_samples_split=5)
dt_sklearn.fit(X_train, y_train)
y_pred_sklearn = dt_sklearn.predict(X_test)
accuracy_sklearn = np.mean(y_pred_sklearn == y_test)

print(f"Custom decision tree accuracy: {accuracy_custom:.4f}")
print(f"Sklearn decision tree accuracy: {accuracy_sklearn:.4f}")

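3.2 Ensemble Learning Algorithms

3.2.1 Random Forest

A random forest improves generalization by combining many decision trees, each trained on a bootstrap sample of the data with randomly selected features, and aggregating their votes.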
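
Bootstrap sampling draws n samples with replacement, so each tree sees only part of the training set. The sketch below measures that empirically; the fraction of distinct samples approaches 1 − 1/e (about 63.2%) as n grows. The sample size and seed are arbitrary.

```python
import numpy as np

rng = np.random.default_rng(0)
n_samples = 10_000

# One bootstrap draw: n indices sampled with replacement
indices = rng.choice(n_samples, size=n_samples, replace=True)

unique_fraction = len(np.unique(indices)) / n_samples
out_of_bag_fraction = 1 - unique_fraction

print(f"distinct samples in the draw: {unique_fraction:.3f}")
print(f"samples left out:             {out_of_bag_fraction:.3f}")
print(f"theoretical limit 1 - 1/e:    {1 - np.exp(-1):.3f}")
```

The left-out samples differ from tree to tree, which is one source of the diversity that makes averaging the trees worthwhile.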

class RandomForestClassifier:
    def __init__(self, n_trees=100, max_depth=10, min_samples_split=2, max_features='sqrt'):
        self.n_trees = n_trees
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.max_features = max_features
        self.trees = []

    def fit(self, X, y):
        """Fit the random forest"""
        self.trees = []
        n_samples, n_features = X.shape

        # Number of features each tree sees. Note: this simplified version draws
        # one feature subset per tree, whereas the standard random forest
        # resamples the candidate features at every split.
        if self.max_features == 'sqrt':
            n_features_to_use = max(1, int(np.sqrt(n_features)))
        elif self.max_features == 'log2':
            n_features_to_use = max(1, int(np.log2(n_features)))
        else:
            n_features_to_use = n_features

        for _ in range(self.n_trees):
            # Bootstrap sampling
            indices = np.random.choice(n_samples, size=n_samples, replace=True)
            X_bootstrap = X[indices]
            y_bootstrap = y[indices]

            # Random feature subset for this tree
            feature_indices = np.random.choice(n_features, size=n_features_to_use, replace=False)
            X_bootstrap_subset = X_bootstrap[:, feature_indices]

            # Train one decision tree
            tree = DecisionTreeClassifier(
                max_depth=self.max_depth,
                min_samples_split=self.min_samples_split
            )
            tree.fit(X_bootstrap_subset, y_bootstrap)

            self.trees.append((tree, feature_indices))

    def predict(self, X):
        """Predict class labels by majority vote"""
        tree_predictions = []
        for tree, feature_indices in self.trees:
            X_subset = X[:, feature_indices]
            tree_pred = tree.predict(X_subset)
            tree_predictions.append(tree_pred)

        # Majority vote across trees
        tree_predictions = np.array(tree_predictions).T
        predictions = []
        for sample_preds in tree_predictions:
            unique, counts = np.unique(sample_preds, return_counts=True)
            predictions.append(unique[np.argmax(counts)])

        return np.array(predictions)

# Usage example
from sklearn.datasets import make_classification

# Generate data. Three classes need n_informative above the default of 2
# (n_classes * n_clusters_per_class must not exceed 2**n_informative).
X, y = make_classification(n_samples=1000, n_features=20, n_informative=5,
                           n_classes=3, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Train the custom random forest
rf_custom = RandomForestClassifier(n_trees=50, max_depth=5)
rf_custom.fit(X_train, y_train)

# Predict and evaluate
y_pred_custom = rf_custom.predict(X_test)
accuracy_custom = np.mean(y_pred_custom == y_test)

# Compare with sklearn
from sklearn.ensemble import RandomForestClassifier as SklearnRF
rf_sklearn = SklearnRF(n_estimators=50, max_depth=5, random_state=42)
rf_sklearn.fit(X_train, y_train)
y_pred_sklearn = rf_sklearn.predict(X_test)
accuracy_sklearn = np.mean(y_pred_sklearn == y_test)

print(f"Custom random forest accuracy: {accuracy_custom:.4f}")
print(f"Sklearn random forest accuracy: {accuracy_sklearn:.4f}")

3.2.2 XGBoost

XGBoost is an efficient implementation of gradient-boosted trees and is widely used in machine learning competitions.

# A practical XGBoost example
import xgboost as xgb
from sklearn.metrics import accuracy_score, classification_report

# Generate data
X, y = make_classification(n_samples=10000, n_features=20, n_classes=2, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Hold out part of the training data for early stopping,
# so the test set is not used to choose the number of rounds
X_fit, X_val, y_fit, y_val = train_test_split(X_train, y_train, test_size=0.2, random_state=42)

# Build DMatrix objects (XGBoost's data structure)
dtrain = xgb.DMatrix(X_fit, label=y_fit)
dval = xgb.DMatrix(X_val, label=y_val)
dtest = xgb.DMatrix(X_test, label=y_test)

# Set the parameters
params = {
    'objective': 'binary:logistic',
    'max_depth': 6,
    'learning_rate': 0.1,
    'subsample': 0.8,
    'colsample_bytree': 0.8,
    'eval_metric': 'logloss'
}

# Train the model
num_rounds = 100
xgb_model = xgb.train(
    params,
    dtrain,
    num_rounds,
    evals=[(dval, 'validation')],
    early_stopping_rounds=10,
    verbose_eval=False
)

# Predict
y_pred_proba = xgb_model.predict(dtest)
y_pred = (y_pred_proba > 0.5).astype(int)

# Evaluate
accuracy = accuracy_score(y_test, y_pred)
print(f"XGBoost accuracy: {accuracy:.4f}")
print("\nClassification report:")
print(classification_report(y_test, y_pred))

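3.3 Unsupervised Learning Algorithms

3.3.1 K-Means Clustering

K-Means partitions the data by alternating two steps until the centroids stop moving: assign each sample to its nearest centroid, then recompute each centroid as the mean of its assigned samples.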
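
A single iteration on four hand-picked points shows the two steps in isolation. The points and starting centroids below are made up for this sketch.

```python
import numpy as np

X = np.array([[0.0, 0.0], [0.0, 1.0], [10.0, 10.0], [10.0, 11.0]])
centroids = np.array([[0.0, 0.0], [10.0, 10.0]])

# Assignment step: distance from every point to every centroid, shape (4, 2)
distances = np.linalg.norm(X[:, None, :] - centroids[None, :, :], axis=2)
labels = distances.argmin(axis=1)

# Update step: each centroid moves to the mean of its assigned points
new_centroids = np.array([X[labels == j].mean(axis=0) for j in range(len(centroids))])

print(labels)          # [0 0 1 1]
print(new_centroids)   # [[ 0.   0.5] [10.  10.5]]
```

Repeating these two steps never increases the within-cluster sum of squares, which is why the procedure converges, although possibly to a local optimum that depends on the initialization.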

class KMeans:
    def __init__(self, k=3, max_iters=100, tol=1e-4):
        self.k = k
        self.max_iters = max_iters
        self.tol = tol
        self.centroids = None
        self.labels = None

    def fit(self, X):
        """Fit the K-Means model"""
        n_samples, n_features = X.shape

        # Initialize the centroids with k distinct random samples
        self.centroids = X[np.random.choice(n_samples, self.k, replace=False)]

        for i in range(self.max_iters):
            # Assign each sample to its nearest centroid
            distances = self._compute_distances(X)
            self.labels = np.argmin(distances, axis=1)

            # Update the centroids; an empty cluster keeps its previous centroid
            new_centroids = np.array([
                X[self.labels == j].mean(axis=0) if np.any(self.labels == j)
                else self.centroids[j]
                for j in range(self.k)
            ])

            # Check convergence
            converged = np.all(np.abs(new_centroids - self.centroids) < self.tol)
            self.centroids = new_centroids
            if converged:
                print(f"K-Means converged after {i+1} iterations")
                break

    def _compute_distances(self, X):
        """Compute the distance from each sample to each centroid"""
        distances = np.zeros((X.shape[0], self.k))
        for i, centroid in enumerate(self.centroids):
            distances[:, i] = np.linalg.norm(X - centroid, axis=1)
        return distances

    def predict(self, X):
        """Predict cluster labels"""
        distances = self._compute_distances(X)
        return np.argmin(distances, axis=1)

    def inertia(self, X):
        """Compute the within-cluster sum of squares"""
        total_inertia = 0
        for i in range(self.k):
            cluster_points = X[self.labels == i]
            if len(cluster_points) > 0:
                total_inertia += np.sum((cluster_points - self.centroids[i]) ** 2)
        return total_inertia

# Usage example
from sklearn.datasets import make_blobs
import matplotlib.pyplot as plt

# Generate clustered data
X, y_true = make_blobs(n_samples=300, centers=4, cluster_std=0.60, random_state=42)

# Fit K-Means
kmeans = KMeans(k=4)
kmeans.fit(X)

# Visualize the result
plt.figure(figsize=(12, 5))

plt.subplot(1, 2, 1)
plt.scatter(X[:, 0], X[:, 1], c=y_true, cmap='viridis')
plt.title('True clusters')
plt.xlabel('Feature 1')
plt.ylabel('Feature 2')

plt.subplot(1, 2, 2)
plt.scatter(X[:, 0], X[:, 1], c=kmeans.labels, cmap='viridis')
plt.scatter(kmeans.centroids[:, 0], kmeans.centroids[:, 1],
           c='red', marker='x', s=200, linewidths=3)
plt.title('K-Means clustering result')
plt.xlabel('Feature 1')
plt.ylabel('Feature 2')

plt.tight_layout()
plt.show()

print(f"K-Means within-cluster sum of squares: {kmeans.inertia(X):.2f}")
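
3.3.2 DBSCAN Clustering

DBSCAN is a density-based clustering algorithm that can discover clusters of arbitrary shape and marks points in sparse regions as noise.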
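
DBSCAN distinguishes three kinds of points: core points (at least `min_samples` points within distance `eps`, counting the point itself), border points (not core, but within `eps` of a core point), and noise. The sketch below classifies five one-dimensional points chosen by hand; the values of `eps` and `min_samples` are illustrative.

```python
import numpy as np

points = np.array([0.0, 2.0, 4.0, 6.5, 50.0])
eps, min_samples = 3.0, 3

# Pairwise distances; each point counts as its own neighbor
dist = np.abs(points[:, None] - points[None, :])
within_eps = dist <= eps

is_core = within_eps.sum(axis=1) >= min_samples
# Border: not core itself, but has at least one core point within eps
is_border = ~is_core & (within_eps & is_core[None, :]).any(axis=1)
is_noise = ~is_core & ~is_border

roles = np.where(is_core, "core", np.where(is_border, "border", "noise"))
for p, r in zip(points, roles):
    print(f"{p:5.1f} -> {r}")
```

The two core points at 2.0 and 4.0 are within `eps` of each other, so they and their border points form a single cluster, while the isolated point at 50.0 is noise.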

class DBSCAN:
    def __init__(self, eps=0.5, min_samples=5):
        self.eps = eps
        self.min_samples = min_samples
        self.labels = None

    def fit(self, X):
        """Fit DBSCAN; labels are -1 for noise and 0..k-1 for clusters"""
        n_samples = X.shape[0]
        self.labels = np.full(n_samples, -1)  # every point is noise until it joins a cluster
        # Track visits separately: -1 alone cannot tell "noise" from "not yet visited"
        visited = np.zeros(n_samples, dtype=bool)

        # Find the neighbors of every point
        neighbors = self._find_neighbors(X)

        cluster_id = 0
        for i in range(n_samples):
            if visited[i]:
                continue
            visited[i] = True

            # Not a core point: stays noise unless a later cluster reaches it
            if len(neighbors[i]) < self.min_samples:
                continue

            # Grow a new cluster from this core point
            self._expand_cluster(i, neighbors, cluster_id, visited)
            cluster_id += 1

    def _find_neighbors(self, X):
        """Find all points within eps of each point (including the point itself)"""
        neighbors = []
        for i in range(X.shape[0]):
            distances = np.linalg.norm(X - X[i], axis=1)
            neighbor_indices = np.where(distances <= self.eps)[0]
            neighbors.append(neighbor_indices)
        return neighbors

    def _expand_cluster(self, point_idx, neighbors, cluster_id, visited):
        """Grow a cluster outward from a core point"""
        self.labels[point_idx] = cluster_id
        seeds = list(neighbors[point_idx])

        while seeds:
            current_point = seeds.pop()

            # Unassigned points (including earlier noise) join the cluster
            if self.labels[current_point] == -1:
                self.labels[current_point] = cluster_id

            if visited[current_point]:
                continue
            visited[current_point] = True

            # Only core points extend the cluster further
            if len(neighbors[current_point]) >= self.min_samples:
                seeds.extend(neighbors[current_point])

# Usage example
from sklearn.datasets import make_moons

# Generate moon-shaped data
X, y_true = make_moons(n_samples=300, noise=0.05, random_state=42)

# Fit DBSCAN
dbscan = DBSCAN