前言

前一章中我们介绍了Spark的Standalone模式的安装. 本章我们介绍下Spark Shell操作窗口的基本的安装.

  • 基本启动与使用
  • 基本算子使用

基本启动与使用

  • 本地启动
    进入./bin目录, 使用spark-shell即可启动. 未链接集群, 直接启动了一个Worker结点. 可以通过 http://localhost:4040 进行访问.
localhost:bin Sean$ spark-shell
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/03/29 17:15:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
19/03/29 17:15:01 WARN Utils: Your hostname, localhost resolves to a loopback address: 127.0.0.1; using 192.168.31.80 instead (on interface en0)
19/03/29 17:15:01 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Spark context Web UI available at http://192.168.31.80:4040
Spark context available as 'sc' (master = local[*], app id = local-1553850902335).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.1
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_102)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

  • 本地启动 - 链接集群 / 指定配置 spark-shell --master spark://localhost:7077 --total-executor-cores 1 --executor-memory 1g
localhost:bin Sean$ spark-shell --master spark://localhost:7077 --total-executor-cores 1 --executor-memory 1g
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/03/30 15:25:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://127.0.0.1:4040
Spark context available as 'sc' (master = spark://localhost:7077, app id = app-20190330152508-0001).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.1
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_102)
Type in expressions to have them evaluated.
Type :help for more information.

scala> sc
res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@66d2885c

scala>

启动成功后, 我们可以在http://localhost:8080上面看到spark-shell进程.
在这里插入图片描述
随后,我们可以使用spark-shell内使用Scala语言完成一定的操作.


Spark submit

当我们在生产部署与发布的时候通常是使用spark-submit脚本进行提交的.(./bin目录下.) 我们通常是使用Maven将程序进行打包, 随后通过spark-submit提交进行.
(注: Maven打全码包这边就不再叙述了, 更多请看Maven 打包实战.)


Q & A

localhost:bin Sean$ spark-shell --master  spark://192.168.31.80:7077
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/03/29 18:11:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
19/03/29 18:11:38 WARN Utils: Your hostname, localhost resolves to a loopback address: 127.0.0.1; using 192.168.31.80 instead (on interface en0)
19/03/29 18:11:38 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
19/03/29 18:11:39 WARN StandaloneAppClient$ClientEndpoint: Failed to connect to master 192.168.31.80:7077
org.apache.spark.SparkException: Exception thrown in awaitResult:
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:100)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:108)
	at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint$$anonfun$tryRegisterAllMasters$1$$anon$1.run(StandaloneAppClient.scala:106)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Failed to connect to /192.168.31.80:7077
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:232)
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:182)
	at org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:197)
	at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:194)
	at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:190)
	... 4 more
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: /192.168.31.80:7077
	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
	at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:257)
	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:291)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:631)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
	at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
	... 1 more
19/03/29 18:11:59 WARN StandaloneAppClient$ClientEndpoint: Failed to connect to master 192.168.31.80:7077
org.apache.spark.SparkException: Exception thrown in awaitResult:
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:100)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:108)
	at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint$$anonfun$tryRegisterAllMasters$1$$anon$1.run(StandaloneAppClient.scala:106)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Failed to connect to /192.168.31.80:7077
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:232)
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:182)
	at org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:197)
	at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:194)
	at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:190)
	... 4 more
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: /192.168.31.80:7077
	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
	at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:257)
	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:291)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:631)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
	at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
	... 1 more
19/03/29 18:12:19 WARN StandaloneAppClient$ClientEndpoint: Failed to connect to master 192.168.31.80:7077
org.apache.spark.SparkException: Exception thrown in awaitResult:
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:100)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:108)
	at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint$$anonfun$tryRegisterAllMasters$1$$anon$1.run(StandaloneAppClient.scala:106)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Failed to connect to /192.168.31.80:7077
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:232)
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:182)
	at org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:197)
	at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:194)
	at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:190)
	... 4 more
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: /192.168.31.80:7077
	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
	at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:257)
	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:291)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:631)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
	at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
	... 1 more
