别再死记硬背概念了!用Python手把手带你复现粗糙集属性约简(附完整代码)
·
用Python实战粗糙集属性约简:从理论到代码的完整指南
粗糙集理论作为数据挖掘领域的重要工具,其核心价值在于能够从海量数据中发现关键特征,而属性约简则是这一过程的精髓所在。本文将彻底打破传统理论讲解模式,带你用Python一步步实现粗糙集的核心算法,让抽象数学公式转化为可运行的代码。
1. 环境准备与数据建模
在开始编码之前,我们需要搭建合适的Python环境并理解如何用数据结构表示粗糙集的基本概念。推荐使用Anaconda创建专属环境:
conda create -n rough_set python=3.8
conda activate rough_set
pip install pandas numpy matplotlib
决策表 是粗糙集的核心数据结构,我们用pandas DataFrame来表示。以下是一个学生评估的示例数据集:
import pandas as pd
data = {
'语文': ['好', '中', '差', '好', '好', '差', '差', '中'],
'数学': ['中', '好', '中', '好', '中', '差', '好', '好'],
'英语': ['差', '好', '中', '中', '差', '中', '中', '中'],
'理综': ['差', '好', '好', '差', '中', '好', '好', '中'],
'努力程度': ['中', '差', '好', '差', '好', '好', '差', '好']
}
df = pd.DataFrame(data, index=['李华', '大明', 'gogo', '小明', '喜羊羊', '灰太狼', '熊大', '熊二'])
这个DataFrame中,前四列是条件属性,最后一列是决策属性。我们需要实现几个基础函数来处理等价类:
def get_equivalence_classes(df, attributes):
"""获取指定属性集的等价类"""
return df.groupby(attributes).groups.values()
测试这个函数:
print(get_equivalence_classes(df, ['数学', '英语']))
# 输出:[array(['李华', '喜羊羊'], dtype=object),
# array(['大明', '熊大', '熊二'], dtype=object),
# array(['gogo'], dtype=object),
# array(['小明'], dtype=object),
# array(['灰太狼'], dtype=object)]
2. 实现上下近似计算
上下近似是粗糙集的基石概念,下面我们实现这两个关键操作。
2.1 下近似计算
下近似包含那些 确定属于 目标集合的对象:
def lower_approximation(df, condition_attrs, decision_attr, decision_value):
"""
计算下近似
:param condition_attrs: 条件属性列表
:param decision_attr: 决策属性名
:param decision_value: 决策目标值
:return: 下近似集合
"""
# 获取决策属性等于目标值的所有对象
target_objects = set(df[df[decision_attr] == decision_value].index)
# 获取条件属性下的等价类
equivalence_classes = get_equivalence_classes(df, condition_attrs)
lower_approx = set()
for eq_class in equivalence_classes:
if set(eq_class).issubset(target_objects):
lower_approx.update(eq_class)
return lower_approx
2.2 上近似计算
上近似包含那些 可能属于 目标集合的对象:
def upper_approximation(df, condition_attrs, decision_attr, decision_value):
"""
计算上近似
:param condition_attrs: 条件属性列表
:param decision_attr: 决策属性名
:param decision_value: 决策目标值
:return: 上近似集合
"""
target_objects = set(df[df[decision_attr] == decision_value].index)
equivalence_classes = get_equivalence_classes(df, condition_attrs)
upper_approx = set()
for eq_class in equivalence_classes:
if set(eq_class) & target_objects: # 有交集
upper_approx.update(eq_class)
return upper_approx
测试上下近似计算:
print("下近似:", lower_approximation(df, ['数学', '英语'], '努力程度', '好'))
print("上近似:", upper_approximation(df, ['数学', '英语'], '努力程度', '好'))
3. 实现QuickReduct算法
QuickReduct是最经典的属性约简算法之一,其核心思想是贪心地添加最能提升依赖度的属性。
3.1 依赖度计算
首先实现计算属性集依赖度的函数:
def dependency_degree(df, condition_attrs, decision_attr):
"""
计算属性集对决策属性的依赖度
:return: 依赖度值[0,1]
"""
# 获取决策属性的所有唯一值
decision_values = df[decision_attr].unique()
total_objects = len(df)
pos_region = set()
for value in decision_values:
lower_approx = lower_approximation(df, condition_attrs, decision_attr, value)
pos_region.update(lower_approx)
return len(pos_region) / total_objects
3.2 QuickReduct实现
def quick_reduct(df, decision_attr, max_iter=100):
"""
QuickReduct算法实现
:return: 约简后的属性列表
"""
condition_attrs = [col for col in df.columns if col != decision_attr]
reduct = []
dependency = 0.0
for _ in range(max_iter):
max_gain = 0
best_attr = None
for attr in set(condition_attrs) - set(reduct):
current_attrs = reduct + [attr]
current_dep = dependency_degree(df, current_attrs, decision_attr)
gain = current_dep - dependency
if gain > max_gain:
max_gain = gain
best_attr = attr
if best_attr is None:
break
reduct.append(best_attr)
dependency += max_gain
# 检查是否达到完全依赖
if abs(dependency - 1.0) < 1e-6:
break
return reduct
运行算法测试:
reduct = quick_reduct(df, '努力程度')
print("约简结果:", reduct)
4. 算法优化与性能对比
4.1 优化依赖度计算
原始实现每次都要重新计算等价类,效率较低。我们可以引入缓存机制:
from functools import lru_cache
@lru_cache(maxsize=None)
def cached_equivalence_classes(df_hash, attrs_tuple):
"""带缓存的等价类计算"""
return get_equivalence_classes(df, list(attrs_tuple))
4.2 多种算法实现对比
实现ReverseReduct算法作为对比:
def reverse_reduct(df, decision_attr):
"""逆向约简算法"""
condition_attrs = [col for col in df.columns if col != decision_attr]
full_dep = dependency_degree(df, condition_attrs, decision_attr)
for attr in condition_attrs:
temp_attrs = [a for a in condition_attrs if a != attr]
if dependency_degree(df, temp_attrs, decision_attr) == full_dep:
return reverse_reduct(df[temp_attrs + [decision_attr]], decision_attr)
return condition_attrs
4.3 性能对比实验
import time
def test_algorithm(algo_func, df, decision_attr):
start = time.time()
result = algo_func(df.copy(), decision_attr)
elapsed = time.time() - start
return result, elapsed
# 测试不同算法
algorithms = {
"QuickReduct": quick_reduct,
"ReverseReduct": reverse_reduct
}
results = {}
for name, algo in algorithms.items():
reduct, time_used = test_algorithm(algo, df, '努力程度')
results[name] = {
'reduct': reduct,
'time': time_used,
'dependency': dependency_degree(df, reduct, '努力程度')
}
# 展示结果对比
print("算法性能对比:")
for algo, res in results.items():
print(f"{algo}:")
print(f" 约简属性: {res['reduct']}")
print(f" 依赖度: {res['dependency']:.4f}")
print(f" 耗时: {res['time']:.6f}秒")
5. 实际应用与问题排查
5.1 UCI数据集测试
让我们用真实的UCI数据集测试我���的实现:
from sklearn.datasets import load_iris
from sklearn.preprocessing import KBinsDiscretizer
# 加载并离散化数据
iris = load_iris()
X, y = iris.data, iris.target
# 离散化为3个区间
discretizer = KBinsDiscretizer(n_bins=3, encode='ordinal', strategy='uniform')
X_discrete = discretizer.fit_transform(X)
# 创建DataFrame
iris_df = pd.DataFrame(X_discrete, columns=iris.feature_names)
iris_df['target'] = y
# 应用QuickReduct
iris_reduct = quick_reduct(iris_df, 'target')
print("Iris数据集约简结果:", iris_reduct)
5.2 常见问题排查
问题1:算法运行时间过长
- 解决方案:设置最大迭代次数,或实现更高效的依赖度计算
问题2:约简结果不理想
- 检查点:
- 数据离散化是否合理
- 依赖度计算是否正确
- 决策属性是否具有足够区分度
问题3:内存不足
- 优化策略:
- 使用更紧凑的数据结构
- 分批处理大型数据集
- 实现基于生成器的等价类计算
# 内存友好的等价类计算实现
def memory_efficient_equivalence_classes(df, attributes):
"""适用于大型数据集的等价类计算"""
seen = {}
for idx, row in df[attributes].iterrows():
key = tuple(row)
if key not in seen:
seen[key] = []
seen[key].append(idx)
return list(seen.values())
6. 扩展与进阶实现
6.1 变精度粗糙集实现
引入β参数实现更灵活的近似:
def variable_precision_lower_approximation(df, condition_attrs, decision_attr,
decision_value, beta=0.2):
"""
变精度下近似
:param beta: 允许的误差阈值
"""
target_objects = set(df[df[decision_attr] == decision_value].index)
equivalence_classes = get_equivalence_classes(df, condition_attrs)
lower_approx = set()
for eq_class in equivalence_classes:
class_set = set(eq_class)
overlap = class_set & target_objects
error = 1 - len(overlap)/len(class_set) if class_set else 1
if error <= beta:
lower_approx.update(class_set)
return lower_approx
6.2 并行化计算
使用joblib加速依赖度计算:
from joblib import Parallel, delayed
def parallel_dependency_degree(df, condition_attrs, decision_attr):
"""并行计算依赖度"""
decision_values = df[decision_attr].unique()
def compute_lower_approx(value):
return lower_approximation(df, condition_attrs, decision_attr, value)
results = Parallel(n_jobs=-1)(
delayed(compute_lower_approx)(value) for value in decision_values
)
pos_region = set().union(*results)
return len(pos_region) / len(df)
6.3 可视化分析
实现结果可视化帮助理解:
import matplotlib.pyplot as plt
import seaborn as sns
def plot_attribute_dependency(df, decision_attr):
"""绘制各属性对依赖度的贡献"""
condition_attrs = [col for col in df.columns if col != decision_attr]
dependencies = []
for attr in condition_attrs:
dep = dependency_degree(df, [attr], decision_attr)
dependencies.append(dep)
plt.figure(figsize=(10, 6))
sns.barplot(x=condition_attrs, y=dependencies)
plt.title('单个属性对决策的依赖度')
plt.ylabel('依赖度')
plt.ylim(0, 1)
plt.show()
更多推荐



所有评论(0)