Using the Spark Shell
Preface
In the previous chapter we covered installing Spark in Standalone mode. In this chapter we introduce the basics of starting and using the Spark Shell.
- Basic startup and usage
- Basic operator usage
Basic Startup and Usage
- Local startup
Go to the ./bin directory and run spark-shell to start it. Without connecting to a cluster, this launches a local worker directly. The Web UI can be reached at http://localhost:4040.
localhost:bin Sean$ spark-shell
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/03/29 17:15:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
19/03/29 17:15:01 WARN Utils: Your hostname, localhost resolves to a loopback address: 127.0.0.1; using 192.168.31.80 instead (on interface en0)
19/03/29 17:15:01 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Spark context Web UI available at http://192.168.31.80:4040
Spark context available as 'sc' (master = local[*], app id = local-1553850902335).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.2.1
/_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_102)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
- Local startup, connecting to a cluster / specifying configuration
spark-shell --master spark://localhost:7077 --total-executor-cores 1 --executor-memory 1g
localhost:bin Sean$ spark-shell --master spark://localhost:7077 --total-executor-cores 1 --executor-memory 1g
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/03/30 15:25:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://127.0.0.1:4040
Spark context available as 'sc' (master = spark://localhost:7077, app id = app-20190330152508-0001).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.2.1
/_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_102)
Type in expressions to have them evaluated.
Type :help for more information.
scala> sc
res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@66d2885c
scala>
After a successful start, the spark-shell application shows up at http://localhost:8080. We can then use Scala inside spark-shell to perform operations, as sketched below.
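For instance, a minimal sketch of running a computation in the shell (the echoed RDD description and console line numbers may differ on your machine):

scala> val nums = sc.parallelize(1 to 10)
nums: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> nums.map(_ * 2).reduce(_ + _)
res0: Int = 110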
Spark submit
For production deployment and release we usually submit applications with the spark-submit script (found in the ./bin directory). Typically the program is packaged with Maven and then submitted via spark-submit; a sketch of such an application is shown below.
(Note: building the full jar with Maven is not covered here; see "Maven 打包实战" for details.)
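As an illustration only (the object name, argument handling, and paths are hypothetical, not taken from this chapter), a minimal application that could be packaged with Maven and submitted might look like this:

import org.apache.spark.{SparkConf, SparkContext}

// Minimal WordCount application; the master is not hard-coded so that
// spark-submit can supply it via --master.
object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WordCount")
    val sc = new SparkContext(conf)
    sc.textFile(args(0))                 // input path, e.g. an HDFS directory
      .flatMap(_.split(" "))             // split lines into words
      .map((_, 1))                       // pair each word with a count of 1
      .reduceByKey(_ + _)                // sum counts per word
      .sortBy(_._2, ascending = false)   // sort by count, descending
      .saveAsTextFile(args(1))           // output path
    sc.stop()
  }
}

It could then be submitted with something like spark-submit --master spark://localhost:7077 --class WordCount wordcount.jar <input> <output>, reusing the flags shown earlier in this chapter.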
Q & A
localhost:bin Sean$ spark-shell --master spark://192.168.31.80:7077
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/03/29 18:11:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
19/03/29 18:11:38 WARN Utils: Your hostname, localhost resolves to a loopback address: 127.0.0.1; using 192.168.31.80 instead (on interface en0)
19/03/29 18:11:38 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
19/03/29 18:11:39 WARN StandaloneAppClient$ClientEndpoint: Failed to connect to master 192.168.31.80:7077
org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:100)
at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:108)
at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint$$anonfun$tryRegisterAllMasters$1$$anon$1.run(StandaloneAppClient.scala:106)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Failed to connect to /192.168.31.80:7077
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:232)
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:182)
at org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:197)
at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:194)
at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:190)
... 4 more
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: /192.168.31.80:7077
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:257)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:291)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:631)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
... 1 more
19/03/29 18:11:59 WARN StandaloneAppClient$ClientEndpoint: Failed to connect to master 192.168.31.80:7077
org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:100)
at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:108)
at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint$$anonfun$tryRegisterAllMasters$1$$anon$1.run(StandaloneAppClient.scala:106)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Failed to connect to /192.168.31.80:7077
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:232)
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:182)
at org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:197)
at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:194)
at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:190)
... 4 more
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: /192.168.31.80:7077
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:257)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:291)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:631)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
... 1 more
19/03/29 18:12:19 WARN StandaloneAppClient$ClientEndpoint: Failed to connect to master 192.168.31.80:7077
org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:100)
at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:108)
at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint$$anonfun$tryRegisterAllMasters$1$$anon$1.run(StandaloneAppClient.scala:106)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Failed to connect to /192.168.31.80:7077
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:232)
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:182)
at org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:197)
at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:194)
at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:190)
... 4 more
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: /192.168.31.80:7077
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:257)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:291)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:631)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
... 1 more
19/03/29 18:12:39 ERROR StandaloneSchedulerBackend: Application has been killed. Reason: All masters are unresponsive! Giving up.
19/03/29 18:12:39 WARN StandaloneSchedulerBackend: Application ID is not initialized yet.
19/03/29 18:12:39 WARN StandaloneAppClient$ClientEndpoint: Drop UnregisterApplication(null) because has not yet connected to master
19/03/29 18:12:40 ERROR SparkContext: Error initializing SparkContext.
java.lang.IllegalArgumentException: requirement failed: Can only call getServletHandlers on a running MetricsSystem
at scala.Predef$.require(Predef.scala:224)
at org.apache.spark.metrics.MetricsSystem.getServletHandlers(MetricsSystem.scala:91)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:524)
at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2516)
at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:918)
at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:910)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:910)
at org.apache.spark.repl.Main$.createSparkSession(Main.scala:101)
at $line3.$read$$iw$$iw.<init>(<console>:15)
at $line3.$read$$iw.<init>(<console>:42)
at $line3.$read.<init>(<console>:44)
at $line3.$read$.<init>(<console>:48)
at $line3.$read$.<clinit>(<console>)
at $line3.$eval$.$print$lzycompute(<console>:7)
at $line3.$eval$.$print(<console>:6)
at $line3.$eval.$print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:786)
at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1047)
at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:638)
at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:637)
at scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
at scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:637)
at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:569)
at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:565)
at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:807)
at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:681)
at scala.tools.nsc.interpreter.ILoop.processLine(ILoop.scala:395)
at org.apache.spark.repl.SparkILoop$$anonfun$initializeSpark$1.apply$mcV$sp(SparkILoop.scala:38)
at org.apache.spark.repl.SparkILoop$$anonfun$initializeSpark$1.apply(SparkILoop.scala:37)
at org.apache.spark.repl.SparkILoop$$anonfun$initializeSpark$1.apply(SparkILoop.scala:37)
at scala.tools.nsc.interpreter.IMain.beQuietDuring(IMain.scala:214)
at org.apache.spark.repl.SparkILoop.initializeSpark(SparkILoop.scala:37)
at org.apache.spark.repl.SparkILoop.loadFiles(SparkILoop.scala:98)
at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:920)
at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909)
at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909)
at scala.reflect.internal.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:97)
at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:909)
at org.apache.spark.repl.Main$.doMain(Main.scala:74)
at org.apache.spark.repl.Main$.main(Main.scala:54)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:775)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
java.lang.IllegalArgumentException: requirement failed: Can only call getServletHandlers on a running MetricsSystem
at scala.Predef$.require(Predef.scala:224)
at org.apache.spark.metrics.MetricsSystem.getServletHandlers(MetricsSystem.scala:91)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:524)
at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2516)
at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:918)
at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:910)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:910)
at org.apache.spark.repl.Main$.createSparkSession(Main.scala:101)
... 47 elided
<console>:14: error: not found: value spark
import spark.implicits._
^
<console>:14: error: not found: value spark
import spark.sql
^
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.2.1
/_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_102)
Type in expressions to have them evaluated.
Type :help for more information.
Resolution: the main causes of this kind of problem are the following:
- Check the firewall and make sure the Spark master address and port are reachable;
- Check that the local Scala version matches the Scala version of the remote Spark deployment (see conf/spark-env.sh, /etc/profile, and the version printed when spark-shell starts; a quick in-shell check follows the configuration below);
- A misconfigured local /etc/hosts;
- When starting a pseudo-cluster locally, configure spark-env.sh and slaves as follows:
# spark-env.sh
export SPARK_MASTER_IP=127.0.0.1
export SPARK_LOCAL_IP=127.0.0.1
export SPARK_MASTER_PORT=7077
# slaves
localhost
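A quick way to verify the Scala and Spark versions from inside the shell (assuming it can at least start in local mode; the values below match the environment used in this chapter):

scala> scala.util.Properties.versionString
res0: String = version 2.11.8

scala> sc.version
res1: String = 2.2.1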
[1]. 关于Spark报错不能连接到Server的解决办法(Failed to connect to master master_hostname:7077)
[2]. Unable to connect to Spark master
[3]. Spark报错——AnnotatedConnectException拒绝连接
[4]. 单机spark绑定端口
localhost:bin Sean$ spark-shell --master spark://127.0.0.1:7077
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/03/30 15:23:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
19/03/30 15:23:55 ERROR StandaloneSchedulerBackend: Application has been killed. Reason: Master removed our application: FAILED
19/03/30 15:23:55 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exiting due to error from cluster scheduler: Master removed our application: FAILED
at org.apache.spark.scheduler.TaskSchedulerImpl.error(TaskSchedulerImpl.scala:509)
at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.dead(StandaloneSchedulerBackend.scala:146)
at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint.markDead(StandaloneAppClient.scala:254)
at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint$$anonfun$receive$1.applyOrElse(StandaloneAppClient.scala:168)
at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:117)
at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:205)
at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:101)
at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:216)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
19/03/30 15:23:55 ERROR SparkContext: Error initializing SparkContext.
java.lang.IllegalArgumentException: requirement failed: Can only call getServletHandlers on a running MetricsSystem
at scala.Predef$.require(Predef.scala:224)
at org.apache.spark.metrics.MetricsSystem.getServletHandlers(MetricsSystem.scala:91)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:524)
at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2516)
at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:918)
at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:910)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:910)
at org.apache.spark.repl.Main$.createSparkSession(Main.scala:101)
at $line3.$read$$iw$$iw.<init>(<console>:15)
at $line3.$read$$iw.<init>(<console>:42)
at $line3.$read.<init>(<console>:44)
at $line3.$read$.<init>(<console>:48)
at $line3.$read$.<clinit>(<console>)
at $line3.$eval$.$print$lzycompute(<console>:7)
at $line3.$eval$.$print(<console>:6)
at $line3.$eval.$print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:786)
at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1047)
at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:638)
at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:637)
at scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
at scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:637)
at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:569)
at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:565)
at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:807)
at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:681)
at scala.tools.nsc.interpreter.ILoop.processLine(ILoop.scala:395)
at org.apache.spark.repl.SparkILoop$$anonfun$initializeSpark$1.apply$mcV$sp(SparkILoop.scala:38)
at org.apache.spark.repl.SparkILoop$$anonfun$initializeSpark$1.apply(SparkILoop.scala:37)
at org.apache.spark.repl.SparkILoop$$anonfun$initializeSpark$1.apply(SparkILoop.scala:37)
at scala.tools.nsc.interpreter.IMain.beQuietDuring(IMain.scala:214)
at org.apache.spark.repl.SparkILoop.initializeSpark(SparkILoop.scala:37)
at org.apache.spark.repl.SparkILoop.loadFiles(SparkILoop.scala:98)
at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:920)
at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909)
at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909)
at scala.reflect.internal.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:97)
at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:909)
at org.apache.spark.repl.Main$.doMain(Main.scala:74)
at org.apache.spark.repl.Main$.main(Main.scala:54)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:775)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
java.lang.IllegalArgumentException: requirement failed: Can only call getServletHandlers on a running MetricsSystem
at scala.Predef$.require(Predef.scala:224)
at org.apache.spark.metrics.MetricsSystem.getServletHandlers(MetricsSystem.scala:91)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:524)
at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2516)
at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:918)
at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:910)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:910)
at org.apache.spark.repl.Main$.createSparkSession(Main.scala:101)
... 47 elided
<console>:14: error: not found: value spark
import spark.implicits._
^
<console>:14: error: not found: value spark
import spark.sql
^
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.2.1
/_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_102)
Type in expressions to have them evaluated.
Type :help for more information.
scala> localhost:bin Sean$
- Insufficient local disk, memory, or CPU resources can also cause the errors above: spark-shell itself starts, but the application fails because the requested resources cannot be granted. We can reduce the CPU cores and memory requested with --total-executor-cores 1 and --executor-memory, as in the command shown earlier.
Basic Operator Usage
Given the limits of the local machine, we will simply use spark-shell for the operator examples below.
Prerequisites
- Spark
- Hadoop
Basic operations
- Enter bin/spark-shell
localhost:bin Sean$ ./spark-shell
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/05/25 16:26:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://127.0.0.1:4040
Spark context available as 'sc' (master = local[*], app id = local-1558772763953).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.2.1
/_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_102)
Type in expressions to have them evaluated.
Type :help for more information.
- Put the file onto Hadoop
# Upload to the local (default) HDFS
hadoop fs -put hello2019.sh /
# Upload to a remote HDFS
hadoop fs -put hello2019.sh hdfs://localhost:9000/
- Read data from Hadoop
# sc is the SparkContext
scala> sc
res4: org.apache.spark.SparkContext = org.apache.spark.SparkContext@25198ead
# Compute the WordCount (note: split("") splits into single characters, hence the per-character counts below)
scala> sc.textFile("hdfs://localhost:9000/wordcount/input").flatMap(_.split("")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false).collect
res0: Array[(String, Int)] = Array((t,8), (l,3), (a,3), (i,3), (y,3), (p,2), (e,2), (c,2), (0,1), (b,1), (h,1), (2,1), (" ",1), (k,1), (o,1), (9,1), (1,1))
# Save to HDFS (multiple output files)
scala> sc.textFile("hdfs://localhost:9000/wordcount/input").flatMap(_.split("")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false).saveAsTextFile("hdfs://localhost:9000/wordcount/20190525/out")
# Save to HDFS as a single file (reduceByKey(_+_,1))
scala> sc.textFile("hdfs://localhost:9000/wordcount/input").flatMap(_.split("")).map((_,1)).reduceByKey(_+_,1).sortBy(_._2,false).saveAsTextFile("hdfs://localhost:9000/wordcount/20190525-2/out")
- Output (multiple files)
localhost:~ Sean$ hadoop fs -cat /wordcount/20190525/out/*
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
19/05/25 16:29:23 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
(t,8)
(l,3)
(a,3)
(i,3)
(y,3)
(p,2)
(e,2)
(c,2)
(0,1)
(b,1)
(h,1)
(2,1)
( ,1)
(k,1)
(o,1)
(9,1)
(1,1)
- Output (single file)
localhost:~ Sean$ hadoop fs -cat /wordcount/20190525-2/out/*
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
19/05/25 16:30:46 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
(t,8)
(a,3)
(i,3)
(y,3)
(l,3)
(e,2)
(p,2)
(c,2)
(0,1)
(k,1)
(b,1)
(h,1)
(2,1)
( ,1)
(o,1)
(9,1)
(1,1)
WordCount in Detail
textFile()
Reads data from the given location and returns it as an RDD.
scala> sc.textFile("hdfs://localhost:9000/wordcount/input").collect
res10: Array[String] = Array(hello 2019, cat, pitty, kitty, able, pitty, cat)
- map(): applies a function to every element.
- split(): splits a string.
- collect: gathers the results back to the driver.
scala> sc.textFile("hdfs://localhost:9000/wordcount/input").map(_.split(" ")).collect
res12: Array[Array[String]] = Array(Array(hello, 2019), Array(cat), Array(pitty), Array(kitty), Array(able), Array(pitty), Array(cat))
flatMap() & map().flatten
map().flatten is a Scala collections idiom; the Spark RDD API does not seem to offer flatten directly, so flatMap() is used instead. For comparison, a plain-Scala sketch follows the RDD example below.
scala> sc.textFile("hdfs://localhost:9000/wordcount/input").flatMap(_.split(" ")).collect
res17: Array[String] = Array(hello, 2019, cat, pitty, kitty, able, pitty, cat)
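For comparison, the same flattening expressed with map followed by flatten on a local Scala Array (not an RDD):

scala> Array("hello 2019", "cat").map(_.split(" ")).flatten
res1: Array[String] = Array(hello, 2019, cat)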
- map((_,1))
Pairs each element with a count of 1.
scala> sc.textFile("hdfs://localhost:9000/wordcount/input").flatMap(_.split(" ")).map((_,1)).collect
res19: Array[(String, Int)] = Array((hello,1), (2019,1), (cat,1), (pitty,1), (kitty,1), (able,1), (pitty,1), (cat,1))
reduceByKey()
Aggregates the values by key. reduceByKey is an RDD-specific operator; it does not exist in the plain Scala collections API.
The second argument, the 1 in reduceByKey(_+_, 1), specifies the number of partitions (in the output examples above, this determines whether the result is written to one file or to several).
scala> sc.textFile("hdfs://localhost:9000/wordcount/input").flatMap(_.split(" ")).map((_,1)).reduceByKey((x,y) => (x+y)).collect
res20: Array[(String, Int)] = Array((hello,1), (pitty,2), (able,1), (2019,1), (cat,2), (kitty,1))
scala> sc.textFile("hdfs://localhost:9000/wordcount/input").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
res21: Array[(String, Int)] = Array((hello,1), (pitty,2), (able,1), (2019,1), (cat,2), (kitty,1))
scala> sc.textFile("hdfs://localhost:9000/wordcount/input").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_,1).collect
res22: Array[(String, Int)] = Array((hello,1), (pitty,2), (able,1), (2019,1), (kitty,1), (cat,2))
- What is the difference between reduceByKey and groupByKey?
reduceByKey first combines values within each partition and only then merges the partial results across partitions; groupByKey shuffles all the values and aggregates them only at the end. See the sketch below.
References: 深入理解groupByKey、reduceByKey区别——本质就是一个local machine的reduce操作; reduceByKey应用举例
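A minimal sketch comparing the two on made-up sample data (the echoed RDD description and the order of elements in the results may differ):

scala> val pairs = sc.parallelize(Seq(("cat", 1), ("pitty", 1), ("cat", 1)))
pairs: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> pairs.reduceByKey(_ + _).collect               // partial sums per partition, merged afterwards
res0: Array[(String, Int)] = Array((pitty,1), (cat,2))

scala> pairs.groupByKey().mapValues(_.sum).collect    // all values shuffled, summed only at the end
res1: Array[(String, Int)] = Array((pitty,1), (cat,2))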
- sortBy(_._2, false)
In Spark's sortBy(f, ascending), the second parameter controls whether the sort is ascending or descending; false means descending.
scala> sc.textFile("hdfs://localhost:9000/wordcount/input").flatMap(_.split("")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false).collect
res0: Array[(String, Int)] = Array((t,8), (l,3), (a,3), (i,3), (y,3), (p,2), (e,2), (c,2), (0,1), (b,1), (h,1), (2,1), (" ",1), (k,1), (o,1), (9,1), (1,1))
saveAsTextFile()
Writes the processed result out to storage.
scala> sc.textFile("hdfs://localhost:9000/wordcount/input").flatMap(_.split("")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false).saveAsTextFile("hdfs://localhost:9000/wordcount/20190525/out")
Reference
[1]. 深入理解groupByKey、reduceByKey区别——本质就是一个local machine的reduce操作
[2]. reduceByKey应用举例