spark MLlib简单使用
一、基于MLlib的机器学习MLlib是Spark中提供机器学习函数的库,该库专为集群上并行运行的情况而设计MLlib设计理念:把数据以RDD形式表示,然后在分布式数据集上调用各种算法。归根结底,MLlib就是RDD上一系列可调用的函数的集合。注意:MLlib只包含能够在集群上运行良好的并行算法,包括分布式随机森林算法,K-means,交替最小二乘法等,如果用小规模数据集,单节点用scikit_l
该软件包目前处于维护状态,但它是唯一提供用流数据训练模型的包
一、基于MLlib的机器学习
MLlib是Spark中提供机器学习函数的库,该库专为集群上并行运行的情况而设计。
MLlib三个核心机器学习功能:
数据准备:特征提取、变换、分类特征的散列和一些自然语言处理方法
机器学习方法:实现了一些流行和高级的回归,分类和聚类算法
实用程序:统计方法,如描述性统计、卡方检验、线性代数、模型评估方法等
MLlib设计理念:
把数据以RDD形式表示,然后在分布式数据集上调用各种算法。归根结底,MLlib就是RDD上一系列可调用的函数的集合。
注意:
MLlib只包含能够在集群上运行良好的并行算法,包括分布式随机森林算法,K-means,交替最小二乘法等,如果用小规模数据集,单节点用scikit_learn,Weka更合适。在spark中,可以通过把参数列表传给parallelize(),来在不同节点上分别运行不同的参数,而在每个节点则使用单节点的机器学习库实现。
java:安装scala,安装spark,
单机启动 spark-shell (centos7)
python(window10):
下载pyspark包(直接pip下载失败了,所以复制pip下载时显示的地址下载tar.gz包,再解压,执行python setup.py install,还要安装py4j(0.10.9))
成功:
spark文件(https://www.apache.org/dyn/closer.lua/spark/spark-3.1.1/spark-3.1.1-bin-hadoop2.7.tgz,解压安装包,配置环境变量)
启动spark
import os
import sys
spark_home = os.environ.get('SPARK_HOME', None)
if not spark_home:
raise ValueError('SPARK_HOME environment variable is not set')
sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.10.4-src.zip'))
comm=os.path.join(spark_home, 'python/lib/py4j-0.10.4-src.zip')
print ('start spark....',comm)
exec(open(os.path.join(spark_home, 'python/pyspark/shell.py')).read())
成功:
例子:垃圾邮件分类 ,该例子来自书《spark快速大数据分析》,执行后发现model = LogisticRegressionWithSGD.train(training_data)报错,因为,因为LogisticRegressionWithSGD已经被舍弃了,将LogisticRegressionWithSGD换成LinearRegression(pyspark.ml.regression包里),train换成fit(type object ‘LinearRegression’ has no attribute ‘train’),还是报错,fit() missing 1 required positional argument: ‘dataset’,只有另寻他路
from pyspark import SparkContext
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import LogisticRegressionWithSGD
from pyspark.mllib.feature import HashingTF
if __name__ == "__main__":
sc = SparkContext(appName="PythonBookExample")
# Load 2 types of emails from text files: spam and ham (non-spam).
# Each line has text from one email.
spam = sc.textFile("files/spam.txt")
ham = sc.textFile("files/ham.txt")
# Create a HashingTF instance to map email text to vectors of 100 features.
tf = HashingTF(numFeatures = 100)
# Each email is split into words, and each word is mapped to one feature.
spamFeatures = spam.map(lambda email: tf.transform(email.split(" ")))
hamFeatures = ham.map(lambda email: tf.transform(email.split(" ")))
# Create LabeledPoint datasets for positive (spam) and negative (ham) examples.
positiveExamples = spamFeatures.map(lambda features: LabeledPoint(1, features))
negativeExamples = hamFeatures.map(lambda features: LabeledPoint(0, features))
training_data = positiveExamples.union(negativeExamples)
training_data.cache() # Cache data since Logistic Regression is an iterative algorithm.
# Run Logistic Regression using the SGD optimizer.
# regParam is model regularization, which can make models more robust.
model = LogisticRegressionWithSGD.train(training_data)
# Test on a positive example (spam) and a negative one (ham).
# First apply the same HashingTF feature transformation used on the training data.
posTestExample = tf.transform("O M G GET cheap stuff by sending money to ...".split(" "))
negTestExample = tf.transform("Hi Dad, I started studying Spark the other ...".split(" "))
# Now use the learned model to predict spam/ham for new emails.
print("Prediction for positive test example: %g" % model.predict(posTestExample))
print("Prediction for negative test example: %g" % model.predict(negTestExample))
sc.stop()
就换了个例子,参考https://blog.csdn.net/sinat_36226553/article/details/104083855
'''
内容:pyspark实现线性回归
版本:spark 2.4.4
'''
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
spark = SparkSession.builder.master('local').appName('LinearRegression').getOrCreate()
sc = spark.sparkContext
data = spark.read.format('csv').option("header", True).load(r"..\..\files\Bike-Sharing-Dataset\day.csv")
# 查看数据模式
data.printSchema()
# 查看主要字段统计信息
data.describe("season","yr").show()
# import pandas as pd
# import seaborn as sns
# import matplotlib.pyplot as plt
# df = pd.read_csv("",header=0)
# sns.set(style="whitegrid",context="notebook")
# cols=["season","yr","temp","atemp","hum","cnt"]
# sns.pairplot(df[cols],size=2.5)
# plt.show()
''' 特征选择并转换类型 '''
data1 = data.select(
data["season"].cast("Double"), # 季节 (类别字段)
data["yr"].cast("Double"), # 年份 (类别字段)
data["mnth"].cast("Double"), # 月份 (类别字段)
data["holiday"].cast("Double"), # 是否为假期 (类别字段)
data["weekday"].cast("Double"), # 是否为周末 (类别字段)
data["workingday"].cast("Double"), # 是否为工作日 (类别字段)
data["weathersit"].cast("Double"), # 天气 (类别字段)
data["temp"].cast("Double"), # 气温 (数值字段)
data["atemp"].cast("Double"), # 体感温度 (数值字段)
data["hum"].cast("Double"), # 湿度 (数值字段)
data["windspeed"].cast("Double"), # 风速 (数值字段)
data["cnt"].cast("Double").alias("label")) # 单车租用量 (数值字段)
# data1.show(10)
''' 对类别字段或特征转换为二元向量 '''
data2 = OneHotEncoder().setInputCol("season").setOutputCol("seasonVec")
data3 = OneHotEncoder().setInputCol("yr").setOutputCol("yrVec")
data4 = OneHotEncoder().setInputCol("mnth").setOutputCol("mnthVec")
data5 = OneHotEncoder().setInputCol("holiday").setOutputCol("holidayVec")
data6 = OneHotEncoder().setInputCol("weekday").setOutputCol("weekdayVec")
data7 = OneHotEncoder().setInputCol("workingday").setOutputCol("workingdayVec")
data8 = OneHotEncoder().setInputCol("weathersit").setOutputCol("weathersitVec")
''' 由于OneHotEncoder不是Estimator,建立一个流水线,把以上转换组装到这个流水线上 '''
pipeline_en = Pipeline().setStages([data2, data3, data4, data5, data6, data7, data8])
data_lr = pipeline_en.fit(data1).transform(data1)
# 把原来的4个及转换后的7个二元特征向量,拼接成一个feature向量。
featureArray = ["seasonVec", "yrVec", "mnthVec", "holidayVec", "weekdayVec", "workingdayVec", "weathersitVec", "temp",
"atemp", "hum", "windspeed"]
assembler_lr = VectorAssembler().setInputCols(featureArray).setOutputCol("features_lr")
''' 对data2数据集进行随机划分,这份数据用于回归模型 '''
# randomSplit(weigh , *seed)
trainingData_lr, testData_lr = data_lr.randomSplit([0.7, 0.3])
''' 设置线性回归模型的参数 '''
# regParam 决定了目标函数中正则化项的权重
# elasticNetParam():正则化L1和L2的线性组合 0表示只有L2 1表示只有L1
lr = LinearRegression().setFeaturesCol("features_lr").setLabelCol("label").setFitIntercept(True).setMaxIter(
20).setRegParam(0.3).setElasticNetParam(0.8)
# 把线性回归模型涉及的特征转换、模型训练组装载一个流水上线。
pipeline_lr = Pipeline().setStages([assembler_lr, lr])
# 训练线性回归模型
lrModel = pipeline_lr.fit(trainingData_lr)
# 预测线性回归模型的值
predictions_lr = lrModel.transform(testData_lr)
# RegressionEvaluator.setMetricName定义四种评估器:rmse(默认)、 mse、r2(拟合优度检验)、mae(平均绝对误差)。
evaluator = RegressionEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("rmse")
# 模型评估指标
rmse_lr = evaluator.evaluate(predictions_lr)
print("rmse_lr:", rmse_lr)
代码能够运行,结果如下:
说明pyspark现在能正常运行了,yep
下面代码是书上这个例子java代码,(由于环境,未尝试过)
package com.oreilly.learningsparkexamples.java;
import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.classification.LogisticRegressionModel;
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD;
import org.apache.spark.mllib.feature.HashingTF;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.regression.LabeledPoint;
public final class MLlib {
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf().setAppName("JavaBookExample");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
// Load 2 types of emails from text files: spam and ham (non-spam).
// Each line has text from one email.
JavaRDD<String> spam = sc.textFile("files/spam.txt");
JavaRDD<String> ham = sc.textFile("files/ham.txt");
// Create a HashingTF instance to map email text to vectors of 100 features.
final HashingTF tf = new HashingTF(100);
// Each email is split into words, and each word is mapped to one feature.
// Create LabeledPoint datasets for positive (spam) and negative (ham) examples.
JavaRDD<LabeledPoint> positiveExamples = spam.map(new Function<String, LabeledPoint>() {
@Override public LabeledPoint call(String email) {
return new LabeledPoint(1, tf.transform(Arrays.asList(email.split(" "))));
}
});
JavaRDD<LabeledPoint> negativeExamples = ham.map(new Function<String, LabeledPoint>() {
@Override public LabeledPoint call(String email) {
return new LabeledPoint(0, tf.transform(Arrays.asList(email.split(" "))));
}
});
JavaRDD<LabeledPoint> trainingData = positiveExamples.union(negativeExamples);
trainingData.cache(); // Cache data since Logistic Regression is an iterative algorithm.
// Create a Logistic Regression learner which uses the LBFGS optimizer.
LogisticRegressionWithSGD lrLearner = new LogisticRegressionWithSGD();
// Run the actual learning algorithm on the training data.
LogisticRegressionModel model = lrLearner.run(trainingData.rdd());
// Test on a positive example (spam) and a negative one (ham).
// First apply the same HashingTF feature transformation used on the training data.
Vector posTestExample =
tf.transform(Arrays.asList("O M G GET cheap stuff by sending money to ...".split(" ")));
Vector negTestExample =
tf.transform(Arrays.asList("Hi Dad, I started studying Spark the other ...".split(" ")));
// Now use the learned model to predict spam/ham for new emails.
System.out.println("Prediction for positive test example: " + model.predict(posTestExample));
System.out.println("Prediction for negative test example: " + model.predict(negTestExample));
sc.stop();
}
}
数据类型
主要如下
- Vector,向量可以通过mllib.linalg.Vectors类创建出来
- LabeledPoint ,在分类和回归的监督shi算法中,用来表示带标签的数据点,包含一个特征向量与一个标签(由一个浮点数表示),位置在mllib.regression包中
- Rating,用于产品推荐,在mllib.recommendation包中
- 各种model类,一般由一个predict方法可以对新的数据点或数据点组成的RDD应用该模型进行预测。
操作向量
注意:
1.向量由两种,稠密向量和稀疏向量,稠密向量把所有维度的值放在一个浮点数数组中,稀疏向量把各个维度的非0值存储下来,当最多10%元素为非零元素,考虑使用稀疏向量,减少内存,优化速度
2.创建向量的方式在各个语言中由细微差别,在python中,Numpy数组表示一个稠密向量,或者用mllib.linalg.Vectors类,的Vectors.dense([1.0,2.0])创建,数据也可以根据字典或者两个分别代表位置和值的list来传递Vectors.sparse(4,{0:1.0,2:2.0}),Vectors.sparse(4,[0,2],[1.0,2.0]),java,Scala也是用这个类,在java和scala中,MLlib的vector类只是用来为数据表示服务的。
11.5算法
介绍MLlib的主要算法,输入,输出类型
11.5.1特征提取
mllib.feature包中包含一些用来进行常见特征转换的类,这些类中从文本创建特征向量的算法,也有对特征向量进行正规化和伸缩变换的方法
TF-IDF
词频-逆文档频率是一种用来从文本文档生成特征向量的简单方法,TF是每个词在文档中出现的次数,IDF是用来衡量一个词在整个文档corpus出现的逆频繁程度,TF-IDF展示了一个词与特点文档的相关程度
MLlib由两种算法计算TF-IDF:HashingTF,IDF,都在mllib.feature包内,HashingTF从一个文档计算除给定大小的频率向量,为了将词与向量顺序对应出来,使用了哈希法,使用每个单词对所需向量长度S取模得出哈希值,将所有单词映射到0-S-1之间的数字,保证生成一个S维的向量。
1.缩放 构建好特征向量后,可以使用StandardScaler类进行这样的缩放(均值为0,方差为1)
2.正规化 向量正规化为1,使用Normalizer类可以实现,Normalizer.transform(rdd),默认L2范式
3.word2vec,一个基于神经网络的文本特征化算法,Spark在mllib.feature.Word2Vec类引入该算法
11.5.2统计
MLlib通过mllib.stat.Statistics类中的方法提供几种广泛使用的统计函数,这些函数可以直接 在RDD上使用
- Statistics.colStats(rdd) 综合性综述
- Statistics.corr(rdd,method)计算由向量组成的RDD的列间相关矩阵
- Statistics.corr(rdd1,rdd2,method)计算由浮点值组成的RDD的相关矩阵
- Statistics.chiSqTest(rdd)由LabeledPoint对象组成的RDD中每个特征与标签的Pearson独立测试结果,标签和特征值必须是离散值
此外,还有基本统计函数mean(),stevat(),sum()…,RDD还支持sample,sampleByKey()构建简单而分层的数据样本
11.5.3分类和回归
监督学习的两种主要表示形式,都会使用MLlib的LabeledPoint类,预期标签0-C-1,C是类别向量。MLlib包含多种分类回归算法,包括简单的线性算法,决策树和随机森林等
11.5.4聚类
11.5.5 协同过滤与推荐
11.5.6 降维
。。。类似,不赘述
11.5.7模型评估
这些函数在mllib.evalution包中,根据问题不同,在binaryClassficationMetrics和MulticlassMetrics等这些不同的类,使用这些类可以由(预测,true)对组成的RDD创建一个Metrics对象,然后计算多个指标。可以使用map函数将模型应用到测试数据集上,生成组成的RDD
参考:spark快速大数据分析-第11章MLlib库
资料:https://github.com/databricks/learning-spark
更多推荐
所有评论(0)