本实验完成利用Spark Streaming来完成对多种数据流的单词统计

 

1. 通过Spark Streaming完成对文件系统流数据的词频统计

1.1 监听Linux本地目录流数据的词频统计

开Linux终端,进入pyspark命令行,逐行输入以下代码

from pyspark.streaming import StreamingContext  #导入SparkStreaming包

ssc = StreamingContext(sc, 10)  #创建DStream, 10s截取一次数据
#其中ssc即为DStream,可以通过type函数查看,例如:type(ssc)
type(ssc)
  
lines = ssc.textFileStream('file:///home/hadoop/listen')   #定义数据源,注意listen是一个目录,需要先创建
 
counts = lines.flatMap(lambda line: line.split(" "))\
    .map(lambda word: (word, 1))\
    .reduceByKey(lambda a, b: a+b)
 
counts.pprint()
ssc.start(); ssc.awaitTermination()    #执行程序
 

打开新的终端(这一步很重要)

在新的终端创建一个data1.txt文件

$ nano data1.txt

内容如下:

hello world
hello hadoop
hello spark

 发送流数据到监听目录中

cp data1.txt /home/hadoop/listen

查看pyspark终端的输出,有词频统计结果输出

-------------------------------------------
Time: 2019-05-18 09:11:40
-------------------------------------------
('hadoop', 1)
('hello', 3)
('world', 1)
('spark', 1)

 

1.2 监听HDFS目录的词频统计

参考代码:

from pyspark import SparkContext              #导入SparkContext包
from pyspark.streaming import StreamingContext  #导入SparkStreaming包
sc = SparkContext("local", "appName")            #创建SparkContext对象
ssc = StreamingContext(sc, 10)  #创建DStream, 10s截取一次数据
#其中ssc即为DStream,可以通过type函数查看,例如:type(ssc)
type(ssc)


lines = ssc.textFileStream('hdfs://node1:8020/testStreaming')

counts = lines.flatMap(lambda line: line.split(" "))\
    .map(lambda word: (word, 1))\
    .reduceByKey(lambda a, b: a+b)

counts.pprint()
ssc.start(); ssc.awaitTermination()    #执行程序

注意:如果是pyspark命令行下操作,因为pyspark默认已经存在SparkContext的sc对象,所以第1行和第3行不用敲。

运行以上程序后,往hdfs /testStreaming目录上传文件,观察输出,能看到类似于如下输出

-------------------------------------------
Time: 2019-05-06 00:00:50
-------------------------------------------

-------------------------------------------
Time: 2019-05-06 00:01:00
-------------------------------------------

-------------------------------------------
Time: 2019-05-06 00:01:10
-------------------------------------------
('c', 1)
('d', 1)
('diff', 1)
('hadoop', 1)
('b', 1)
('Hello', 3)
('world', 1)
('a12', 1)
('pig', 1)
('a11', 1)

 升级玩法:通过运行参数指定监听目录

if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: hdfs_wordcount.py <directory>", file=sys.stderr)
        exit(-1)

    sc = SparkContext(appName="PythonStreamingHDFSWordCount")

 

2. Socket套接字流数据的词频统计

代码提示:

与读取Linux本地文件系统不同的是将

lines = ssc.textFileStream('file:///home/hadoop/listen')

一行换成如下语句:

lines = ssc.socketTextStream('localhost',9999)

注意:执行start()后,会出现如下报错,因为没有开启socket流数据

19/05/18 09:46:16 ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Error connecting to localhost:9999 - java.net.ConnectException: Connection refused (Connection refused)

 

打开新的终端,输入如下命令启动NetCat并持续监听9999端口

nc -lk 9999

在命令下方输入一些数据,例如

hello world
hello hadoop

 

查看pyspark终端输出

WARN BlockManager: Block input-0-1558144212800 replicated to only 0 peer(s) instead of 1 peers
 

思考:如何监听flume流数据?

Logo

更多推荐