19/03/29 18:12:39 ERROR StandaloneSchedulerBackend: Application has been killed. Reason: All masters are unresponsive! Giving up.
19/03/29 18:12:39 WARN StandaloneSchedulerBackend: Application ID is not initialized yet.
19/03/29 18:12:39 WARN StandaloneAppClient$ClientEndpoint: Drop UnregisterApplication(null) because has not yet connected to master
19/03/29 18:12:40 ERROR SparkContext: Error initializing SparkContext.
java.lang.IllegalArgumentException: requirement failed: Can only call getServletHandlers on a running MetricsSystem
	at scala.Predef$.require(Predef.scala:224)
	at org.apache.spark.metrics.MetricsSystem.getServletHandlers(MetricsSystem.scala:91)
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:524)
	at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2516)
	at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:918)
	at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:910)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:910)
	at org.apache.spark.repl.Main$.createSparkSession(Main.scala:101)
	at $line3.$read$$iw$$iw.<init>(<console>:15)
	at $line3.$read$$iw.<init>(<console>:42)
	at $line3.$read.<init>(<console>:44)
	at $line3.$read$.<init>(<console>:48)
	at $line3.$read$.<clinit>(<console>)
	at $line3.$eval$.$print$lzycompute(<console>:7)
	at $line3.$eval$.$print(<console>:6)
	at $line3.$eval.$print(<console>)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:786)
	at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1047)
	at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:638)
	at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:637)
	at scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
	at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
	at scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:637)
	at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:569)
	at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:565)
	at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:807)
	at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:681)
	at scala.tools.nsc.interpreter.ILoop.processLine(ILoop.scala:395)
	at org.apache.spark.repl.SparkILoop$$anonfun$initializeSpark$1.apply$mcV$sp(SparkILoop.scala:38)
	at org.apache.spark.repl.SparkILoop$$anonfun$initializeSpark$1.apply(SparkILoop.scala:37)
	at org.apache.spark.repl.SparkILoop$$anonfun$initializeSpark$1.apply(SparkILoop.scala:37)
	at scala.tools.nsc.interpreter.IMain.beQuietDuring(IMain.scala:214)
	at org.apache.spark.repl.SparkILoop.initializeSpark(SparkILoop.scala:37)
	at org.apache.spark.repl.SparkILoop.loadFiles(SparkILoop.scala:98)
	at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:920)
	at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909)
	at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909)
	at scala.reflect.internal.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:97)
	at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:909)
	at org.apache.spark.repl.Main$.doMain(Main.scala:74)
	at org.apache.spark.repl.Main$.main(Main.scala:54)
	at org.apache.spark.repl.Main.main(Main.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:775)
	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
java.lang.IllegalArgumentException: requirement failed: Can only call getServletHandlers on a running MetricsSystem
  at scala.Predef$.require(Predef.scala:224)
  at org.apache.spark.metrics.MetricsSystem.getServletHandlers(MetricsSystem.scala:91)
  at org.apache.spark.SparkContext.<init>(SparkContext.scala:524)
  at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2516)
  at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:918)
  at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:910)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:910)
  at org.apache.spark.repl.Main$.createSparkSession(Main.scala:101)
  ... 47 elided
<console>:14: error: not found: value spark
       import spark.implicits._
              ^
<console>:14: error: not found: value spark
       import spark.sql
              ^
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.1
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_102)
Type in expressions to have them evaluated.
Type :help for more information.

解决措施: 这种问题的主要形成问题主要有如下几种:

  • 检查防火墙信息, 查看到Spark的地址是否打开;
  • 检查本地的Scala版本是否与远程Spark提交的Scala版本一致.(conf/spark-env.sh文件//etc/profile/spark-shell直接启动时显示的版本号)
  • 本地的/etc/hosts的配置错误.
  • 本地启动伪集群时, 需要将spark-env.sh文件与slaves进行如下配置.
# spark-env.sh
export SPARK_MASTER_IP=127.0.0.1
export SPARK_LOCAL_IP=127.0.0.1
export SPARK_MASTER_PORT=7077
# slaves
localhost

[1]. 关于Spark报错不能连接到Server的解决办法(Failed to connect to master master_hostname:7077)
[2]. Unable to connect to Spark master
[3]. Spark报错——AnnotatedConnectException拒绝连接
[4]. 单机spark绑定端口

localhost:bin Sean$ spark-shell --master spark://127.0.0.1:7077
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/03/30 15:23:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
19/03/30 15:23:55 ERROR StandaloneSchedulerBackend: Application has been killed. Reason: Master removed our application: FAILED
19/03/30 15:23:55 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exiting due to error from cluster scheduler: Master removed our application: FAILED
	at org.apache.spark.scheduler.TaskSchedulerImpl.error(TaskSchedulerImpl.scala:509)
	at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.dead(StandaloneSchedulerBackend.scala:146)
	at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint.markDead(StandaloneAppClient.scala:254)
	at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint$$anonfun$receive$1.applyOrElse(StandaloneAppClient.scala:168)
	at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:117)
	at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:205)
	at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:101)
	at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:216)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
