该软件包目前处于维护状态,但它是唯一提供用流数据训练模型的包

一、基于MLlib的机器学习

MLlib是Spark中提供机器学习函数的库,该库专为集群上并行运行的情况而设计。

MLlib三个核心机器学习功能:
数据准备:特征提取、变换、分类特征的散列和一些自然语言处理方法
机器学习方法:实现了一些流行和高级的回归,分类和聚类算法
实用程序:统计方法,如描述性统计、卡方检验、线性代数、模型评估方法等

MLlib设计理念:
把数据以RDD形式表示,然后在分布式数据集上调用各种算法。归根结底,MLlib就是RDD上一系列可调用的函数的集合。

注意:
MLlib只包含能够在集群上运行良好的并行算法,包括分布式随机森林算法,K-means,交替最小二乘法等,如果用小规模数据集,单节点用scikit_learn,Weka更合适。在spark中,可以通过把参数列表传给parallelize(),来在不同节点上分别运行不同的参数,而在每个节点则使用单节点的机器学习库实现。

java:安装scala,安装spark,
单机启动 spark-shell (centos7)
在这里插入图片描述
python(window10):
下载pyspark包(直接pip下载失败了,所以复制pip下载时显示的地址下载tar.gz包,再解压,执行python setup.py install,还要安装py4j(0.10.9))
成功:在这里插入图片描述

