The previous article covered running Hadoop with Docker.

Here we will use Spark MLlib to do machine learning on the cluster.

Preparing Spark

1. Pull the image

docker pull singularities/spark

2. Create a docker-compose.yml file

version: "2"

services:
  master:
    image: singularities/spark
    command: start-spark master
    hostname: master
    ports:
      - "6066:6066"
      - "7070:7070"
      - "8080:8080"
      - "50070:50070"
  worker:
    image: singularities/spark
    command: start-spark worker master
    environment:
      SPARK_WORKER_CORES: 1
      SPARK_WORKER_MEMORY: 2g
    links:
      - master

3. Start the Spark cluster

docker-compose up -d

4. Check that the containers started successfully

[root@localhost singularitiesCR]# docker ps
CONTAINER ID        IMAGE                 COMMAND                  CREATED             STATUS              PORTS                                                                                                                                                                                                                                                    NAMES
228ac8bae04b        singularities/spark   "start-spark worker …"   2 hours ago         Up 2 hours          6066/tcp, 7077/tcp, 8020/tcp, 8080-8081/tcp, 9000/tcp, 10020/tcp, 13562/tcp, 14000/tcp, 19888/tcp, 50010/tcp, 50020/tcp, 50070/tcp, 50075/tcp, 50090/tcp, 50470/tcp, 50475/tcp                                                                           singularitiescr_worker_1
551c2400dc08        singularities/spark   "start-spark master"     2 hours ago         Up 2 hours          0.0.0.0:6066->6066/tcp, 7077/tcp, 0.0.0.0:7070->7070/tcp, 8020/tcp, 8081/tcp, 9000/tcp, 10020/tcp, 13562/tcp, 14000/tcp, 19888/tcp, 50010/tcp, 0.0.0.0:8080->8080/tcp, 50020/tcp, 50075/tcp, 50090/tcp, 50470/tcp, 0.0.0.0:50070->50070/tcp, 50475/tcp   singularitiescr_master_1

Machine learning with Spark MLlib

1. Enter any one of the Docker containers

docker exec -it 228ac8bae04b /bin/bash

2. Upload the local dataset file to HDFS

hadoop fs -put /usr/local/spark-2.2.1/data/mllib/sample_libsvm_data.txt /sample_libsvm_data.txt

3. Run a regression with Spark MLlib (this example is taken straight from the official Spark documentation)

First, start the Spark shell:

root@master:/# spark-shell

Now we can run the computation:

scala> import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.Pipeline

scala> import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.evaluation.RegressionEvaluator

scala> import org.apache.spark.ml.feature.VectorIndexer
import org.apache.spark.ml.feature.VectorIndexer

scala> import org.apache.spark.ml.regression.{RandomForestRegressionModel, RandomForestRegressor}
import org.apache.spark.ml.regression.{RandomForestRegressionModel, RandomForestRegressor}

scala> val data = spark.read.format("libsvm").load("/sample_libsvm_data.txt")
data: org.apache.spark.sql.DataFrame = [label: double, features: vector]
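
If the bare path cannot be resolved when loading (see the note about the hosts file at the end), you can also point at HDFS explicitly. A minimal sketch, assuming the namenode runs on the master container at the default port 8020 (your HDFS configuration may differ):

// assumption: namenode reachable at master:8020; adjust host/port to your HDFS setup
val dataFromHdfs = spark.read.format("libsvm").load("hdfs://master:8020/sample_libsvm_data.txt")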

scala> data.show()
+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(692,[127,128,129...|
|  1.0|(692,[158,159,160...|
|  1.0|(692,[124,125,126...|
|  1.0|(692,[152,153,154...|
|  1.0|(692,[151,152,153...|
|  0.0|(692,[129,130,131...|
|  1.0|(692,[158,159,160...|
|  1.0|(692,[99,100,101,...|
|  0.0|(692,[154,155,156...|
|  0.0|(692,[127,128,129...|
|  1.0|(692,[154,155,156...|
|  0.0|(692,[153,154,155...|
|  0.0|(692,[151,152,153...|
|  1.0|(692,[129,130,131...|
|  0.0|(692,[154,155,156...|
|  1.0|(692,[150,151,152...|
|  0.0|(692,[124,125,126...|
|  0.0|(692,[152,153,154...|
|  1.0|(692,[97,98,99,12...|
|  1.0|(692,[124,125,126...|
+-----+--------------------+
only showing top 20 rows


scala> val featureIndexer = new VectorIndexer().setInputCol("features").setOutputCol("indexedFeatures").setMaxCategories(4).fit(data)
featureIndexer: org.apache.spark.ml.feature.VectorIndexerModel = vecIdx_a630fa776506
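
With setMaxCategories(4), VectorIndexer treats every feature that has at most 4 distinct values as categorical and leaves the rest as continuous. As a quick sanity check (not part of the original walkthrough), the fitted model exposes which feature indices were indexed as categorical:

// categoryMaps: feature index -> (original value -> category index)
println(featureIndexer.categoryMaps.keys.toSeq.sorted.take(10))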

scala> val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))
trainingData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [label: double, features: vector]
testData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [label: double, features: vector]
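
Note that the 70/30 split is random, so your rows (and the final RMSE) will differ from run to run. If you want a reproducible split, randomSplit also accepts a seed, for example:

// optional: fix the seed so the train/test split is reproducible
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3), seed = 1234L)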

scala> trainingData.show()
+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(692,[95,96,97,12...|
|  0.0|(692,[98,99,100,1...|
|  0.0|(692,[122,123,124...|
|  0.0|(692,[122,123,148...|
|  0.0|(692,[123,124,125...|
|  0.0|(692,[123,124,125...|
|  0.0|(692,[123,124,125...|
|  0.0|(692,[124,125,126...|
|  0.0|(692,[124,125,126...|
|  0.0|(692,[124,125,126...|
|  0.0|(692,[124,125,126...|
|  0.0|(692,[124,125,126...|
|  0.0|(692,[124,125,126...|
|  0.0|(692,[125,126,127...|
|  0.0|(692,[126,127,128...|
|  0.0|(692,[126,127,128...|
|  0.0|(692,[126,127,128...|
|  0.0|(692,[126,127,128...|
|  0.0|(692,[126,127,128...|
|  0.0|(692,[127,128,129...|
+-----+--------------------+
only showing top 20 rows


scala> val rf = new RandomForestRegressor().setLabelCol("label").setFeaturesCol("indexedFeatures")
rf: org.apache.spark.ml.regression.RandomForestRegressor = rfr_290e7a917472
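
The regressor above runs with its default hyperparameters. If you want to tune it, the usual knobs are the number of trees and the maximum tree depth; the values below are only illustrative:

// illustrative settings, not tuned for this dataset
val tunedRf = new RandomForestRegressor()
  .setLabelCol("label")
  .setFeaturesCol("indexedFeatures")
  .setNumTrees(50)
  .setMaxDepth(10)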

scala> val pipeline = new Pipeline().setStages(Array(featureIndexer, rf))
pipeline: org.apache.spark.ml.Pipeline = pipeline_6a9f80eb2a1d

scala> val model = pipeline.fit(trainingData)
model: org.apache.spark.ml.PipelineModel = pipeline_6a9f80eb2a1d
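
Fitting the pipeline runs the VectorIndexer and trains the random forest in one go. If you want to reuse the trained pipeline in a later session, it can be saved to HDFS and loaded back; the path below is only an example:

import org.apache.spark.ml.PipelineModel

// example path; any writable HDFS directory works
model.write.overwrite().save("/models/rf_regression")
val sameModel = PipelineModel.load("/models/rf_regression")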

scala> val predictions = model.transform(testData)
predictions: org.apache.spark.sql.DataFrame = [label: double, features: vector ... 2 more fields]

scala> predictions.select("prediction", "label", "features").show(5)
+----------+-----+--------------------+
|prediction|label|            features|
+----------+-----+--------------------+
|      0.25|  0.0|(692,[100,101,102...|
|       0.0|  0.0|(692,[121,122,123...|
|       0.1|  0.0|(692,[126,127,128...|
|      0.05|  0.0|(692,[126,127,128...|
|       0.0|  0.0|(692,[126,127,128...|
+----------+-----+--------------------+
only showing top 5 rows


scala> val evaluator = new RegressionEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("rmse")
evaluator: org.apache.spark.ml.evaluation.RegressionEvaluator = regEval_bb978e934812

scala> val rmse = evaluator.evaluate(predictions)
rmse: Double = 0.12857782882175348

scala> println(s"Root Mean Squared Error (RMSE) on test data = $rmse")
Root Mean Squared Error (RMSE) on test data = 0.12857782882175348
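
RMSE is the square root of the mean squared difference between predictions and labels, so it is in the same units as the label. The evaluator also supports other metrics such as "mae" and "r2", and the trained forest inside the pipeline can be inspected, much as the official example does:

// report R^2 as well (closer to 1 is better)
val r2 = evaluator.setMetricName("r2").evaluate(predictions)
println(s"R2 on test data = $r2")

// the fitted random forest is the second stage of the pipeline
val rfModel = model.stages(1).asInstanceOf[RandomForestRegressionModel]
println(s"Learned regression forest model:\n ${rfModel.toDebugString}")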

That completes the machine learning example with Spark.

Note: if Spark hits connection errors when reading the data, remember to configure the hosts file on every machine, just as in the previous Hadoop article; otherwise the hostnames cannot be resolved.