19/03/30 15:23:55 ERROR SparkContext: Error initializing SparkContext.
java.lang.IllegalArgumentException: requirement failed: Can only call getServletHandlers on a running MetricsSystem
	at scala.Predef$.require(Predef.scala:224)
	at org.apache.spark.metrics.MetricsSystem.getServletHandlers(MetricsSystem.scala:91)
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:524)
	at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2516)
	at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:918)
	at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:910)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:910)
	at org.apache.spark.repl.Main$.createSparkSession(Main.scala:101)
	at $line3.$read$$iw$$iw.<init>(<console>:15)
	at $line3.$read$$iw.<init>(<console>:42)
	at $line3.$read.<init>(<console>:44)
	at $line3.$read$.<init>(<console>:48)
	at $line3.$read$.<clinit>(<console>)
	at $line3.$eval$.$print$lzycompute(<console>:7)
	at $line3.$eval$.$print(<console>:6)
	at $line3.$eval.$print(<console>)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:786)
	at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1047)
	at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:638)
	at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:637)
	at scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
	at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
	at scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:637)
	at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:569)
	at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:565)
	at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:807)
	at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:681)
	at scala.tools.nsc.interpreter.ILoop.processLine(ILoop.scala:395)
	at org.apache.spark.repl.SparkILoop$$anonfun$initializeSpark$1.apply$mcV$sp(SparkILoop.scala:38)
	at org.apache.spark.repl.SparkILoop$$anonfun$initializeSpark$1.apply(SparkILoop.scala:37)
	at org.apache.spark.repl.SparkILoop$$anonfun$initializeSpark$1.apply(SparkILoop.scala:37)
	at scala.tools.nsc.interpreter.IMain.beQuietDuring(IMain.scala:214)
	at org.apache.spark.repl.SparkILoop.initializeSpark(SparkILoop.scala:37)
	at org.apache.spark.repl.SparkILoop.loadFiles(SparkILoop.scala:98)
	at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:920)
	at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909)
	at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909)
	at scala.reflect.internal.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:97)
	at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:909)
	at org.apache.spark.repl.Main$.doMain(Main.scala:74)
	at org.apache.spark.repl.Main$.main(Main.scala:54)
	at org.apache.spark.repl.Main.main(Main.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:775)
	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
java.lang.IllegalArgumentException: requirement failed: Can only call getServletHandlers on a running MetricsSystem
  at scala.Predef$.require(Predef.scala:224)
  at org.apache.spark.metrics.MetricsSystem.getServletHandlers(MetricsSystem.scala:91)
  at org.apache.spark.SparkContext.<init>(SparkContext.scala:524)
  at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2516)
  at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:918)
  at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:910)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:910)
  at org.apache.spark.repl.Main$.createSparkSession(Main.scala:101)
  ... 47 elided
<console>:14: error: not found: value spark
       import spark.implicits._
              ^
<console>:14: error: not found: value spark
       import spark.sql
              ^
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.1
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_102)
Type in expressions to have them evaluated.
Type :help for more information.

scala> localhost:bin Sean$
  • 本地硬盘 & 内存 & CPU核数资源不够时都可能造成上述问题.
    spark-shell任务完成, 但是资源不够失败了.()我们可以通过-- total-executors 1减少所占用的CPU核数与内存数目.
    在这里插入图片描述

基本算子使用

由于本地的机器限制, 我们这边就直接使用spark-shell进行下面的算子操作.

前置条件
  • Spark
  • Hadoop
基本操作
  • 进入bin/spark-shell
localhost:bin Sean$ ./spark-shell
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/05/25 16:26:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://127.0.0.1:4040
Spark context available as 'sc' (master = local[*], app id = local-1558772763953).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.1
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_102)
Type in expressions to have them evaluated.
Type :help for more information.
  • 将文件放到Hadoop上
# 传输到本地服务器
hadoop fs -put / hello2019.sh /
# 传输到远程服务器
hadoop fs -put hello2019.sh hdfs://localhost:9000/
  • 从Hadoop上读取数据
# sc 即sparkContext
scala> sc
res4: org.apache.spark.SparkContext = org.apache.spark.SparkContext@25198ead

# 计算WordCount
scala>  sc.textFile("hdfs://localhost:9000/wordcount/input").flatMap(_.split("")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false).collect
res0: Array[(String, Int)] = Array((t,8), (l,3), (a,3), (i,3), (y,3), (p,2), (e,2), (c,2), (0,1), (b,1), (h,1), (2,1), (" ",1), (k,1), (o,1), (9,1), (1,1))