spark文件(https://www.apache.org/dyn/closer.lua/spark/spark-3.1.1/spark-3.1.1-bin-hadoop2.7.tgz,解压安装包,配置环境变量)
启动spark

import os
import sys
spark_home = os.environ.get('SPARK_HOME', None)
if not spark_home:
    raise ValueError('SPARK_HOME environment variable is not set')
sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.10.4-src.zip'))
comm=os.path.join(spark_home, 'python/lib/py4j-0.10.4-src.zip')
print ('start spark....',comm)
exec(open(os.path.join(spark_home, 'python/pyspark/shell.py')).read())

成功:
在这里插入图片描述

例子:垃圾邮件分类 ,该例子来自书《spark快速大数据分析》,执行后发现model = LogisticRegressionWithSGD.train(training_data)报错,因为,因为LogisticRegressionWithSGD已经被舍弃了,将LogisticRegressionWithSGD换成LinearRegression(pyspark.ml.regression包里),train换成fit(type object ‘LinearRegression’ has no attribute ‘train’),还是报错,fit() missing 1 required positional argument: ‘dataset’,只有另寻他路

from pyspark import SparkContext
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import LogisticRegressionWithSGD
from pyspark.mllib.feature import HashingTF


if __name__ == "__main__":
    sc = SparkContext(appName="PythonBookExample")

    # Load 2 types of emails from text files: spam and ham (non-spam).
    # Each line has text from one email.
    spam = sc.textFile("files/spam.txt")
    ham = sc.textFile("files/ham.txt")

    # Create a HashingTF instance to map email text to vectors of 100 features.
    tf = HashingTF(numFeatures = 100)
    # Each email is split into words, and each word is mapped to one feature.
    spamFeatures = spam.map(lambda email: tf.transform(email.split(" ")))
    hamFeatures = ham.map(lambda email: tf.transform(email.split(" ")))

    # Create LabeledPoint datasets for positive (spam) and negative (ham) examples.
    positiveExamples = spamFeatures.map(lambda features: LabeledPoint(1, features))
    negativeExamples = hamFeatures.map(lambda features: LabeledPoint(0, features))
    training_data = positiveExamples.union(negativeExamples)
    training_data.cache() # Cache data since Logistic Regression is an iterative algorithm.

    # Run Logistic Regression using the SGD optimizer.
    # regParam is model regularization, which can make models more robust.
    model = LogisticRegressionWithSGD.train(training_data)

    # Test on a positive example (spam) and a negative one (ham).
    # First apply the same HashingTF feature transformation used on the training data.
    posTestExample = tf.transform("O M G GET cheap stuff by sending money to ...".split(" "))
    negTestExample = tf.transform("Hi Dad, I started studying Spark the other ...".split(" "))

    # Now use the learned model to predict spam/ham for new emails.
    print("Prediction for positive test example: %g" % model.predict(posTestExample))
    print("Prediction for negative test example: %g" % model.predict(negTestExample))
    sc.stop()

就换了个例子,参考https://blog.csdn.net/sinat_36226553/article/details/104083855

'''
内容:pyspark实现线性回归
版本:spark 2.4.4
'''

from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline

spark = SparkSession.builder.master('local').appName('LinearRegression').getOrCreate()

sc = spark.sparkContext

data = spark.read.format('csv').option("header", True).load(r"..\..\files\Bike-Sharing-Dataset\day.csv")

# 查看数据模式
data.printSchema()
# 查看主要字段统计信息
data.describe("season","yr").show()

# import pandas as pd
# import seaborn as sns
# import matplotlib.pyplot as plt
# df = pd.read_csv("",header=0)
# sns.set(style="whitegrid",context="notebook")
# cols=["season","yr","temp","atemp","hum","cnt"]
# sns.pairplot(df[cols],size=2.5)
# plt.show()

''' 特征选择并转换类型 '''
data1 = data.select(
    data["season"].cast("Double"),  # 季节 (类别字段)
    data["yr"].cast("Double"),  # 年份 (类别字段)
    data["mnth"].cast("Double"),  # 月份 (类别字段)
    data["holiday"].cast("Double"),  # 是否为假期 (类别字段)
    data["weekday"].cast("Double"),  # 是否为周末 (类别字段)
    data["workingday"].cast("Double"),  # 是否为工作日 (类别字段)
    data["weathersit"].cast("Double"),  # 天气 (类别字段)
    data["temp"].cast("Double"),  # 气温 (数值字段)
    data["atemp"].cast("Double"),  # 体感温度 (数值字段)
    data["hum"].cast("Double"),  # 湿度 (数值字段)
    data["windspeed"].cast("Double"),  # 风速 (数值字段)
    data["cnt"].cast("Double").alias("label"))  # 单车租用量 (数值字段)
# data1.show(10)


''' 对类别字段或特征转换为二元向量 '''
data2 = OneHotEncoder().setInputCol("season").setOutputCol("seasonVec")
data3 = OneHotEncoder().setInputCol("yr").setOutputCol("yrVec")
data4 = OneHotEncoder().setInputCol("mnth").setOutputCol("mnthVec")
data5 = OneHotEncoder().setInputCol("holiday").setOutputCol("holidayVec")
data6 = OneHotEncoder().setInputCol("weekday").setOutputCol("weekdayVec")
data7 = OneHotEncoder().setInputCol("workingday").setOutputCol("workingdayVec")
data8 = OneHotEncoder().setInputCol("weathersit").setOutputCol("weathersitVec")

''' 由于OneHotEncoder不是Estimator,建立一个流水线,把以上转换组装到这个流水线上 '''
pipeline_en = Pipeline().setStages([data2, data3, data4, data5, data6, data7, data8])
data_lr = pipeline_en.fit(data1).transform(data1)

# 把原来的4个及转换后的7个二元特征向量,拼接成一个feature向量。
featureArray = ["seasonVec", "yrVec", "mnthVec", "holidayVec", "weekdayVec", "workingdayVec", "weathersitVec", "temp",
                "atemp", "hum", "windspeed"]
assembler_lr = VectorAssembler().setInputCols(featureArray).setOutputCol("features_lr")

''' 对data2数据集进行随机划分,这份数据用于回归模型 '''
# randomSplit(weigh , *seed)
trainingData_lr, testData_lr = data_lr.randomSplit([0.7, 0.3])

''' 设置线性回归模型的参数 '''
# regParam 决定了目标函数中正则化项的权重
# elasticNetParam():正则化L1和L2的线性组合 0表示只有L2 1表示只有L1
lr = LinearRegression().setFeaturesCol("features_lr").setLabelCol("label").setFitIntercept(True).setMaxIter(
    20).setRegParam(0.3).setElasticNetParam(0.8)

# 把线性回归模型涉及的特征转换、模型训练组装载一个流水上线。
pipeline_lr = Pipeline().setStages([assembler_lr, lr])

# 训练线性回归模型
lrModel = pipeline_lr.fit(trainingData_lr)

# 预测线性回归模型的值
predictions_lr = lrModel.transform(testData_lr)

# RegressionEvaluator.setMetricName定义四种评估器:rmse(默认)、 mse、r2(拟合优度检验)、mae(平均绝对误差)。
evaluator = RegressionEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("rmse")

# 模型评估指标
rmse_lr = evaluator.evaluate(predictions_lr)

print("rmse_lr:", rmse_lr)

代码能够运行,结果如下:
在这里插入图片描述
说明pyspark现在能正常运行了,yep

下面代码是书上这个例子java代码,(由于环境,未尝试过)

package com.oreilly.learningsparkexamples.java;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import org.apache.spark.mllib.classification.LogisticRegressionModel;
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD;
import org.apache.spark.mllib.feature.HashingTF;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.regression.LabeledPoint;

public final class MLlib {

  public static void main(String[] args) {
    SparkConf sparkConf = new SparkConf().setAppName("JavaBookExample");
    JavaSparkContext sc = new JavaSparkContext(sparkConf);

    // Load 2 types of emails from text files: spam and ham (non-spam).
    // Each line has text from one email.
    JavaRDD<String> spam = sc.textFile("files/spam.txt");
    JavaRDD<String> ham = sc.textFile("files/ham.txt");

    // Create a HashingTF instance to map email text to vectors of 100 features.
    final HashingTF tf = new HashingTF(100);

    // Each email is split into words, and each word is mapped to one feature.
    // Create LabeledPoint datasets for positive (spam) and negative (ham) examples.
    JavaRDD<LabeledPoint> positiveExamples = spam.map(new Function<String, LabeledPoint>() {
      @Override public LabeledPoint call(String email) {
        return new LabeledPoint(1, tf.transform(Arrays.asList(email.split(" "))));
      }
    });
    JavaRDD<LabeledPoint> negativeExamples = ham.map(new Function<String, LabeledPoint>() {
      @Override public LabeledPoint call(String email) {
        return new LabeledPoint(0, tf.transform(Arrays.asList(email.split(" "))));
      }
    });
    JavaRDD<LabeledPoint> trainingData = positiveExamples.union(negativeExamples);
    trainingData.cache(); // Cache data since Logistic Regression is an iterative algorithm.

    // Create a Logistic Regression learner which uses the LBFGS optimizer.
    LogisticRegressionWithSGD lrLearner = new LogisticRegressionWithSGD();
    // Run the actual learning algorithm on the training data.
    LogisticRegressionModel model = lrLearner.run(trainingData.rdd());

    // Test on a positive example (spam) and a negative one (ham).
    // First apply the same HashingTF feature transformation used on the training data.
    Vector posTestExample =
        tf.transform(Arrays.asList("O M G GET cheap stuff by sending money to ...".split(" ")));
    Vector negTestExample =
        tf.transform(Arrays.asList("Hi Dad, I started studying Spark the other ...".split(" ")));
    // Now use the learned model to predict spam/ham for new emails.
    System.out.println("Prediction for positive test example: " + model.predict(posTestExample));
    System.out.println("Prediction for negative test example: " + model.predict(negTestExample));

    sc.stop();
  }
}

数据类型

主要如下

  • Vector,向量可以通过mllib.linalg.Vectors类创建出来
  • LabeledPoint ,在分类和回归的监督shi算法中,用来表示带标签的数据点,包含一个特征向量与一个标签(由一个浮点数表示),位置在mllib.regression包中
  • Rating,用于产品推荐,在mllib.recommendation包中
  • 各种model类,一般由一个predict方法可以对新的数据点或数据点组成的RDD应用该模型进行预测。
操作向量

注意:
1.向量由两种,稠密向量和稀疏向量,稠密向量把所有维度的值放在一个浮点数数组中,稀疏向量把各个维度的非0值存储下来,当最多10%元素为非零元素,考虑使用稀疏向量,减少内存,优化速度
2.创建向量的方式在各个语言中由细微差别,在python中,Numpy数组表示一个稠密向量,或者用mllib.linalg.Vectors类,的Vectors.dense([1.0,2.0])创建,数据也可以根据字典或者两个分别代表位置和值的list来传递Vectors.sparse(4,{0:1.0,2:2.0}),Vectors.sparse(4,[0,2],[1.0,2.0]),java,Scala也是用这个类,在java和scala中,MLlib的vector类只是用来为数据表示服务的。

11.5算法

介绍MLlib的主要算法,输入,输出类型

11.5.1特征提取

mllib.feature包中包含一些用来进行常见特征转换的类,这些类中从文本创建特征向量的算法,也有对特征向量进行正规化和伸缩变换的方法
TF-IDF
词频-逆文档频率是一种用来从文本文档生成特征向量的简单方法,TF是每个词在文档中出现的次数,IDF是用来衡量一个词在整个文档corpus出现的逆频繁程度,TF-IDF展示了一个词与特点文档的相关程度

MLlib由两种算法计算TF-IDF:HashingTF,IDF,都在mllib.feature包内,HashingTF从一个文档计算除给定大小的频率向量,为了将词与向量顺序对应出来,使用了哈希法,使用每个单词对所需向量长度S取模得出哈希值,将所有单词映射到0-S-1之间的数字,保证生成一个S维的向量。

1.缩放 构建好特征向量后,可以使用StandardScaler类进行这样的缩放(均值为0,方差为1)
2.正规化 向量正规化为1,使用Normalizer类可以实现,Normalizer.transform(rdd),默认L2范式
3.word2vec,一个基于神经网络的文本特征化算法,Spark在mllib.feature.Word2Vec类引入该算法

11.5.2统计

MLlib通过mllib.stat.Statistics类中的方法提供几种广泛使用的统计函数,这些函数可以直接 在RDD上使用

  • Statistics.colStats(rdd) 综合性综述
  • Statistics.corr(rdd,method)计算由向量组成的RDD的列间相关矩阵
  • Statistics.corr(rdd1,rdd2,method)计算由浮点值组成的RDD的相关矩阵
  • Statistics.chiSqTest(rdd)由LabeledPoint对象组成的RDD中每个特征与标签的Pearson独立测试结果,标签和特征值必须是离散值

此外,还有基本统计函数mean(),stevat(),sum()…,RDD还支持sample,sampleByKey()构建简单而分层的数据样本

11.5.3分类和回归

监督学习的两种主要表示形式,都会使用MLlib的LabeledPoint类,预期标签0-C-1,C是类别向量。MLlib包含多种分类回归算法,包括简单的线性算法,决策树和随机森林等

11.5.4聚类
11.5.5 协同过滤与推荐
11.5.6 降维

。。。类似,不赘述

11.5.7模型评估

这些函数在mllib.evalution包中,根据问题不同,在binaryClassficationMetrics和MulticlassMetrics等这些不同的类,使用这些类可以由(预测,true)对组成的RDD创建一个Metrics对象,然后计算多个指标。可以使用map函数将模型应用到测试数据集上,生成组成的RDD

参考:spark快速大数据分析-第11章MLlib库
资料:https://github.com/databricks/learning-spark

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