用Python实战粗糙集属性约简:从理论到代码的完整指南

粗糙集理论作为数据挖掘领域的重要工具,其核心价值在于能够从海量数据中发现关键特征,而属性约简则是这一过程的精髓所在。本文将彻底打破传统理论讲解模式,带你用Python一步步实现粗糙集的核心算法,让抽象数学公式转化为可运行的代码。

1. 环境准备与数据建模

在开始编码之前,我们需要搭建合适的Python环境并理解如何用数据结构表示粗糙集的基本概念。推荐使用Anaconda创建专属环境:

conda create -n rough_set python=3.8
conda activate rough_set
pip install pandas numpy matplotlib

决策表 是粗糙集的核心数据结构,我们用pandas DataFrame来表示。以下是一个学生评估的示例数据集:

import pandas as pd

data = {
    '语文': ['好', '中', '差', '好', '好', '差', '差', '中'],
    '数学': ['中', '好', '中', '好', '中', '差', '好', '好'],
    '英语': ['差', '好', '中', '中', '差', '中', '中', '中'],
    '理综': ['差', '好', '好', '差', '中', '好', '好', '中'],
    '努力程度': ['中', '差', '好', '差', '好', '好', '差', '好']
}

df = pd.DataFrame(data, index=['李华', '大明', 'gogo', '小明', '喜羊羊', '灰太狼', '熊大', '熊二'])

这个DataFrame中,前四列是条件属性,最后一列是决策属性。我们需要实现几个基础函数来处理等价类:

def get_equivalence_classes(df, attributes):
    """获取指定属性集的等价类"""
    return df.groupby(attributes).groups.values()

测试这个函数:

print(get_equivalence_classes(df, ['数学', '英语']))
# 输出:[array(['李华', '喜羊羊'], dtype=object), 
#        array(['大明', '熊大', '熊二'], dtype=object), 
#        array(['gogo'], dtype=object), 
#        array(['小明'], dtype=object), 
#        array(['灰太狼'], dtype=object)]

2. 实现上下近似计算

上下近似是粗糙集的基石概念,下面我们实现这两个关键操作。

2.1 下近似计算

下近似包含那些 确定属于 目标集合的对象:

def lower_approximation(df, condition_attrs, decision_attr, decision_value):
    """
    计算下近似
    :param condition_attrs: 条件属性列表
    :param decision_attr: 决策属性名
    :param decision_value: 决策目标值
    :return: 下近似集合
    """
    # 获取决策属性等于目标值的所有对象
    target_objects = set(df[df[decision_attr] == decision_value].index)
    
    # 获取条件属性下的等价类
    equivalence_classes = get_equivalence_classes(df, condition_attrs)
    
    lower_approx = set()
    for eq_class in equivalence_classes:
        if set(eq_class).issubset(target_objects):
            lower_approx.update(eq_class)
    
    return lower_approx

2.2 上近似计算

上近似包含那些 可能属于 目标集合的对象:

def upper_approximation(df, condition_attrs, decision_attr, decision_value):
    """
    计算上近似
    :param condition_attrs: 条件属性列表
    :param decision_attr: 决策属性名
    :param decision_value: 决策目标值
    :return: 上近似集合
    """
    target_objects = set(df[df[decision_attr] == decision_value].index)
    equivalence_classes = get_equivalence_classes(df, condition_attrs)
    
    upper_approx = set()
    for eq_class in equivalence_classes:
        if set(eq_class) & target_objects:  # 有交集
            upper_approx.update(eq_class)
    
    return upper_approx

测试上下近似计算:

print("下近似:", lower_approximation(df, ['数学', '英语'], '努力程度', '好'))
print("上近似:", upper_approximation(df, ['数学', '英语'], '努力程度', '好'))

3. 实现QuickReduct算法

QuickReduct是最经典的属性约简算法之一,其核心思想是贪心地添加最能提升依赖度的属性。

3.1 依赖度计算

首先实现计算属性集依赖度的函数:

def dependency_degree(df, condition_attrs, decision_attr):
    """
    计算属性集对决策属性的依赖度
    :return: 依赖度值[0,1]
    """
    # 获取决策属性的所有唯一值
    decision_values = df[decision_attr].unique()
    
    total_objects = len(df)
    pos_region = set()
    
    for value in decision_values:
        lower_approx = lower_approximation(df, condition_attrs, decision_attr, value)
        pos_region.update(lower_approx)
    
    return len(pos_region) / total_objects

3.2 QuickReduct实现

def quick_reduct(df, decision_attr, max_iter=100):
    """
    QuickReduct算法实现
    :return: 约简后的属性列表
    """
    condition_attrs = [col for col in df.columns if col != decision_attr]
    reduct = []
    dependency = 0.0
    
    for _ in range(max_iter):
        max_gain = 0
        best_attr = None
        
        for attr in set(condition_attrs) - set(reduct):
            current_attrs = reduct + [attr]
            current_dep = dependency_degree(df, current_attrs, decision_attr)
            gain = current_dep - dependency
            
            if gain > max_gain:
                max_gain = gain
                best_attr = attr
        
        if best_attr is None:
            break
            
        reduct.append(best_attr)
        dependency += max_gain
        
        # 检查是否达到完全依赖
        if abs(dependency - 1.0) < 1e-6:
            break
    
    return reduct

运行算法测试:

reduct = quick_reduct(df, '努力程度')
print("约简结果:", reduct)