# 保存到HDFS上 多个文件
scala>  sc.textFile("hdfs://localhost:9000/wordcount/input").flatMap(_.split("")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false).saveAsTextFile("hdfs://localhost:9000/wordcount/20190525/out")

# 保存到HDFS上 一个文件 (reduceByKey(_+_,1))
scala>  sc.textFile("hdfs://localhost:9000/wordcount/input").flatMap(_.split("")).map((_,1)).reduceByKey(_+_,1).sortBy(_._2,false).saveAsTextFile("hdfs://localhost:9000/wordcount/20190525-2/out")
  • 输出结果(单文件)
    在这里插入图片描述
localhost:~ Sean$ hadoop fs -cat /wordcount/20190525/out/*
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
19/05/25 16:29:23 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
(t,8)
(l,3)
(a,3)
(i,3)
(y,3)
(p,2)
(e,2)
(c,2)
(0,1)
(b,1)
(h,1)
(2,1)
( ,1)
(k,1)
(o,1)
(9,1)
(1,1)
  • 输出结果(多文件)
    在这里插入图片描述
localhost:~ Sean$ hadoop fs -cat /wordcount/20190525-2/out/*
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
19/05/25 16:30:46 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
(t,8)
(a,3)
(i,3)
(y,3)
(l,3)
(e,2)
(p,2)
(c,2)
(0,1)
(k,1)
(b,1)
(h,1)
(2,1)
( ,1)
(o,1)
(9,1)
(1,1)

WordCount细节

  • textFile()
    从某个地方读取数据, 并转换为RDD返回.
scala> sc.textFile("hdfs://localhost:9000/wordcount/input").collect
res10: Array[String] = Array(hello 2019, cat, pitty, kitty, able, pitty, cat)
  • map()
    遍历所有的元素.

  • split()
    拆分.

  • collect
    搜集到主结点.

scala> sc.textFile("hdfs://localhost:9000/wordcount/input").map(_.split(" ")).collect
res12: Array[Array[String]] = Array(Array(hello, 2019), Array(cat), Array(pitty), Array(kitty), Array(able), Array(pitty), Array(cat))
  • flatMap() & map().flatten
    map().flattenScala内到写法, Spark内貌似没有这样到写法.
scala> sc.textFile("hdfs://localhost:9000/wordcount/input").flatMap(_.split(" ")).collect
res17: Array[String] = Array(hello, 2019, cat, pitty, kitty, able, pitty, cat)
  • map((_,1))
    添加计数操作.
scala> sc.textFile("hdfs://localhost:9000/wordcount/input").flatMap(_.split(" ")).map((_,1)).collect
res19: Array[(String, Int)] = Array((hello,1), (2019,1), (cat,1), (pitty,1), (kitty,1), (able,1), (pitty,1), (cat,1))
  • reduceByKey()
    根据key值进行划分. reduceByKeyRDD独有的算子, Scala内不存在.
    后面的((F),1)1是什么含义, 指定分区数目.(上面的输出例子, 将结果写入1个文件还是3个文件.)
scala> sc.textFile("hdfs://localhost:9000/wordcount/input").flatMap(_.split(" ")).map((_,1)).reduceByKey((x,y) => (x+y)).collect
res20: Array[(String, Int)] = Array((hello,1), (pitty,2), (able,1), (2019,1), (cat,2), (kitty,1))

scala> sc.textFile("hdfs://localhost:9000/wordcount/input").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
res21: Array[(String, Int)] = Array((hello,1), (pitty,2), (able,1), (2019,1), (cat,2), (kitty,1))

scala> sc.textFile("hdfs://localhost:9000/wordcount/input").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_,1).collect
res22: Array[(String, Int)] = Array((hello,1), (pitty,2), (able,1), (2019,1), (kitty,1), (cat,2))
scala>  sc.textFile("hdfs://localhost:9000/wordcount/input").flatMap(_.split("")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false).collect
res0: Array[(String, Int)] = Array((t,8), (l,3), (a,3), (i,3), (y,3), (p,2), (e,2), (c,2), (0,1), (b,1), (h,1), (2,1), (" ",1), (k,1), (o,1), (9,1), (1,1))
  • saveAsTextFile()
    将处理后的结果存储出来.

scala>  sc.textFile("hdfs://localhost:9000/wordcount/input").flatMap(_.split("")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false).saveAsTextFile("hdfs://localhost:9000/wordcount/20190525/out")

Reference

[1]. 深入理解groupByKey、reduceByKey区别——本质就是一个local machine的reduce操作
[2]. reduceByKey应用举例

Logo

大数据从业者之家,一起探索大数据的无限可能!

更多推荐