4. 算法优化与性能对比

4.1 优化依赖度计算

原始实现每次都要重新计算等价类,效率较低。我们可以引入缓存机制:

from functools import lru_cache

@lru_cache(maxsize=None)
def cached_equivalence_classes(df_hash, attrs_tuple):
    """带缓存的等价类计算"""
    return get_equivalence_classes(df, list(attrs_tuple))

4.2 多种算法实现对比

实现ReverseReduct算法作为对比:

def reverse_reduct(df, decision_attr):
    """逆向约简算法"""
    condition_attrs = [col for col in df.columns if col != decision_attr]
    full_dep = dependency_degree(df, condition_attrs, decision_attr)
    
    for attr in condition_attrs:
        temp_attrs = [a for a in condition_attrs if a != attr]
        if dependency_degree(df, temp_attrs, decision_attr) == full_dep:
            return reverse_reduct(df[temp_attrs + [decision_attr]], decision_attr)
    
    return condition_attrs

4.3 性能对比实验

import time

def test_algorithm(algo_func, df, decision_attr):
    start = time.time()
    result = algo_func(df.copy(), decision_attr)
    elapsed = time.time() - start
    return result, elapsed

# 测试不同算法
algorithms = {
    "QuickReduct": quick_reduct,
    "ReverseReduct": reverse_reduct
}

results = {}
for name, algo in algorithms.items():
    reduct, time_used = test_algorithm(algo, df, '努力程度')
    results[name] = {
        'reduct': reduct,
        'time': time_used,
        'dependency': dependency_degree(df, reduct, '努力程度')
    }

# 展示结果对比
print("算法性能对比:")
for algo, res in results.items():
    print(f"{algo}:")
    print(f"  约简属性: {res['reduct']}")
    print(f"  依赖度: {res['dependency']:.4f}")
    print(f"  耗时: {res['time']:.6f}秒")

5. 实际应用与问题排查

5.1 UCI数据集测试

让我们用真实的UCI数据集测试我���的实现:

from sklearn.datasets import load_iris
from sklearn.preprocessing import KBinsDiscretizer

# 加载并离散化数据
iris = load_iris()
X, y = iris.data, iris.target

# 离散化为3个区间
discretizer = KBinsDiscretizer(n_bins=3, encode='ordinal', strategy='uniform')
X_discrete = discretizer.fit_transform(X)

# 创建DataFrame
iris_df = pd.DataFrame(X_discrete, columns=iris.feature_names)
iris_df['target'] = y

# 应用QuickReduct
iris_reduct = quick_reduct(iris_df, 'target')
print("Iris数据集约简结果:", iris_reduct)

5.2 常见问题排查

问题1:算法运行时间过长

  • 解决方案:设置最大迭代次数,或实现更高效的依赖度计算

问题2:约简结果不理想

  • 检查点:
    • 数据离散化是否合理
    • 依赖度计算是否正确
    • 决策属性是否具有足够区分度

问题3:内存不足

  • 优化策略:
    • 使用更紧凑的数据结构
    • 分批处理大型数据集
    • 实现基于生成器的等价类计算
# 内存友好的等价类计算实现
def memory_efficient_equivalence_classes(df, attributes):
    """适用于大型数据集的等价类计算"""
    seen = {}
    for idx, row in df[attributes].iterrows():
        key = tuple(row)
        if key not in seen:
            seen[key] = []
        seen[key].append(idx)
    return list(seen.values())

6. 扩展与进阶实现

6.1 变精度粗糙集实现

引入β参数实现更灵活的近似:

def variable_precision_lower_approximation(df, condition_attrs, decision_attr, 
                                         decision_value, beta=0.2):
    """
    变精度下近似
    :param beta: 允许的误差阈值
    """
    target_objects = set(df[df[decision_attr] == decision_value].index)
    equivalence_classes = get_equivalence_classes(df, condition_attrs)
    
    lower_approx = set()
    for eq_class in equivalence_classes:
        class_set = set(eq_class)
        overlap = class_set & target_objects
        error = 1 - len(overlap)/len(class_set) if class_set else 1
        if error <= beta:
            lower_approx.update(class_set)
    
    return lower_approx

6.2 并行化计算

使用joblib加速依赖度计算:

from joblib import Parallel, delayed

def parallel_dependency_degree(df, condition_attrs, decision_attr):
    """并行计算依赖度"""
    decision_values = df[decision_attr].unique()
    
    def compute_lower_approx(value):
        return lower_approximation(df, condition_attrs, decision_attr, value)
    
    results = Parallel(n_jobs=-1)(
        delayed(compute_lower_approx)(value) for value in decision_values
    )
    
    pos_region = set().union(*results)
    return len(pos_region) / len(df)

6.3 可视化分析

实现结果可视化帮助理解:

import matplotlib.pyplot as plt
import seaborn as sns

def plot_attribute_dependency(df, decision_attr):
    """绘制各属性对依赖度的贡献"""
    condition_attrs = [col for col in df.columns if col != decision_attr]
    dependencies = []
    
    for attr in condition_attrs:
        dep = dependency_degree(df, [attr], decision_attr)
        dependencies.append(dep)
    
    plt.figure(figsize=(10, 6))
    sns.barplot(x=condition_attrs, y=dependencies)
    plt.title('单个属性对决策的依赖度')
    plt.ylabel('依赖度')
    plt.ylim(0, 1)
    plt.show()

更多推荐