概述

Spark Streaming是核心Spark API的扩展,可实现实时数据流的可伸缩,高吞吐量,容错流处理。数据可以从像卡夫卡,室壁运动,或TCP套接字许多来源摄入,并且可以使用与像高级别功能表达复杂的算法来处理map,reduce,join和window。最后,可以将处理后的数据推送到文件系统,数据库和实时仪表板。实际上,您可以在数据流上应用Spark的 机器学习和 图形处理算法。
在内部,它的工作方式如下。Spark Streaming接收实时输入数据流,并将数据分成批次,然后由Spark引擎处理,以成批生成最终结果流。
Spark Streaming提供了称为离散化流或DStream的高级抽象,它表示连续的数据流。可以根据来自Kafka和Kinesis等来源的输入数据流来创建DStream,也可以通过对其他DStream应用高级操作来创建DStream。在内部,DStream表示为RDD序列 。

Spark Streaming用于流式数据的处理。Spark Streaming支持的数据输入源很多,例如:Kafka、Flume、Twitter、ZeroMQ和简单的TCP套接字等等。数据输入后可以用Spark的高度抽象原语如:map、reduce、join、window等进行运算。而结果也能保存在很多地方,如HDFS,数据库等。另外Spark Streaming也能和MLlib(机器学习)以及Graphx完美融合。

和Spark基于RDD的概念很相似,Spark Streaming使用离散化流(discretized stream)作为抽象表示,叫作DStream。DStream 是随时间推移而收到的数据的序列。在内部,每个时间区间收到的数据都作为 RDD 存在,而DStream是由这些RDD所组成的序列(因此得名“离散化”)。
DStream 可以从各种输入源创建,比如 Flume、Kafka 或者 HDFS。创建出来的DStream 支持两种操作,一种是转化操作(transformation),会生成一个新的DStream,另一种是输出操作(output operation),可以把数据写入外部系统中。DStream 提供了许多与 RDD 所支持的操作相类似的操作支持,还增加了与时间相关的新操作,比如滑动窗口。

架构

在这里插入图片描述在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

案例

WordCount案例实操
1.需求:使用netcat工具向9999端口不断的发送数据,通过SparkStreaming读取端口数据并统计不同单词出现的次数
2.添加依赖
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.1.1</version>
</dependency>
3.编写代码
package com.atguigu

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.SparkConf

object StreamWordCount {

  def main(args: Array[String]): Unit = {

    //1.初始化Spark配置信息
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamWordCount")

    //2.初始化SparkStreamingContext
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    //3.通过监控端口创建DStream,读进来的数据为一行行
    val lineStreams = ssc.socketTextStream("hadoop102", 9999)

    //将每一行数据做切分,形成一个个单词
    val wordStreams = lineStreams.flatMap(_.split(" "))

    //将单词映射成元组(word,1)
    val wordAndOneStreams = wordStreams.map((_, 1))

    //将相同的单词次数做统计
    val wordAndCountStreams = wordAndOneStreams.reduceByKey(_+_)

    //打印
    wordAndCountStreams.print()

    //启动SparkStreamingContext
    ssc.start()
    ssc.awaitTermination()
  }
}


4.启动程序并通过NetCat发送数据:
[atguigu@hadoop102 spark]$ nc -lk 9999
hello atguigu
注意:如果程序运行时,log日志太多,可以将spark conf目录下的log4j文件里面的日志级别改成WARN。


Discretized Stream是Spark Streaming的基础抽象,代表持续性的数据流和经过各种Spark原语操作后的结果数据流。在内部实现上,DStream是一系列连续的RDD来表示。每个RDD含有一段时间间隔内的数据,如下图:

对数据的操作也是按照RDD为单位来进行的

计算过程由Spark engine来完成

Dstream创建

Spark Streaming原生支持一些不同的数据源。一些“核心”数据源已经被打包到Spark Streaming 的 Maven 工件中,而其他的一些则可以通过 spark-streaming-kafka 等附加工件获取。每个接收器都以 Spark 执行器程序中一个长期运行的任务的形式运行,因此会占据分配给应用的 CPU 核心。此外,我们还需要有可用的 CPU 核心来处理数据。这意味着如果要运行多个接收器,就必须至少有和接收器数目相同的核心数,还要加上用来完成计算所需要的核心数。例如,如果我们想要在流计算应用中运行 10 个接收器,那么至少需要为应用分配 11 个 CPU 核心。所以如果在本地模式运行,不要使用local或者local[1]。

3.1文件数据源

3.1.1 用法及说明
文件数据流:能够读取所有HDFS API兼容的文件系统文件,通过fileStream方法进行读取,Spark Streaming 将会监控 dataDirectory 目录并不断处理移动进来的文件,记住目前不支持嵌套目录。
streamingContext.textFileStream(dataDirectory)
注意事项:
1)文件需要有相同的数据格式;
2)文件进入 dataDirectory的方式需要通过移动或者重命名来实现;
3)一旦文件移动进目录,则不能再修改,即便修改了也不会读取新数据;
3.1.2 案例实操
(1)在HDFS上建好目录
[atguigu@hadoop102 spark]$ hadoop fs -mkdir /fileStream
(2)在/opt/module/data创建三个文件
[atguigu@hadoop102 data]$ touch a.tsv
[atguigu@hadoop102 data]$ touch b.tsv
[atguigu@hadoop102 data]$ touch c.tsv

添加如下数据:
Hello atguigu
Hello spark
(3)编写代码

package com.atguigu

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

object FileStream {

  def main(args: Array[String]): Unit = {

    //1.初始化Spark配置信息
val sparkConf = new SparkConf().setMaster("local[*]")
.setAppName("StreamWordCount")

    //2.初始化SparkStreamingContext
    val ssc = new StreamingContext(sparkConf, Seconds(5))

	  //3.监控文件夹创建DStream
    val dirStream = ssc.textFileStream("hdfs://hadoop102:9000/fileStream")

    //4.将每一行数据做切分,形成一个个单词
    val wordStreams = dirStream.flatMap(_.split("\t"))

    //5.将单词映射成元组(word,1)
    val wordAndOneStreams = wordStreams.map((_, 1))

    //6.将相同的单词次数做统计
    val wordAndCountStreams = wordAndOneStreams.reduceByKey(_ + _)

    //7.打印
    wordAndCountStreams.print()

    //8.启动SparkStreamingContext
    ssc.start()
    ssc.awaitTermination()
  }
}4)启动程序并向fileStream目录上传文件
[atguigu@hadoop102 data]$ hadoop fs -put ./a.tsv /fileStream
[atguigu@hadoop102 data]$ hadoop fs -put ./b.tsv /fileStream
[atguigu@hadoop102 data]$ hadoop fs -put ./c.tsv /fileStream
(5)获取计算结果
-------------------------------------------
Time: 1539073810000 ms
-------------------------------------------

-------------------------------------------
Time: 1539073815000 ms
-------------------------------------------
(Hello,4)
(spark,2)
(atguigu,2)

-------------------------------------------
Time: 1539073820000 ms
-------------------------------------------
(Hello,2)
(spark,1)
(atguigu,1)

-------------------------------------------
Time: 1539073825000 ms
-------------------------------------------


3.2 RDD队列

3.2.1 用法及说明
测试过程中,可以通过使用ssc.queueStream(queueOfRDDs)来创建DStream,每一个推送到这个队列中的RDD,都会作为一个DStream处理。
3.2.2 案例实操
1)需求:循环创建几个RDD,将RDD放入队列。通过SparkStream创建Dstream,计算WordCount
2)编写代码

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

object RDDStream {

  def main(args: Array[String]) {

    //1.初始化Spark配置信息
    val conf = new SparkConf().setMaster("local[*]").setAppName("RDDStream")

    //2.初始化SparkStreamingContext
    val ssc = new StreamingContext(conf, Seconds(4))

    //3.创建RDD队列
    val rddQueue = new mutable.Queue[RDD[Int]]()

    //4.创建QueueInputDStream
    val inputStream = ssc.queueStream(rddQueue,oneAtATime = false)

    //5.处理队列中的RDD数据
    val mappedStream = inputStream.map((_,1))
    val reducedStream = mappedStream.reduceByKey(_ + _)

    //6.打印结果
    reducedStream.print()

    //7.启动任务
    ssc.start()

//8.循环创建并向RDD队列中放入RDD
    for (i <- 1 to 5) {
      rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)
      Thread.sleep(2000)
    }

    ssc.awaitTermination()

  }
}

自定义数据源

3.3.1 用法及说明
需要继承Receiver,并实现onStart、onStop方法来自定义数据源采集。
3.3.2 案例实操
1)需求:自定义数据源,实现监控某个端口号,获取该端口号内容。
2)自定义数据源

package com.atguigu

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class CustomerReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {

  //最初启动的时候,调用该方法,作用为:读数据并将数据发送给Spark
  override def onStart(): Unit = {
    new Thread("Socket Receiver") {
      override def run() {
        receive()
      }
    }.start()
  }

  //读数据并将数据发送给Spark
  def receive(): Unit = {

    //创建一个Socket
    var socket: Socket = new Socket(host, port)

    //定义一个变量,用来接收端口传过来的数据
    var input: String = null

    //创建一个BufferedReader用于读取端口传来的数据
    val reader = new BufferedReader(new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))

    //读取数据
    input = reader.readLine()

    //当receiver没有关闭并且输入数据不为空,则循环发送数据给Spark
    while (!isStopped() && input != null) {
      store(input)
      input = reader.readLine()
    }

    //跳出循环则关闭资源
    reader.close()
    socket.close()

    //重启任务
    restart("restart")
  }

  override def onStop(): Unit = {}
}
3)使用自定义的数据源采集数据
package com.atguigu

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

object FileStream {

  def main(args: Array[String]): Unit = {

    //1.初始化Spark配置信息
Val sparkConf = new SparkConf().setMaster("local[*]")
.setAppName("StreamWordCount")

    //2.初始化SparkStreamingContext
    val ssc = new StreamingContext(sparkConf, Seconds(5))

//3.创建自定义receiver的Streaming
val lineStream = ssc.receiverStream(new CustomerReceiver("hadoop102", 9999))

    //4.将每一行数据做切分,形成一个个单词
    val wordStream = lineStream.flatMap(_.split("\t"))

    //5.将单词映射成元组(word,1)
    val wordAndOneStream = wordStream.map((_, 1))

    //6.将相同的单词次数做统计
    val wordAndCountStream = wordAndOneStream.reduceByKey(_ + _)

    //7.打印
    wordAndCountStream.print()

    //8.启动SparkStreamingContext
    ssc.start()
    ssc.awaitTermination()
  }
}


Kafka数据源

Receiver

在这里插入图片描述数据是源源不断的通过 receiver 接收,当数据被接收后,其将这些数据存储在 Block Manager 中;为了不丢失数据,其还将数据备份到其他的 Block Manager 中;
Receiver Tracker 收到被存储的 Block IDs,然后其内部会维护一个时间到这些 block IDs 的关系;
Job Generator 会每隔 batchInterval 的时间收到一个事件,其会根据这段时间到来的数据和stage生成一个 JobSet;
Job Scheduler 运行上面生成的 JobSet,将JobSet分发到对应的executor上运行。
存在的问题:当batch processing time>batchinterval 这种情况持续过长的时间,会造成数据在内存中堆积,导致Receiver所在Executor内存溢出等问题;
Receiver方式生成的微批RDD即BlockRDD,分区数就是block数

Direct

在这里插入图片描述
与receiver模式类似,不同在于没有单独receiver组件,Driver周期性查询Kafka的top+partition最新的offset,然后根据自身消费情况定义每个batch的offset范围,启动Job后各个executor根据batch的offset范围直接从kafka中拉取数据,这样避免了SparkStreaming与数据源生产速率不均衡造成的数据积压。
同时可以自己维护kafka的offset,避免数据丢失
Direct方式生成的微批RDD即kafkaRDD,分区数和kafka分区数一一对应

3.4.1 用法及说明
在工程中需要引入Maven工件spark- streaming-kafka_2.11来使用它。包内提供的 KafkaUtils对象可以在StreamingContext和JavaStreamingContext中以你的 Kafka 消息创建出 DStream。由于KafkaUtils可以订阅多个主题,因此它创建出的 DStream 由成对的主题和消息组成。要创建出一个流数据,需要使用 StreamingContext 实例、一个由逗号隔开的 ZooKeeper 主机列表字符串、消费者组的名字(唯一名字),以及一个从主题到针对这个主题的接收器线程数的映射表来调用createStream()方法。
3.4.2 案例实操
1)需求1:通过SparkStreaming从Kafka读取数据,并将读取过来的数据做简单计算(WordCount),最终打印到控制台。

(1)导入依赖

org.apache.spark spark-streaming-kafka_2.11 1.6.3
import kafka.serializer.StringDecoder
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaSparkStreaming {

  def main(args: Array[String]): Unit = {

    //1.创建SparkConf并初始化SSC
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("KafkaSparkStreaming")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    //2.定义kafka参数
    val zookeeper = "hadoop102:2181,hadoop103:2181,hadoop104:2181"
    val topic = "source"
    val consumerGroup = "spark"

    //3.将kafka参数映射为map
    val kafkaParam: Map[String, String] = Map[String, String](
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
       ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.GROUP_ID_CONFIG -> consumerGroup,
      "zookeeper.connect" -> zookeeper
    )

    //4.通过KafkaUtil创建kafkaDSteam
    val kafkaDSteam: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc,
      kafkaParam,
      Map[String, Int](topic -> 1),
      StorageLevel.MEMORY_ONLY
    )

    //5.对kafkaDSteam做计算(WordCount)
    kafkaDSteam.foreachRDD {
      rdd => {
        val word: RDD[String] = rdd.flatMap(_._2.split(" "))
        val wordAndOne: RDD[(String, Int)] = word.map((_, 1))
        val wordAndCount: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)
        wordAndCount.collect().foreach(println)
      }
    }

    //6.启动SparkStreaming
    ssc.start()
    ssc.awaitTermination()
  }
}

DStream转换

DStream上的原语与RDD的类似,分为Transformations(转换)和Output Operations(输出)两种,此外转换操作中还有一些比较特殊的原语,如:updateStateByKey()、transform()以及各种Window相关的原语。
4.1 无状态转化操作
无状态转化操作就是把简单的RDD转化操作应用到每个批次上,也就是转化DStream中的每一个RDD。部分无状态转化操作列在了下表中。注意,针对键值对的DStream转化操作(比如 reduceByKey())要添加import StreamingContext._才能在Scala中使用。

在这里插入图片描述需要记住的是,尽管这些函数看起来像作用在整个流上一样,但事实上每个DStream在内部是由许多RDD(批次)组成,且无状态转化操作是分别应用到每个RDD上的。例如,reduceByKey()会归约每个时间区间中的数据,但不会归约不同区间之间的数据。
举个例子,在之前的wordcount程序中,我们只会统计几秒内接收到的数据的单词个数,而不会累加。
无状态转化操作也能在多个DStream间整合数据,不过也是在各个时间区间内。例如,键 值对DStream拥有和RDD一样的与连接相关的转化操作,也就是cogroup()、join()、leftOuterJoin() 等。我们可以在DStream上使用这些操作,这样就对每个批次分别执行了对应的RDD操作。
我们还可以像在常规的Spark 中一样使用 DStream的union() 操作将它和另一个DStream 的内容合并起来,也可以使用StreamingContext.union()来合并多个流。

有状态转化操作

4.2.1 UpdateStateByKey
UpdateStateByKey原语用于记录历史记录,有时,我们需要在DStream中跨批次维护状态(例如流计算中累加wordcount)。针对这种情况,updateStateByKey()为我们提供了对一个状态变量的访问,用于键值对形式的DStream。给定一个由(键,事件)对构成的 DStream,并传递一个指定如何根据新的事件更新每个键对应状态的函数,它可以构建出一个新的 DStream,其内部数据为(键,状态) 对。
updateStateByKey() 的结果会是一个新的DStream,其内部的RDD 序列是由每个时间区间对应的(键,状态)对组成的。
updateStateByKey操作使得我们可以在用新信息进行更新时保持任意的状态。为使用这个功能,你需要做下面两步:

1. 定义状态,状态可以是一个任意的数据类型。
2. 定义状态更新函数,用此函数阐明如何使用之前的状态和来自输入流的新值对状态进行更新。
使用updateStateByKey需要对检查点目录进行配置,会使用检查点来保存状态。
更新版的wordcount:
(1)编写代码
package com.atguigu.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WorldCount {

  def main(args: Array[String]) {

    // 定义更新状态方法,参数values为当前批次单词频度,state为以往批次单词频度
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.foldLeft(0)(_ + _)
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }

    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(3))
    ssc.checkpoint("./ck")

    // Create a DStream that will connect to hostname:port, like hadoop102:9999
    val lines = ssc.socketTextStream("hadoop102", 9999)

    // Split each line into words
    val words = lines.flatMap(_.split(" "))

    //import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
    // Count each word in each batch
    val pairs = words.map(word => (word, 1))

    // 使用updateStateByKey来更新状态,统计从运行开始以来单词总的次数
    val stateDstream = pairs.updateStateByKey[Int](updateFunc)
    stateDstream.print()

    //val wordCounts = pairs.reduceByKey(_ + _)

    // Print the first ten elements of each RDD generated in this DStream to the console
    //wordCounts.print()

    ssc.start()             // Start the computation
    ssc.awaitTermination()  // Wait for the computation to terminate
    //ssc.stop()
  }

}
2)启动程序并向9999端口发送数据
[atguigu@hadoop102 kafka]$ nc -lk 9999
ni shi shui
ni hao ma
(3)结果展示
-------------------------------------------
Time: 1504685175000 ms
-------------------------------------------
-------------------------------------------
Time: 1504685181000 ms
-------------------------------------------
(shi,1)
(shui,1)
(ni,1)
-------------------------------------------
Time: 1504685187000 ms
-------------------------------------------
(shi,1)
(ma,1)
(hao,1)
(shui,1)
(ni,2)

4.2.2 Window Operations

Window Operations有点类似于Storm中的State,可以设置窗口的大小和滑动窗口的间隔来动态的获取当前Steaming的允许状态。
基于窗口的操作会在一个比StreamingContext的批次间隔更长的时间范围内,通过整合多个批次的结果,计算出整个窗口的结果。

所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动步长,两者都必须是 StreamContext 的批次间隔的整数倍。窗口时长控制每次计算最近的多少个批次的数据,其实就是最近的 windowDuration/batchInterval 个批次。如果有一个以 10 秒为批次间隔的源 DStream,要创建一个最近 30 秒的时间窗口(即最近 3 个批次),就应当把 windowDuration 设为 30 秒。而滑动步长的默认值与批次间隔相等,用来控制对新的 DStream 进行计算的间隔。如果源 DStream 批次间隔为 10 秒,并且我们只希望每两个批次计算一次窗口结果, 就应该把滑动步长设置为 20 秒。
假设,你想拓展前例从而每隔十秒对持续30秒的数据生成word count。为做到这个,我们需要在持续30秒数据的(word,1)对DStream上应用reduceByKey。使用操作reduceByKeyAndWindow.

reduce last 30 seconds of data, every 10 second

windowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x -y, 30, 20)

在这里插入图片描述关于Window的操作有如下原语:
(1)window(windowLength, slideInterval): 基于对源DStream窗化的批次进行计算返回一个新的Dstream
(2)countByWindow(windowLength, slideInterval):返回一个滑动窗口计数流中的元素。
(3)reduceByWindow(func, windowLength, slideInterval):通过使用自定义函数整合滑动区间流元素来创建一个新的单元素流。
(4)reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]):当在一个(K,V)对的DStream上调用此函数,会返回一个新(K,V)对的DStream,此处通过对滑动窗口中批次数据使用reduce函数来整合每个key的value值。Note:默认情况下,这个操作使用Spark的默认数量并行任务(本地是2),在集群模式中依据配置属性(spark.default.parallelism)来做grouping。你可以通过设置可选参数numTasks来设置不同数量的tasks。
(5)reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]):这个函数是上述函数的更高效版本,每个窗口的reduce值都是通过用前一个窗的reduce值来递增计算。通过reduce进入到滑动窗口数据并”反向reduce”离开窗口的旧数据来实现这个操作。一个例子是随着窗口滑动对keys的“加”“减”计数。通过前边介绍可以想到,这个函数只适用于”可逆的reduce函数”,也就是这些reduce函数有相应的”反reduce”函数(以参数invFunc形式传入)。如前述函数,reduce任务的数量通过可选参数来配置。注意:为了使用这个操作,检查点必须可用。
(6)countByValueAndWindow(windowLength,slideInterval, [numTasks]):对(K,V)对的DStream调用,返回(K,Long)对的新DStream,其中每个key的值是其在滑动窗口中频率。如上,可配置reduce任务数量。
reduceByWindow() 和 reduceByKeyAndWindow() 让我们可以对每个窗口更高效地进行归约操作。它们接收一个归约函数,在整个窗口上执行,比如 +。除此以外,它们还有一种特殊形式,通过只考虑新进入窗口的数据和离开窗口的数据,让 Spark 增量计算归约结果。这种特殊形式需要提供归约函数的一个逆函数,比 如 + 对应的逆函数为 -。对于较大的窗口,提供逆函数可以大大提高执行效率

在这里插入图片描述

val ipDStream = accessLogsDStream.map(logEntry => (logEntry.getIpAddress(), 1))
val ipCountDStream = ipDStream.reduceByKeyAndWindow(
  {(x, y) => x + y},
  {(x, y) => x - y},
  Seconds(30),
  Seconds(10))
  //加上新进入窗口的批次中的元素 //移除离开窗口的老批次中的元素 //窗口时长// 滑动步长
countByWindow()和countByValueAndWindow()作为对数据进行计数操作的简写。countByWindow()返回一个表示每个窗口中元素个数的DStream,而countByValueAndWindow()返回的DStream则包含窗口中每个值的个数。
val ipDStream = accessLogsDStream.map{entry => entry.getIpAddress()}
val ipAddressRequestCount = ipDStream.countByValueAndWindow(Seconds(30), Seconds(10)) 
val requestCount = accessLogsDStream.countByWindow(Seconds(30), Seconds(10))
WordCount第三版:3秒一个批次,窗口12秒,滑步6秒。
package com.atguigu.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WorldCount {

  def main(args: Array[String]) {

    // 定义更新状态方法,参数values为当前批次单词频度,state为以往批次单词频度
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.foldLeft(0)(_ + _)
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }

    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(3))
    ssc.checkpoint(".")

    // Create a DStream that will connect to hostname:port, like localhost:9999
    val lines = ssc.socketTextStream("hadoop102", 9999)

    // Split each line into words
    val words = lines.flatMap(_.split(" "))

    //import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
    // Count each word in each batch
    val pairs = words.map(word => (word, 1))

    val wordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b),Seconds(12), Seconds(6))

    // Print the first ten elements of each RDD generated in this DStream to the console
    wordCounts.print()

    ssc.start()             // Start the computation
    ssc.awaitTermination()  // Wait for the computation to terminate
    //ssc.stop()
  }

}

4.3.1 Transform

Transform原语允许DStream上执行任意的RDD-to-RDD函数。即使这些函数并没有在DStream的API中暴露出来,通过该函数可以方便的扩展Spark API。该函数每一批次调度一次。其实也就是对DStream中的RDD应用转换。
比如下面的例子,在进行单词统计的时候,想要过滤掉spam的信息。
val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(…) // RDD containing spam information

val cleanedDStream = wordCounts.transform { rdd =>
rdd.join(spamInfoRDD).filter(…) // join data stream with spam information to do data cleaning

}

4.3.2 Join

连接操作(leftOuterJoin, rightOuterJoin, fullOuterJoin也可以),可以连接Stream-Stream,windows-stream to windows-stream、stream-dataset
Stream-Stream Joins
val stream1: DStream[String, String] = ...
val stream2: DStream[String, String] = ...
val joinedStream = stream1.join(stream2)

val windowedStream1 = stream1.window(Seconds(20))
val windowedStream2 = stream2.window(Minutes(1))
val joinedStream = windowedStream1.join(windowedStream2)
Stream-dataset joins
val dataset: RDD[String, String] = ...
val windowedStream = stream.window(Seconds(20))...
val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }

DStream输出

输出操作指定了对流数据经转化操作得到的数据所要执行的操作(例如把结果推入外部数据库或输出到屏幕上)。与RDD中的惰性求值类似,如果一个DStream及其派生出的DStream都没有被执行输出操作,那么这些DStream就都不会被求值。如果StreamingContext中没有设定输出操作,整个context就都不会启动。
输出操作如下:
(1)print():在运行流程序的驱动结点上打印DStream中每一批次数据的最开始10个元素。这用于开发和调试。在Python API中,同样的操作叫print()。
(2)saveAsTextFiles(prefix, [suffix]):以text文件形式存储这个DStream的内容。每一批次的存储文件名基于参数中的prefix和suffix。”prefix-Time_IN_MS[.suffix]”。
(3)saveAsObjectFiles(prefix, [suffix]):以Java对象序列化的方式将Stream中的数据保存为 SequenceFiles . 每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]". Python中目前不可用。
(4)saveAsHadoopFiles(prefix, [suffix]):将Stream中的数据保存为 Hadoop files. 每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]"。Python API 中目前不可用。
(5)foreachRDD(func):这是最通用的输出操作,即将函数 func 用于产生于 stream的每一个RDD。其中参数传入的函数func应该实现将每一个RDD中数据推送到外部系统,如将RDD存入文件或者通过网络将其写入数据库。注意:函数func在运行流应用的驱动中被执行,同时其中一般函数RDD操作从而强制其对于流RDD的运算。
通用的输出操作foreachRDD(),它用来对DStream中的RDD运行任意计算。这和transform() 有些类似,都可以让我们访问任意RDD。在foreachRDD()中,可以重用我们在Spark中实现的所有行动操作。比如,常见的用例之一是把数据写到诸如MySQL的外部数据库中。
注意:
(1)连接不能写在driver层面;
(2)如果写在foreach则每个RDD都创建,得不偿失;
(3)增加foreachPartition,在分区创建。

累加器和广播变量

累加器(Accumulators)和广播变量(Broadcast variables)不能从Spark Streaming的检查点中恢复。如果你启用检查并也使用了累加器和广播变量,那么你必须创建累加器和广播变量的延迟单实例从而在驱动因失效重启后他们可以被重新实例化。如下例述:

object WordBlacklist {

  @volatile private var instance: Broadcast[Seq[String]] = null

  def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          val wordBlacklist = Seq("a", "b", "c")
          instance = sc.broadcast(wordBlacklist)
        }
      }
    }
    instance
  }
}

object DroppedWordsCounter {

  @volatile private var instance: LongAccumulator = null

  def getInstance(sc: SparkContext): LongAccumulator = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.longAccumulator("WordsInBlacklistCounter")
        }
      }
    }
    instance
  }
}
wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
  // Get or register the blacklist Broadcast
  val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
  // Get or register the droppedWordsCounter Accumulator
  val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
// Use blacklist to drop words and use droppedWordsCounter to count them
  val counts = rdd.filter { case (word, count) =>
    if (blacklist.value.contains(word)) {
      droppedWordsCounter.add(count)
      false
    } else {
      true
    }
  }.collect().mkString("[", ", ", "]")
  val output = "Counts at time " + time + " " + counts
})

DataFrame ans SQL Operations

你可以很容易地在流数据上使用DataFrames和SQL。你必须使用SparkContext来创建StreamingContext要用的SQLContext。此外,这一过程可以在驱动失效后重启。我们通过创建一个实例化的SQLContext单实例来实现这个工作。如下例所示。我们对前例word count进行修改从而使用DataFrames和SQL来产生word counts。每个RDD被转换为DataFrame,以临时表格配置并用SQL进行查询。

val words: DStream[String] = ...

words.foreachRDD { rdd =>

  // Get the singleton instance of SparkSession
  val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
  import spark.implicits._

  // Convert RDD[String] to DataFrame
  val wordsDataFrame = rdd.toDF("word")

  // Create a temporary view
  wordsDataFrame.createOrReplaceTempView("words")

  // Do word count on DataFrame using SQL and print it
  val wordCountsDataFrame =
  spark.sql("select word, count(*) as total from words group by word")
  wordCountsDataFrame.show()
}

你也可以从不同的线程在定义于流数据的表上运行SQL查询(也就是说,异步运行StreamingContext)。仅确定你设置StreamingContext记住了足够数量的流数据以使得查询操作可以运行。否则,StreamingContext不会意识到任何异步的SQL查询操作,那么其就会在查询完成之后删除旧的数据。例如,如果你要查询最后一批次,但是你的查询会运行5分钟,那么你需要调用streamingContext.remember(Minutes(5))(in Scala, 或者其他语言的等价操作)。

Caching / Persistence

和RDDs类似,DStreams同样允许开发者将流数据保存在内存中。也就是说,在DStream上使用persist()方法将会自动把DStreams中的每个RDD保存在内存中。当DStream中的数据要被多次计算时,这个非常有用(如在同样数据上的多次操作)。对于像reduceByWindow和reduceByKeyAndWindow以及基于状态的(updateStateByKey)这种操作,保存是隐含默认的。因此,即使开发者没有调用persist(),由基于窗操作产生的DStreams会自动保存在内存中。

性能调优

通过有效地使用群集资源来减少每批数据的处理时间。

设置正确的批处理大小,以便可以在接收到批处理数据后尽快对其进行处理(也就是说,数据处理与数据摄取保持同步)。

减少批处理时间

在Spark中可以进行许多优化,以最大程度地减少每批的处理时间。这些已在《调优指南》中详细讨论。本节重点介绍一些最重要的内容。

数据接收中的并行度

通过网络(例如Kafka,套接字等)接收数据需要对数据进行反序列化并将其存储在Spark中。如果数据接收成为系统的瓶颈,请考虑并行化数据接收。请注意,每个输入DStream都会创建一个接收器(在工作计算机上运行),该接收器接收单个数据流。因此,可以通过创建多个输入DStream并将其配置为从源接收数据流的不同分区来实现接收多个数据流。例如,可以将接收两个主题数据的单个Kafka输入DStream拆分为两个Kafka输入流,每个输入流仅接收一个主题。这将运行两个接收器,从而允许并行接收数据,从而提高了总体吞吐量。可以将这些多个DStream合并在一起以创建单个DStream。然后,可以将应用于单个输入DStream的转换应用于统一流。这样做如下。

val numStreams = 5
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()

接收机的块间隔

该间隔由配置参数确定 spark.streaming.blockInterval。对于大多数接收器,接收到的数据在存储在Spark内存中之前会合并为数据块。每批中的块数确定了将在类似地图的转换中用于处理接收到的数据的任务数。每批接收器中每个接收器的任务数大约为(批处理间隔/块间隔)。例如,200 ms的块间隔将每2秒批处理创建10个任务。如果任务数太少(即少于每台计算机的核心数),那么它将效率低下,因为将不会使用所有可用的核心来处理数据。要增加给定批处理间隔的任务数,请减小阻止间隔。但是,建议的块间隔最小值约为50毫秒,在此之下,任务启动开销可能是个问题。

使用多个输入流/接收器接收数据的一种替代方法是显式重新划分输入数据流(使用inputStream.repartition())。在进一步处理之前,这会将接收到的数据批分布在群集中指定数量的计算机上。

https://spark.apache.org/docs/latest/configuration.html#spark-streaming

https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html

数据处理中的并行度

如果在计算的任何阶段使用的并行任务数量不够高,则群集资源可能无法得到充分利用。例如,对于像reduceByKey 和的分布式归约操作reduceByKeyAndWindow,并行任务的默认数量由spark.default.parallelism configuration属性控制。您可以将并行性级别作为参数传递(请参阅 PairDStreamFunctions 文档),或将spark.default.parallelism 配置属性设置为更改默认值。
https://spark.apache.org/docs/latest/configuration.html#spark-properties
https://spark.apache.org/docs/latest/api/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.html
https://spark.apache.org/docs/latest/configuration.html#spark-properties

数据序列化

可以通过调整序列化格式来减少数据序列化的开销。在流传输的情况下,有两种类型的数据被序列化。

1.输入数据:默认情况下,通过Receiver接收的输入数据通过StorageLevel.MEMORY_AND_DISK_SER_2存储在执行程序的内存中。也就是说,数据被序列化为字节以减少GC开销,并被复制以容忍执行器故障。同样,数据首先保存在内存中,并且仅在内存不足以容纳流计算所需的所有输入数据时才溢出到磁盘。显然,这种序列化会产生开销–接收器必须对接收到的数据进行反序列化,然后使用Spark的序列化格式对其进行重新序列化。

2.流操作生成的持久RDD:流计算生成的RDD可以保留在内存中。例如,窗口操作会将数据保留在内存中,因为它们将被多次处理。但是,与Spark Core默认的StorageLevel.MEMORY_ONLY不同,默认情况下,由流计算生成的持久性RDD会与StorageLevel.MEMORY_ONLY_SER(即序列化)保持一致,以最大程度地减少GC开销。

在这两种情况下,使用Kryo序列化都可以减少CPU和内存的开销。有关更多详细信息,请参见《Spark Tuning Guide》。对于Kryo,请考虑注册自定义类,并禁用对象引用跟踪(请参阅《配置指南》中与Kryo相关的配置)。

在流应用程序需要保留的数据量不大的特定情况下,将数据(两种类型)持久化为反序列化对象是可行的,而不会产生过多的GC开销。例如,如果您使用的是几秒钟的批处理间隔并且没有窗口操作,那么您可以尝试通过显式设置存储级别来禁用持久化数据中的序列化。这将减少由于序列化导致的CPU开销,从而可能在没有太多GC开销的情况下提高性能。

https://spark.apache.org/docs/latest/api/scala/org/apache/spark/storage/StorageLevel . h t m l h t t p s : / / s p a r k . a p a c h e . o r g / d o c s / l a t e s t / a p i / s c a l a / o r g / a p a c h e / s p a r k / s t o r a g e / S t o r a g e L e v e l .html https://spark.apache.org/docs/latest/api/scala/org/apache/spark/storage/StorageLevel .htmlhttps://spark.apache.org/docs/latest/api/scala/org/apache/spark/storage/StorageLevel.html
https://spark.apache.org/docs/latest/api/scala/org/apache/spark/storage/StorageLevel.html$
https://spark.apache.org/docs/latest/tuning.html#data-serialization
https://spark.apache.org/docs/latest/configuration.html#compression-and-serialization

任务启动开销

如果每秒启动的任务数量很高(例如,每秒50个或更多),那么向执行者发送任务的开销可能会很大,并且将难以实现亚秒级的延迟。可以通过以下更改来减少开销:

执行模式:在独立模式或粗粒度的Mesos模式下运行Spark可以比细粒度的Mesos模式更好地执行任务启动时间。有关更多详细信息,请参阅“在Mesos上 运行”指南。
这些更改可以将批处理时间减少100毫秒,从而使亚秒级的批处理大小可行。

设置正确的批次间隔

为了使在群集上运行的Spark Streaming应用程序稳定,系统应该能够处理接收到的数据。换句话说,应尽快处理成批的数据。可以通过监视流式Web UI中的处理时间来发现这是否适用于应用程序 ,其中批处理时间应小于批处理间隔。

根据流计算的性质,所使用的批处理间隔可能会对数据速率产生重大影响,该速率可以由应用程序在一组固定的群集资源上维持。例如,让我们考虑前面的WordCountNetwork示例。对于特定的数据速率,系统可能能够跟上每2秒(即2秒的批处理间隔)但不每500毫秒报告一次字数的情况。因此,需要设置批次间隔,以便可以维持生产中的预期数据速率。

找出适合您的应用程序的正确批处理大小的一种好方法是使用保守的批处理间隔(例如5-10秒)和低数据速率对其进行测试。要验证系统是否能跟上数据速率,您可以检查每个已处理批处理经历的端到端延迟的值(可以在Spark驱动程序log4j日志中查找“ Total delay”(总延迟),也可以使用 流侦听器 界面)。如果将延迟保持为与批次大小相当,则系统是稳定的。否则,如果延迟持续增加,则意味着系统无法跟上并因此不稳定。一旦有了稳定配置的想法,就可以尝试提高数据速率和/或减小批处理大小。注意,由于暂时的数据速率增加而引起的延迟的瞬时增加可能是好的,只要该延迟减小回到较低的值(即,小于批大小)即可。
https://spark.apache.org/docs/latest/api/scala/org/apache/spark/streaming/scheduler/StreamingListener.html

内存调优

Spark Streaming应用程序所需的群集内存量在很大程度上取决于所使用的转换类型。例如,如果要对最后10分钟的数据使用窗口操作,则群集应具有足够的内存以将10分钟的数据保留在内存中。或者,如果您想使用updateStateByKey大量的按键,则所需的存储空间会很大。相反,如果您想执行一个简单的map-filter-store操作,则所需的内存将很少。
通常,由于通过接收器接收的数据存储在StorageLevel.MEMORY_AND_DISK_SER_2中,因此无法容纳在内存中的数据将溢出到磁盘上。这可能会降低流式应用程序的性能,因此建议根据流式应用程序的要求提供足够的内存。最好尝试以小规模查看内存使用情况并据此进行估计。
内存调整的另一个方面是垃圾回收。对于需要低延迟的流应用程序,不希望由于JVM垃圾收集而导致较大的停顿。
有一些参数可以帮助您调整内存使用和GC开销:
DStream的持久性级别:如前面的“数据序列化”部分所述,默认情况下,输入数据和RDD被持久化为序列化字节。与反序列化的持久性相比,这减少了内存使用和GC开销。启用Kryo序列化可进一步减少序列化的大小和内存使用量。通过压缩可以进一步减少内存使用(请参见Spark配置spark.rdd.compress),而这会占用CPU时间。
清除旧数据:默认情况下,将自动清除DStream转换生成的所有输入数据和持久的RDD。Spark Streaming根据使用的转换来决定何时清除数据。例如,如果您使用10分钟的窗口操作,那么Spark Streaming将保留最后10分钟的数据,并主动丢弃较旧的数据。通过设置可以将数据保留更长的时间(例如,以交互方式查询较旧的数据)streamingContext.remember。
CMS垃圾收集器:强烈建议使用并发标记扫掠GC,以使与GC相关的暂停时间始终保持较低。尽管已知并发GC会降低系统的总体处理吞吐量,但仍建议使用并发GC以实现更一致的批处理时间。确保在驱动程序(使用–driver-java-options中spark-submit)和执行程序(使用Spark configuration spark.executor.extraJavaOptions)上都设置了CMS GC 。
其他提示:为了进一步减少GC开销,请尝试以下更多提示。
使用OFF_HEAP存储级别持久化RDD 。请参阅《Spark编程指南》中的更多详细信息。
使用更多具有较小堆大小的执行程序。这将减少每个JVM堆中的GC压力。
https://spark.apache.org/docs/latest/tuning.html#memory-tuning
https://spark.apache.org/docs/latest/tuning.html#memory-tuning
https://spark.apache.org/docs/latest/streaming-programming-guide.html#data-serialization
https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence

要记住的要点:
DStream与单个接收器关联。为了获得读取并行性,需要创建多个接收器,即多个DStream。接收器在执行器中运行。它占据了一个核心。预订接收器插槽后,请确保有足够的内核可用于处理,即spark.cores.max应考虑接收器插槽。接收者以循环方式分配给执行者。

从流源接收数据时,接收器会创建数据块。每blockInterval毫秒生成一个新的数据块。在batchInterval期间创建了N个数据块,其中N = batchInterval / blockInterval。这些块由当前执行器的BlockManager分发给其他执行器的块管理器。之后,驱动程序上运行的网络输入跟踪器将被告知有关块的位置,以进行进一步处理。

在驱动程序上为在batchInterval期间创建的块创建了RDD。在batchInterval期间生成的块是RDD的分区。每个分区都是一个任务。blockInterval == batchinterval意味着将创建一个分区,并且可能在本地对其进行处理。

块上的映射任务在执行器中进行处理(一个执行器接收该块,另一个执行器复制该块),该执行器具有与块间隔无关的块,除非启动了非本地调度。除非有更大的块间隔,这意味着更大的块。较高的值会spark.locality.wait增加在本地节点上处理块的机会。需要在这两个参数之间找到平衡,以确保较大的块在本地处理。

您可以通过调用来定义分区数,而不必依赖batchInterval和blockInterval inputDstream.repartition(n)。这会随机重新随机排列RDD中的数据以创建n个分区。是的,以获得更大的并行度。虽然以洗牌为代价。RDD的处理由驾驶员的Jobscheduler安排为作业。在给定的时间点,只有一项作业处于活动状态。因此,如果一个作业正在执行,则其他作业将排队。

如果您有两个dstream,将形成两个RDD,并且将创建两个作业,这些作业将一个接一个地调度。为避免这种情况,您可以合并两个dstream。这将确保为dstream的两个RDD形成单个unionRDD。然后将此unionRDD视为一项工作。但是,RDD的分区不会受到影响。

如果批处理时间超过batchinterval,则显然接收方的内存将开始填满,并最终引发异常(最有可能是BlockNotFoundException)。当前,无法暂停接收器。使用SparkConf配置spark.streaming.receiver.maxRate,可以限制接收器的速率。

容错语义

在本节中,我们将讨论发生故障时Spark Streaming应用程序的行为。

背景
为了理解Spark Streaming提供的语义,让我们记住Spark的RDD的基本容错语义。

RDD是一个不变的,确定性可重新计算的分布式数据集。每个RDD都会记住在容错输入数据集上用于创建它的确定性操作的沿袭。
如果RDD的任何分区由于工作节点故障而丢失,则可以使用操作沿袭从原始容错数据集中重新计算该分区。
假设所有RDD转换都是确定性的,则最终转换后的RDD中的数据将始终相同,而不管Spark集群中的故障如何。
Spark在容错文件系统(例如HDFS或S3)中的数据上运行。因此,从容错数据生成的所有RDD也是容错的。但是,Spark Streaming并非如此,因为在大多数情况下,数据都是通过网络接收的(使用时除外 fileStream)。为了对所有生成的RDD实现相同的容错属性,将接收到的数据复制到集群中工作节点中的多个Spark执行程序中(默认复制因子为2)。这导致系统中发生故障时需要恢复的两种数据:

接收和复制的数据-由于该数据的副本存在于其他节点之一上,因此该数据在单个工作节点发生故障时仍可幸免。
已接收但已缓冲数据以进行复制-由于未复制数据,因此恢复此数据的唯一方法是再次从源中获取数据。
此外,我们应该关注两种失败:

工作节点的故障-运行执行程序的任何工作节点都可能发生故障,并且这些节点上的所有内存中数据都将丢失。如果任何接收器在故障节点上运行,则其缓冲的数据将丢失。
驱动程序节点发生故障-如果运行Spark Streaming应用程序的驱动程序节点发生故障,则显然SparkContext会丢失,并且所有执行程序及其内存中的数据也会丢失。
有了这些基础知识,让我们了解Spark Streaming的容错语义。

定义
流系统的语义通常是根据系统可以处理每个记录多少次来捕获的。系统在所有可能的操作条件下(尽管有故障等)可以提供三种保证。

最多一次:每个记录将被处理一次或根本不被处理。
至少一次:每个记录将被处理一次或多次。它比最多一次强,因为它确保不会丢失任何数据。但是可能有重复项。
恰好一次:每个记录将被恰好处理一次-不会丢失任何数据,也不会多次处理任何数据。这显然是三者中最有力的保证。
基本语义
概括地说,在任何流处理系统中,处理数据都需要三个步骤。

接收数据:使用接收器或其他方式从源接收数据。

转换数据:使用DStream和RDD转换对接收到的数据进行转换。

推送数据:将最终转换后的数据推送到外部系统,例如文件系统,数据库,仪表板等。

如果流应用程序必须获得端到端的精确一次保证,那么每个步骤都必须提供精确一次保证。也就是说,每条记录必须被接收一次,被转换一次,并被推送到下游系统一次。让我们在Spark Streaming的上下文中了解这些步骤的语义。

接收数据:不同的输入源提供不同的保证。下一部分将对此进行详细讨论。

转换数据:由于RDD提供的保证,所有接收到的数据将只处理一次。即使出现故障,只要可以访问收到的输入数据,最终转换后的RDD始终具有相同的内容。

推送数据:默认情况下,输出操作确保至少一次语义,因为它取决于输出操作的类型(是否为幂等)和下游系统的语义(是否支持事务)。但是用户可以实现自己的事务处理机制来实现一次语义。本节稍后将对此进行更详细的讨论。

接收数据的语义
不同的输入源提供不同的保证,范围从至少一次到恰好一次。阅读更多详细信息。

带文件
如果所有输入数据已经存在于诸如HDFS之类的容错文件系统中,则Spark Streaming始终可以从任何故障中恢复并处理所有数据。这提供 了一次精确的语义,这意味着无论发生什么故障,所有数据都会被精确地处理一次。

使用基于接收器的源
对于基于接收方的输入源,容错语义取决于故障情况和接收方的类型。正如我们所讨论的前面,有两种类型的接收器:

可靠的接收器-这些接收器仅在确保已复制接收到的数据之后才确认可靠的来源。如果此类接收器发生故障,则源将不会收到对缓冲的(未复制的)数据的确认。因此,如果重新启动接收器,则源将重新发送数据,并且不会由于失败而丢失任何数据。
不可靠的接收器-此类接收器不发送确认,因此当由于工作程序或驱动程序故障而失败时,可能会丢失数据。
根据所使用的接收器类型,我们可以实现以下语义。如果工作节点发生故障,那么使用可靠的接收器不会造成数据丢失。对于不可靠的接收器,接收到但未复制的数据可能会丢失。如果驱动程序节点发生故障,则除了这些丢失之外,所有已接收并复制到内存中的过去数据都将丢失。这将影响有状态转换的结果。

为了避免丢失过去收到的数据,Spark 1.2引入了预写日志,该日志将收到的数据保存到容错存储中。通过启用预写日志和可靠的接收器,数据丢失为零。就语义而言,它至少提供了一次保证。

使用Kafka Direct API
在Spark 1.3中,我们引入了新的Kafka Direct API,它可以确保Spark Streaming一次接收所有Kafka数据。同时,如果实施一次精确的输出操作,则可以实现端到端的一次精确保证。《Kafka集成指南》中进一步讨论了这种方法。

输出操作的语义
输出操作(如foreachRDD)至少具有一次语义,也就是说,在工作程序出现故障的情况下,转换后的数据可能多次写入外部实体。尽管使用saveAs***Files操作将其保存到文件系统是可以接受的 (因为文件将被相同的数据简单地覆盖),但可能需要付出额外的努力才能实现一次精确的语义。有两种方法。

幂等更新:多次尝试总是写入相同的数据。例如,saveAs***Files始终将相同的数据写入生成的文件。

事务性更新:所有更新都是以事务方式进行的,因此原子更新仅进行一次。一种做到这一点的方法如下。

使用批处理时间(可在中找到foreachRDD)和RDD的分区索引来创建标识符。该标识符唯一地标识流应用程序中的Blob数据。
使用标识符以事务方式(即,原子地一次)更新与此Blob的外部系统。也就是说,如果尚未提交标识符,则自动提交分区数据和标识符。否则,如果已经提交,则跳过更新。

dstream.foreachRDD { (rdd, time) =>
rdd.foreachPartition { partitionIterator =>
val partitionId = TaskContext.get.partitionId()
val uniqueId = generateUniqueId(time.milliseconds, partitionId)
// use this uniqueId to transactionally commit the data in partitionIterator
}
}
https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html

https://spark.apache.org/docs/latest/streaming-custom-receivers.html

自定义接收器

Spark Streaming自定义接收器
Spark Streaming可以从其内置支持的范围之外的任何任意数据源(即,除了Kafka,Kinesis,文件,套接字等)接收流数据。这就要求开发人员实现一个为从相关数据源接收数据而定制的接收器。本指南介绍了实现自定义接收器并在Spark Streaming应用程序中使用它的过程。请注意,可以在Scala或Java中实现自定义接收器。

实施自定义接收器
首先从实现Receiver (Scala doc, Java doc)开始。自定义接收方必须通过实现两个方法来扩展此抽象类

onStart():开始接收数据需要做的事情。
onStop():停止接收数据的操作。
双方onStart()并onStop()不能无限期地阻塞。通常,onStart()将启动负责接收数据的线程,并onStop()确保停止这些接收数据的线程。接收线程也可以使用isStopped(),一个Receiver方法,以检查他们是否应该停止接收数据。

接收到数据后,可以通过调用将该数据存储在Spark内部store(data),这是Receiver类提供的方法。有多种类型,store()它们可以一次存储记录的接收数据,也可以存储为对象/序列化字节的整个集合。请注意,store()用于实现接收器的风格 会影响其可靠性和容错语义。稍后将对此进行更详细的讨论。

接收线程中的任何异常都应被捕获并正确处理,以避免接收器出现静默故障。restart()将通过异步调用onStop()然后onStart()延迟后调用来重新启动接收器。 stop()将呼叫onStop()并终止接收器。同样,reportError() 在不停止/重新启动接收器的情况下,向驱动程序报告错误消息(在日志和UI中可见)。

以下是一个自定义接收器,它通过套接字接收文本流。它将文本流中以’\ n’分隔的行视为记录,并使用Spark存储它们。如果接收线程在连接或接收时遇到任何错误,则重新启动接收器以进行另一次连接尝试。

class CustomReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {

  def onStart() {
    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      override def run() { receive() }
    }.start()
  }

  def onStop() {
    // There is nothing much to do as the thread calling receive()
    // is designed to stop by itself if isStopped() returns false
  }

  /** Create a socket connection and receive data until receiver is stopped */
  private def receive() {
    var socket: Socket = null
    var userInput: String = null
    try {
      // Connect to host:port
      socket = new Socket(host, port)

      // Until stopped or connection broken continue reading
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))
      userInput = reader.readLine()
      while(!isStopped && userInput != null) {
        store(userInput)
        userInput = reader.readLine()
      }
      reader.close()
      socket.close()

      // Restart in an attempt to connect again when server is active again
      restart("Trying to connect again")
    } catch {
      case e: java.net.ConnectException =>
        // restart if could not connect to server
        restart("Error connecting to " + host + ":" + port, e)
      case t: Throwable =>
        // restart if there is any other error
        restart("Error receiving data", t)
    }
  }
}

在Spark Streaming应用程序中使用自定义接收器
可以通过使用自定义接收器在Spark Streaming应用程序中使用 streamingContext.receiverStream()。这将使用自定义接收器实例接收的数据创建输入DStream,如下所示:

// Assuming ssc is the StreamingContext
val customReceiverStream = ssc.receiverStream(new CustomReceiver(host, port))
val words = customReceiverStream.flatMap(_.split(" "))
...

接收器可靠性
正如《Spark Streaming编程指南》中简要讨论的那样, 基于其可靠性和容错语义,有两种接收器。

可靠的接收器-对于允许确认已发送数据的可靠源, 可靠的接收器正确地向源确认已可靠地接收并存储了数据(即已成功复制)。通常,实现此接收器需要仔细考虑源确认的语义。
不可靠的接收器-一个不可靠的接收器并没有发送确认的资源等。可以将其用于不支持确认的来源,甚至可以用于不希望或不需要进入确认复杂性的可靠来源。
要实现可靠的接收器,您必须使用store(multiple-records)存储数据。这种store形式的阻塞调用仅在所有给定记录已存储在Spark中后才返回。如果接收者的已配置存储级别使用复制(默认情况下启用),则复制完成后将返回此调用。因此,它确保了数据被可靠地存储,并且接收器现在可以适当地确认源了。这样可以确保当接收方在复制数据的过程中发生故障时,不会丢失任何数据-缓冲的数据将不会被确认,因此稍后将由源重新发送。

一个不可靠的接收器没有实现任何这种逻辑的。它可以简单地从源中接收记录,并使用一次插入它们store(single-record)。虽然没有获得的可靠性保证store(multiple-records),但它具有以下优点:

系统负责将数据分块为适当大小的块(请在《Spark Streaming编程指南》中查找块间隔)。
如果已指定速率限制,则系统会控制接收速率。
由于这两个原因,不可靠的接收器比可靠的接收器更易于实现。
下表总结了两种类型的接收器的特性

接收器类型 特征
不可靠的接收者 易于实现。
系统负责块生成和速率控制。没有容错保证,会因接收器故障而丢失数据。
可靠的接收者 强大的容错保证,可以确保零数据丢失。
接收器实现要处理的块生成和速率控制。
实现复杂度取决于源的确认机制。

如何在Spark流中实现精确语义

Spark Streaming使用Direct模式对接上游kafka。无论kafka有多少个partition, 使用Direct模式总能保证SS中有相同数量的partition与之相对, 也就是说SS中的KafkaRDD的并发数量在Direct模式下是由上游kafka决定的。 在这个模式下,kafka的offset是作为KafkaRDD的一部分存在,会存储在checkpoints中, 由于checkpoints只存储offset内容,而不存储数据,这就使得checkpoints是相对轻的操作。 这就使得SS在遇到故障时,可以从checkpoint中恢复上游kafka的offset,从而保证exactly once

checkPoints(检查点)
如果你打开了Spark的checkpointing选线,偏移量会被保存在checkpoint里面。这是很容易使用的,但是有一些缺点。首先你的输出操作必须是幂等的,否则就会得到一些重复的输出;事物并不是一个好的选择。除此之外,如果你的代码有了更改,就不能从checkpoint之中恢复。对于计划升级,可以在旧代码运行的同事部署新的代码来缓解这个问题(因为输出是幂等的,所以不会造成冲突)。但是对于意料之外的故障而需要更改代码的,除非你有其他的方式来获取开始的偏移量,否则就会丢失数据。

kafka本身(Kafka itself)
Kafka本身有一个提交偏移量到一个独特的Kafka topic的API,默认情况下,一个新的消费者会周期性的提交偏移量。这几乎不会是你想要的确切的提交方式,因为消息在成功拉取但是还没有在Spark中输出结果,就会导致未确定的语义。这就是为什么流处理例子中会将enable.auto.commit参数设置为false。然而,你可以在你确保输出操作已经完成后使用commitSync API向Kafka提交偏移量。与checkpoint方式相比,该种方式的好处是Kafka是一个持久化的存储,而不需要考虑代码的更新。然而,Kafka是非事物性的,所以你的输出操作仍需要具有幂等性。

sink到外部存储后,offset才能commit,不管是到zk,还是mysql里面,你最好保证它在一个transaction里面,而且必须在输出到外部存储(这里最好保证一个upsert语义,根据unique key来实现upset语义)之后,然后这边源头driver再根据存储的offeset去产生kafka RDD,executor再根据kafka每个分区的offset去消费数据

  1. Stateful Processing SQL ( <2.x mapWithState、updateStateByKey):我们要实现跨批次带状态的计算的话,在1.X版本,我们通过这两个接口去做,但还是需要把这个状态存到hdfs或者外部去,实现起来比较麻烦一点。

  2. Real Multi-Stream Join:没办法实现真正的多个流join的语义。

3)End-To-End Exactly-Once Semantics:它的端到端的exactly-once语义实现起来比较麻烦,需要sink到外部存储后还需要手动的在事务里面提交offset。

Spark.default.parallelism
Spark.sql.shuffle.partitions 调小
batch在生产上会调得大一点,我们设为1000

一次语义是流处理的高级主题之一。即使系统或网络出现故障,也要一次处理一次所有消息,不仅流处理框架需要提供这种功能,而且消息传递系统,输出数据存储以及我们如何实现处理过程也需要提供这种功能。 ,总共可以确保一次语义。在本文中,我将演示如何使用Spark流技术(以Kafka作为数据源,而MySQL为输出存储)来实现一次精确的流处理。

精确一次是流处理中非常强大的语义,它将不可避免地给您的应用程序带来一些开销并影响吞吐量。它也不适用于窗口操作。因此,您需要确定是否有必要花费这些精力,或者即使数据丢失很少也可以使用较弱的语义就足够了。但是,肯定地知道如何一次完成一次学习是一个很好的机会,这是一个很大的乐趣。

流处理语义
流处理中有三种语义,即最多一次,至少一次和完全一次。在典型的Spark Streaming应用程序中,有三个处理阶段:接收数据,进行转换和推送输出。每个阶段需要采取不同的措施来实现不同的语义。

对于接收数据,它很大程度上取决于数据源。例如,从诸如HDFS之类的容错文件系统中读取文件可以使我们获得一次精确的语义。对于支持确认的上游队列,例如RabbitMQ,我们可以将其与Spark的预写日志结合使用,以实现至少一次语义。对于不可靠的接收器,例如socketTextStream,可能由于工作程序/驱动程序故障而导致数据丢失,并给我们带来了不确定的语义。另一方面,Kafka是基于偏移量的,其直接API可以为我们提供一次精确的语义。

当使用Spark的RDD转换数据时,我们会自动获得一次语义,因为RDD本身是不可变的,可容错的,并且在确定性上可以重新计算。只要源数据可用,并且在转换过程中没有副作用,结果将始终相同。

默认情况下,输出操作具有至少一次的语义。foreachRDD如果工作程序出现故障,该函数将执行一次以上,从而将相同的数据多次写入外部存储。解决此问题的方法有两种,幂等更新和事务更新。在以下各节中将进一步讨论它们。

与幂等写入恰好一次

如果多次写入产生相同的数据,则此输出操作是幂等的。saveAsTextFile是典型的幂等更新;具有唯一密钥的消息可以不重复地写入数据库。这种方法将为我们提供等效的一次精确语义。请注意,尽管它通常仅用于地图过程,并且需要在Kafka DStream上进行一些设置。

设置enable.auto.commit到false。默认情况下,Kafka DStream会在接收到数据后立即提交使用者偏移量。我们要推迟此操作,直到批次已完全处理。
打开Spark Streaming的检查点以存储Kafka偏移量。但是,如果应用程序代码更改,则检查点数据将不可重用。这导致了第二种选择:
在输出后提交卡夫卡偏移量。Kafka提供了一个commitAsyncAPI,HasOffsetRanges该类可用于从初始RDD中提取偏移量:

messages.foreachRDD {rdd => val offsetRanges = rdd.asInstanceOf [ HasOffsetRanges ] .offsetRanges   rdd.foreachPartition {iter        => 
//输出到数据库  
}   messages.asInstanceOf [ CanCommitOffsets ] .commitAsync(offsetRanges)}

与事务写入完全一致

事务更新需要唯一的标识符。可以从批处理时间,分区ID或Kafka偏移量生成数据,然后将结果与标识符一起写入单个事务中的外部存储中。此原子操作为我们提供了一次精确的语义,并且可以应用于仅映射过程和聚合过程。

通常,写入数据库应该在foreachPartition,即工作节点中进行。仅映射过程确实如此,因为Kafka RDD的分区对应于Kafka分区,因此我们可以像这样提取每个分区的偏移量:

messages.foreachRDD {rdd => val offsetRanges = rdd.asInstanceOf [ HasOffsetRanges ] .offsetRanges   rdd.foreachPartition {iter => val offsetRange = offsetRanges(TaskContext .get.partitionId)  } }

但是对于改组操作(例如错误日志计数示例),我们需要首先将结果收集回驱动程序中,然后执行事务。

messages.foreachRDD {rdd => val offsetRanges = rdd.asInstanceOf [ HasOffsetRanges ] .offsetRanges val result = processLogs (rdd).collect()//解析日志并计数错误DB .localTx {隐式会话=>     result.foreach { case(时间,计数)=> //保存到error_log表中    }     offsetRanges.foreach {offsetRange => val受影响的行= sql“” “”      更新kafka_offset设置offset = $ {offsetRange.untilOffset}       ,其中topic = $ {topic}和`partition` = $ {offsetRange.partition}      和offset = $ {offsetRange.fromOffset}       “
“” .update.apply()
      如果(affectedRows!= 1{抛出新异常(“更新偏移量失败”)      }     }   } }

如果偏移量更新失败,或者通过检测到重复的偏移量范围offset != $fromOffset,则整个事务都会回滚,从而保证了语义恰好一次。

实现-zookeeper

package com.bigdata.saviour.kafka

import kafka.utils.ZkUtils
import org.apache.kafka.common.TopicPartition
import org.apache.spark.internal.Logging
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, OffsetRange}
import org.apache.zookeeper.data.ACL

import scala.collection.JavaConversions
import scala.collection.mutable.ListBuffer

@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
class ZKGroupDirs(val group: String) {
  def consumerDir = ZkUtils.ConsumersPath

  def consumerGroupDir = consumerDir + "/" + group

  def consumerRegistryDir = consumerGroupDir + "/ids"

  def consumerGroupOffsetsDir = consumerGroupDir + "/offsets"

  def consumerGroupOwnersDir = consumerGroupDir + "/owners"
}

@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
class ZKGroupTopicDirs(group: String, topic: String) extends ZKGroupDirs(group) {
  def consumerOffsetDir = consumerGroupOffsetsDir + "/" + topic

  def consumerOwnerDir = consumerGroupOwnersDir + "/" + topic
}

class OffsetZkManager(zkUrl: String) extends Logging {

  val sessionTimeout = 10000
  val connectionTimeout = 10000
  val zkClientAndConnection = ZkUtils.createZkClientAndConnection(zkUrl, sessionTimeout, connectionTimeout)
  val zkUtils = new ZkUtils(zkClientAndConnection._1, zkClientAndConnection._2, false)
  val zkClient = zkClientAndConnection._1


  def readOffsets(topics: Seq[String], groupId: String):
  Map[TopicPartition, Long] = {

    val topicPartOffsetMap = collection.mutable.HashMap.empty[TopicPartition, Long]
    val partitionMap = zkUtils.getPartitionsForTopics(topics)

    // /consumers/<groupId>/offsets/<topic>/
    partitionMap.foreach(topicPartitions => {
      val zkGroupTopicDirs = new ZKGroupTopicDirs(groupId, topicPartitions._1)
      topicPartitions._2.foreach(partition => {
        val offsetPath = zkGroupTopicDirs.consumerOffsetDir + "/" + partition
        try {
          val offsetStatTuple = zkUtils.readData(offsetPath)
          if (offsetStatTuple != null) {
            topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)), offsetStatTuple._1.toLong)
          }
        } catch {
          case e: Exception =>
            topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)), 0L)
        }
      })
    })

    topicPartOffsetMap.toMap
  }

  def persistOffsets(offsets: Seq[OffsetRange], groupId: String, storeEndOffset: Boolean): Unit = {
    offsets.foreach(or => {
      val zkGroupTopicDirs = new ZKGroupTopicDirs(groupId, or.topic);

      val acls = new ListBuffer[ACL]()
      //      val acl = new ACL
      //      acl.setId(ANYONE_ID_UNSAFE)
      //      acl.setPerms(PERMISSIONS_ALL)
      //      acls += acl

      val offsetPath = zkGroupTopicDirs.consumerOffsetDir + "/" + or.partition;
      val offsetVal = if (storeEndOffset) or.untilOffset else or.fromOffset

      zkUtils.updatePersistentPath(zkGroupTopicDirs.consumerOffsetDir + "/" + or.partition, offsetVal + "", JavaConversions.bufferAsJavaList(acls))

      //      LOGGER.debug("persisting offset details - topic: {}, partition: {}, offset: {}, node path: {}", Seq[AnyRef](or.topic, or.partition.toString, offsetVal.toString, offsetPath): _*)
    })
  }

  def main(args: Array[String]): Unit = {


  }

}

实现-hbase

create ‘stream_kafka_offsets’, {NAME=>‘offsets’, TTL=>2592000}
RowKey Layout
row: <TOPIC_NAME>:<GROUP_ID>:<EPOCH_BATCHTIME_MS>
column family: offsets
qualifier: <PARTITION_ID>
value: <OFFSET_ID>

package com.bigdata.saviour.kafka

import kafka.utils.ZkUtils
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put, Scan}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange

object OffsetHbaseManager {


  /*
 Save offsets for each batch into HBase
 create 'stream_kafka_offsets', {NAME=>'offsets', TTL=>2592000}
row:              <TOPIC_NAME>:<GROUP_ID>:<EPOCH_BATCHTIME_MS>
column family:    offsets
qualifier:        <PARTITION_ID>
value:            <OFFSET_ID>
*/
  def saveOffsets(TOPIC_NAME: String, GROUP_ID: String, offsetRanges: Array[OffsetRange],
                  hbaseTableName: String, batchTime: org.apache.spark.streaming.Time) = {
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.addResource("src/main/resources/hbase-site.xml")
    val conn = ConnectionFactory.createConnection(hbaseConf)
    val table = conn.getTable(TableName.valueOf(hbaseTableName))
    val rowKey = TOPIC_NAME + ":" + GROUP_ID + ":" + String.valueOf(batchTime.milliseconds)
    val put = new Put(rowKey.getBytes)
    for (offset <- offsetRanges) {
      put.addColumn(Bytes.toBytes("offsets"), Bytes.toBytes(offset.partition.toString),
        Bytes.toBytes(offset.untilOffset.toString))
    }
    table.put(put)
    conn.close()
  }

  /* Returns last committed offsets for all the partitions of a given topic from HBase in following  cases.
*/
  def getLastCommittedOffsets(TOPIC_NAME: String, GROUP_ID: String, hbaseTableName: String,
                              zkQuorum: String, zkRootDir: String, sessionTimeout: Int, connectionTimeOut: Int): Map[TopicPartition, Long] = {

    val hbaseConf = HBaseConfiguration.create()
    val zkUrl = zkQuorum + "/" + zkRootDir
    val zkClientAndConnection = ZkUtils.createZkClientAndConnection(zkUrl,
      sessionTimeout, connectionTimeOut)
    val zkUtils = new ZkUtils(zkClientAndConnection._1, zkClientAndConnection._2, false)
    val zKNumberOfPartitionsForTopic = zkUtils.getPartitionsForTopics(Seq(TOPIC_NAME
    )).get(TOPIC_NAME).toList.head.size
    zkClientAndConnection._1.close()
    zkClientAndConnection._2.close()

    //Connect to HBase to retrieve last committed offsets
    val conn = ConnectionFactory.createConnection(hbaseConf)
    val table = conn.getTable(TableName.valueOf(hbaseTableName))
    val startRow = TOPIC_NAME + ":" + GROUP_ID + ":" +
      String.valueOf(System.currentTimeMillis())
    val stopRow = TOPIC_NAME + ":" + GROUP_ID + ":" + 0
    val scan = new Scan()
    val scanner = table.getScanner(scan.setStartRow(startRow.getBytes).setStopRow(
      stopRow.getBytes).setReversed(true))
    val result = scanner.next()
    var hbaseNumberOfPartitionsForTopic = 0 //Set the number of partitions discovered for a topic in HBase to 0
    if (result != null) {
      //If the result from hbase scanner is not null, set number of partitions from hbase      to the number of cells
      hbaseNumberOfPartitionsForTopic = result.listCells().size()
    }

    val fromOffsets = collection.mutable.Map[TopicPartition, Long]()

    if (hbaseNumberOfPartitionsForTopic == 0) {
      // initialize fromOffsets to beginning
      for (partition <- 0 to zKNumberOfPartitionsForTopic - 1) {
        fromOffsets += (new TopicPartition(TOPIC_NAME, partition) -> 0)
      }
    } else if (zKNumberOfPartitionsForTopic > hbaseNumberOfPartitionsForTopic) {
      // handle scenario where new partitions have been added to existing kafka topic
      for (partition <- 0 to hbaseNumberOfPartitionsForTopic - 1) {
        val fromOffset = Bytes.toString(result.getValue(Bytes.toBytes("offsets"),
          Bytes.toBytes(partition.toString)))
        fromOffsets += (new TopicPartition(TOPIC_NAME, partition) -> fromOffset.toLong)
      }
      for (partition <- hbaseNumberOfPartitionsForTopic to zKNumberOfPartitionsForTopic - 1) {
        fromOffsets += (new TopicPartition(TOPIC_NAME, partition) -> 0)
      }
    } else {
      //initialize fromOffsets from last run
      for (partition <- 0 to hbaseNumberOfPartitionsForTopic - 1) {
        val fromOffset = Bytes.toString(result.getValue(Bytes.toBytes("offsets"),
          Bytes.toBytes(partition.toString)))
        fromOffsets += (new TopicPartition(TOPIC_NAME, partition) -> fromOffset.toLong)
      }
    }
    scanner.close()
    conn.close()
    fromOffsets.toMap
  }




 def main(args: Array[String]): Unit = {

val fromOffsets= getLastCommittedOffsets(topic,consumerGroupID,hbaseTableName,zkQuorum,
 
                                   zkKafkaRootDir,zkSessionTimeOut,zkConnectionTimeOut)
 
val inputDStream = KafkaUtils.createDirectStream[String,String](ssc,PreferConsistent,
                           Assign[String, String](fromOffsets.keys,kafkaParams,fromOffsets))
               

/*
For each RDD in a DStream apply a map transformation that processes the message.
*/
 
inputDStream.foreachRDD((rdd,batchTime) => {
 
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
 
  offsetRanges.foreach(offset => println(offset.topic,offset.partition, offset.fromOffset,
 
                        offset.untilOffset))
 
  val newRDD = rdd.map(message => processMessage(message))
 
  newRDD.count()
 
  saveOffsets(topic,consumerGroupID,offsetRanges,hbaseTableName,batchTime)
 
})
                           
}
  }

}

实现方式-mysql(推荐)

package com.bigdata.saviour.kafka

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}
import cn.hutool.db.Session
import cn.hutool.db.ds.DSFactory
import cn.hutool.db.Entity
import java.sql.SQLException

class OffsetMysqlManager(topic: String, groupId: String) {

  def getLastOffset(): Unit = {

    // The details depend on your data store, but the general idea looks like this

    // begin from the the offsets committed to the database
        val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet =>
         new TopicPartition(resultSet.string("topic"), resultSet.int("partition")) -> resultSet.long("untilOffset")
        }.toMap

       val stream = KafkaUtils.createDirectStream[String, String](
          streamingContext,
         PreferConsistent,
        Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
     )

  }

  def saveOffset[T](stream: DStream[T]): Unit = {

    stream.foreachRDD { rdd =>

      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      //自定义数据源(此处取test分组的数据源)
      val session = Session.create(DSFactory.get("test"))
      val TABLE_NAME = "kafka_offset"

      import cn.hutool.db.Entity

      try {
        session.beginTransaction

        //业务逻辑
        //val results = yourCalculation(rdd)
        //rdd.foreach()

        offsetRanges.foreach(offsetRange => {
          val entity =
            Entity.create(TABLE_NAME)
              .set("topic", offsetRange.topic)
              .set("groupId", groupId)
              .set("partition", offsetRange.partition)
              .set("untilOffset", offsetRange.untilOffset)

          // 增,生成SQL为 INSERT INTO `table_name` SET(`字段1`, `字段2`) VALUES(?,?)
          session.insert(entity)
        })

        
        // update offsets where the end of existing offsets matches the beginning of this batch of offsets

        // assert that offsets were updated correctly

       
        session.commit
      } catch {
        case e: SQLException =>
          session.quietRollback
      }


    }
  }
 def main(args: Array[String]): Unit = {
	
}

}

零拷贝

DMA后,就可以实现绝对的零拷贝了,因为网卡是直接去访问系统主内存的
在Java中的零拷贝实现是在FileChannel中,其中有个方法transferTo(position,fsize,src)

通过java.nio.channels.FileChannel中的transferTo方法来实现的。transferTo方法底层是基于操作系统的sendfile这个system call来实现的(不再需要拷贝到用户态了),sendfile负责把数据从某个fd(file descriptor)传输到另一个fd。
DMA是一种硬件直接访问系统主内存的技术。
多种硬件都已使用了DMA技术,其中就包括网卡(NIC)。
零拷贝技术减少了用户态与内核态之间的切换,让拷贝次数降到最低,从而实现高性能。

如何设置一个合理的批处理时间

需要根据应用本身、集群资源情况,以及关注和监控Spark Streaming系统的运行情况来调整,重点关注监控界面中的Total Delay
spark.executor.extraJavaOptions=-XX:+UseConc MarkSweepGC
–conf "spark.executor.extraJavaOptions=-XX:+UseConc MarkSweepGC"来配置垃圾回收机制

合理的parallelism:

Spark中的partition和Kafka中的Partition是一一对应的
一个Executor占用了多个core,但是总的CPU使用率却不高,让单个Executor占用更少的core,同时Worker下面增加更多的Executor;或者从另一个角度,增加单个节点的worker数量,当然这需要修改Spark集群的配置,从而增加CPU利用率

Executor的数量和每个Executor分到的内存大小成反比,如果每个Executor的内存过小,容易产生内存溢出(out of memory)的问题

Executor的数量和每个Executor分到的内存大小成反比

reduceByKey/aggregateByKey来代替groupByKey。而存在数据库连接、资源加载创建等需求时,我们可以使用带partition的操作,
这样在每一个分区进行一次操作即可,因为分区是物理同机器的,并不存在这些资源序列化的问题,从而大大减少了这部分操作的开销。例如,可以用mapPartitions、foreachPartitions操作来代替map、foreach操作。

filter+coalesce

spark.streaming.receiver.maxRate
如果批处理时间大于batchinterval,那么很明显,接收方的内存将逐渐被填满,并最终抛出异常(很可能是BlockNotFoundException)
如果做限制100,那么每秒最大吞吐就是100条。

spark.streaming.kafka.maxRatePerPartition
Kafka每个partition拉取的上限,
默认是无上限的,即Kafka有多少数据,SparkStreaming就会一次性全拉出,但是上节提到的批处理时间是一定的,不可能动态变化,
如果持续数据频率过高,同样会造成数据堆积、阻塞的现象

数据总量还需乘以所有的partition数量,调整两个参数maxRatePerPartition和batchDuration使得数据的拉取和处理能够平衡,
尽可能地增加整个系统的吞吐量,可以观察监控界面中的Input Rate和Processing Time

spark.streaming.kafka.maxRatePerPartition
注意该参数配置的是Kafka每个partition拉取的上限,数据总量还需乘以所有的partition数量

限流

Spark Streaming是先从broker里查询到每个分区的latestOffset,这样就可以得到每个分区的offset range,再用range和上一步预估的速率做对比计算就可以确定每个分区的处理的消息量。整个计算步骤:

1、offset range的消息量 totalLag
2、有效速率=取设置的maxRatePerPartition和预估的速率最小值
3、一个batch的每个分区每秒接收到的消息量=batchDuration*有效速率

spark.streaming.kafka.maxRatePerPartition控制spark读取的每个分区最大消息数。从上面的分析过程可以预见到,每个分区接收到的消息量<=batchDuration * spark.streaming.kafka.maxRatePerPartition.

1、首次启动Streaming应用,kafka保留了大量未消费历史消息,并且auto.offset.reset=latest
可以防止第一个batch接收大量消息、处理时间过长和内存溢出

2、防止kafka producer突然生产大量消息,一个batch接收到大量数据,导致batch之间接收到的数据倾斜

其他

Cache
Dstream如果被反复使用,最好利用cache()函数将该数据流缓存起来,
防止过度地调度资源造成的网络开销。可以参考并观察Scheduling Delay参数

repartition(n)
除了依赖于batchInterval和blockInterval,我们可以直接通过inputDstream. repartition(n)来确定分支的数量。这个操作会重新打乱(reshuffles)RDD中的数据,随机的分配给n个分支。当然打乱(shuffle)过程会造成一定的开销,但是会有更高的并行度。

spark.locality.wait设置一个更大的值
更有可能在本地节点处理数据块。我们需要在两个参数间(blockInterval和spark.locality.wait)做一个折中,确保越大的数据块更可能在本地被处理。
每个分支是Spark中的一个任务(task)。如果blockInterval == batchInterval,那么意味着创建了单一分支,并且可能直接在本地处理。

中间就是让用户描述pipeline。
SQL就是kafka的多个topic,输出选择一个输出表,SQL把上面消费的kafka DStream注册成表,然后写一串pipeline,
最后我们帮用户封装了一些对外sink(刚刚提到的各种存储都支持,如果存储能实现upsert语义的话,我们都是支持了的)。

f foo(in: Int): Boolean = if (in == 0) false else bar(in - 1)
def bar(in: Int): Boolean = if (in == 0) true else foo(in - 1)

供的reduceByWindow函数支持一个inverse reduce函数,比如你计算最近1小时,按秒级别窗口滑动,
spark通过实现inverse reduce函数每次只计算进来和要逐出的子窗口

spark.streaming.stopGracefullyOnShutdown

背压

当batch processing time>batchinterval 这种情况持续过长的时间,会造成数据在内存中堆积,导致Receiver所在Executor内存溢出等问题;
在这里插入图片描述新增了一个RateController实现自动调节数据的传输速率。基于 processingDelay 、schedulingDelay 、当前 Batch 处理的记录条数以及处理完成事件来估算出一个速率;这个速率主要用于更新流每秒能够处理的最大记录的条数。
InputDStreams 内部的 RateController 里面会存下计算好的最大速率,这个速率会在处理完 onBatchCompleted 事件之后将计算好的速率推送到 ReceiverSupervisorImpl,这样接收器就知道下一步应该接收多少数据了。

根据当前 job 的处理情况来调节后续批次的获取 kafka 消息的条数
RateController RateController
需要的反馈数据是任务处理的结束时间、调度时间、处理时间、消息条数
通过 SparkListener 体系获得,然后通过 PIDRateEsimator 的 compute 计算得到一个速率,进而可以计算得到一个 offset
限速设置最大消费条数比较得到一个最终要消费的消息最大 offset

Spark Streaming Backpressure: 根据JobScheduler反馈作业的执行信息来动态调整Receiver数据接收率。通过属性“spark.streaming.backpressure.enabled”来控制是否启用backpressure机制,默认值false,即不启用。

在原架构的基础上加上一个新的组件RateController,这个组件负责监听“OnBatchCompleted”事件,然后从中抽取processingDelay 及schedulingDelay信息. Estimator依据这些信息估算出最大处理速度(rate),最后由基于Receiver的Input Stream将rate通过ReceiverTracker与ReceiverSupervisorImpl转发给BlockGenerator(继承自RateLimiter).

令牌桶机制: 大小固定的令牌桶可自行以恒定的速率源源不断地产生令牌。如果令牌不被消耗,或者被消耗的速度小于产生的速度,令牌就会不断地增多,直到把桶填满。后面再产生的令牌就会从桶中溢出。最后桶中可以保存的最大令牌数永远不会超过桶的大小。当进行某操作时需要令牌时会从令牌桶中取出相应的令牌数,如果获取到则继续操作,否则阻塞。用完之后不用放回。

Streaming 数据流被Receiver接收后,按行解析后存入iterator中。然后逐个存入Buffer,在存入buffer时会先获取token,如果没有token存在,则阻塞;如果获取到则将数据存入buffer. 然后等价后续生成block操作。

动态控制数据控制数据接收速率来适配集群数据处理能力
数据处理速度跟不上数据接收速度,此时在数据接收端(即Receiver 一般数据接收端都运行在executor上)就会积累数据,
数据是通过BlockManager管理的,如果数据存储采用MEMORY_ONLY模式就会导致OOM,
采用MEMORY_AND_DISK多余的数据保存到磁盘上反而会增加数据读取时间

spark.streaming.backpressure.initialRate
spark.streaming.backpressure.enabled 设置为 true 开启反压
spark.streaming.kafka.maxRatePerPartition 每个partition每秒最多消费条数
spark.streaming.backpressure.rateEstimator 速率估算器类,默认值为 pid ,目前 Spark 只支持这个。
spark.streaming.backpressure.pid.proportional:用于响应错误的权重(最后批次和当前批次之间的更改)。默认值为1,只能设置成非负值。
spark.streaming.backpressure.pid.integral:错误积累的响应权重,具有抑制作用(有效阻尼)。默认值为 0.2 ,只能设置成非负值。
spark.streaming.backpressure.pid.derived:对错误趋势的响应权重。 这可能会引起 batch size 的波动,可以帮助快速增加/减少容量。默认值为0,只能设置成非负值。
spark.streaming.backpressure.pid.minRate:可以估算的最低费率是多少。默认值为 100,只能设置成非负值

反压流程

  1. 启动服务时注册rateCotroller
    2.监听到批次结束事件后采样计算新的消费速率
    3.提交job时利用消费速率计算每个分区消费的数据条数

  2. 启动服务时注册rateCotroller
    在程序运行到StreamingContext的start方法时会调用JobScheduler的start方法,在这里会根据消费者的不同生成不同的RateController,在kafka中生成的是DirectKafkaRateController实例。接下来会把生成的RateController注册到StreamingListenerBus中。

2.监听到批次结束事件后采样计算新的消费速率
(1) StreamingListener的doPostEvent监听到批次结束事件后会调用RateController的onBatchCompleted方法,在此方法中会获取processingEnd(处理结束时间)、workDelay(处理耗时)、waitDelay(调度延迟)、elems(消息条数)着四个参数用于计算新的消费速率

(2) computedAndPublish方法会获取新的消费速率。

(3) computedAndPublish方法会调用RateEstimator接口的compute方法,现在spark支持的唯一实现类是PIDRateEstimator。compure方法返回的消费速率值会随着不断采样速率值趋向稳定。

(4)computedAndPublish方法调用compute方法获取到新的速率后把新的消费速率赋值到rateLimit属性中。

3.提交job时利用消费速率计算每个分区消费的数据条数
(1) 在提交Job时会调用DirectKafkainputDStream类的compute方法获取这一批次处理KafkaRDD。
(2) compute方法会调用clamp方法获取这一批次每个partitions要消费的截止offset(取kafka最新的offset和反压计算后的offset的最小值)。
(3) clamp方法会调用maxMessagesPerPartition方法通过消费速率计算出每个partition消费的截止offset。
(4) maxMessagesPerPartition方法调用getLatestRate,获取消费速率。

(5) 通过获取的消费速率计算每个分区消费的记录数。计算方式如下:

所有分区滞后offset总和为totalLag
某一分区滞后offset 为lagPerPartition
消费速率为 lastestRage
每个分区消费条数为 Match.round(为lagPerPartition/totalLag*lastestRage)

保证消息不丢失

// begin from the the offsets committed to the database
val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet =>
new TopicPartition(resultSet.string("topic"), resultSet.int("partition")) -> resultSet.long("offset")
}.toMap

val stream = KafkaUtils.createDirectStream[String, String](
streamingContext,
PreferConsistent,
AssignString, String
)

stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

val results = yourCalculation(rdd)

// begin your transaction

// update results
// update offsets where the end of existing offsets matches the beginning of this batch of offsets
// assert that offsets were updated correctly

// end your transaction
}
3.Kafka itself kafka本身提供的api自我维护
设置enable.auto.commit to false
//坑,foreachRDD 之前不能使用map orderby等生成新的rdd,这样offset信息会丢失
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

// 业务处理,异步提交
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

维护到ZK

import kafka.common.TopicAndPartition
  import kafka.message.MessageAndMetadata
  import kafka.serializer.StringDecoder
  import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
  import org.I0Itec.zkclient.ZkClient
  import org.apache.spark.streaming.{Seconds, StreamingContext}
  import org.apache.spark.streaming.dstream.InputDStream
  import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

   val conf: Conf = new config.Conf("test-util.conf")
    val zkHost = conf.getString("kafka.zookeeper.connect")
    val brokerList=conf.getString("kafka.metadata.broker.list")
    val zkClient = new ZkClient(zkHost)
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokerList,
      "zookeeper.connect" -> zkHost,
      "group.id" -> "testid")


    var kafkaStream: InputDStream[(String, String)] = null
    var offsetRanges = Array[OffsetRange]()
    val sc=SparkUtil.createSparkContext("test")
    val ssc=new StreamingContext(sc,Seconds(5))
    val topic="TEST_TOPIC"
    val topicDirs = new ZKGroupTopicDirs("TEST_TOPIC_spark_streaming_testid", topic)  //创建一个 ZKGroupTopicDirs 对象,对保存


    val children = zkClient.countChildren(s"${topicDirs.consumerOffsetDir}")     //查询该路径下是否字节点(默认有字节点为我们自己保存不同 partition 时生成的)

    var fromOffsets: Map[TopicAndPartition, Long] = Map()   //如果 zookeeper 中有保存 offset,我们会利用这个 offset 作为 kafkaStream 的起始位置

    if (children > 0) {   //如果保存过 offset,这里更好的做法,还应该和  kafka 上最小的 offset 做对比,不然会报 OutOfRange 的错误
      for (i <- 0 until children) {
        val partitionOffset = zkClient.readData[String](s"${topicDirs.consumerOffsetDir}/${i}")
        val tp = TopicAndPartition(topic, i)
        fromOffsets += (tp -> partitionOffset.toLong)  //将不同 partition 对应的 offset 增加到 fromOffsets 中
      }

      val messageHandler = (mmd : MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())  //这个会将 kafka 的消息进行 transform,最终 kafak 的数据都会变成 (topic_name, message) 这样的 tuple
      kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)
    }
    else {
      kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set("TEST_TOPIC")) //如果未保存,根据 kafkaParam 的配置使用最新或者最旧的 offset
    }


    kafkaStream.transform{rdd=>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges //得到该 rdd 对应 kafka 的消息的 offset
      rdd
    }.map(_._2).foreachRDD(rdd=>{
      for (o <- offsetRanges) {
        val zkPath = s"${topicDirs.consumerOffsetDir}/${o.partition}"
        ZkUtils.updatePersistentPath(zkClient, zkPath, o.fromOffset.toString)  //将该 partition 的 offset 保存到 zookeeper
      }
      rdd.foreach(s=>println(s))
    })

    ssc.start()
    ssc.awaitTermination()



楼主实现了保存一个topic的offset到zk,但是如果Spark Streaming同时消费多个topic的方式及topicSet里有多个topic,

状态处理

UpdateStateByKey

UpdateStateByKey 原语用于记录历史记录,有时,我们需要在 DStream 中跨批次维护状态(例
如流计算中累加 wordcount)。针对这种情况,updateStateByKey()为我们提供了对一个状态变量
的访问,用于键值对形式的 DStream。给定一个由(键,事件)对构成的 DStream,并传递一个指
定如何根据新的事件更新每个键对应状态的函数,它可以构建出一个新的 DStream,其内部数
据为(键,状态) 对。

updateStateByKey() 的结果会是一个新的 DStream,其内部的 RDD 序列是由每个时间区间对
应的(键,状态)对组成的。

updateStateByKey 操作使得我们可以在用新信息进行更新时保持任意的状态。为使用这个功
能,需要做下面两步:

  1. 定义状态,状态可以是一个任意的数据类型。
  2. 定义状态更新函数,用此函数阐明如何使用之前的状态和来自输入流的新值对状态进行更
    新。

使用 updateStateByKey 需要对检查点目录进行配置,会使用检查点来保存状态。

 // 定义更新状态方法,参数 values 为当前批次单词频度,state 为以往批次单词频度
 val updateFunc = (values: Seq[Int], state: Option[Int]) => {
 val currentCount = values.foldLeft(0)(_ + _)
  val previousCount = state.getOrElse(0)
 Some(currentCount + previousCount)
 }

 val conf = new 
SparkConf().setMaster("local[*]").setAppName("NetworkWordCount")
 val ssc = new StreamingContext(conf, Seconds(3))
 ssc.checkpoint("./ck")
 // Create a DStream that will connect to hostname:port, like hadoop102:9999
 val lines = ssc.socketTextStream("linux1", 9999)
 // Split each line into words
 val words = lines.flatMap(_.split(" "))
 //import org.apache.spark.streaming.StreamingContext._ // not necessary since 
Spark 1.3
 // Count each word in each batch
 val pairs = words.map(word => (word, 1))
 // 使用 updateStateByKey 来更新状态,统计从运行开始以来单词总的次数
 val stateDstream = pairs.updateStateByKey[Int](updateFunc)
 stateDstream.print()
 ssc.start() // Start the computation
 ssc.awaitTermination() // Wait for the computation to terminate
 //ssc.stop()
 }
}
 

WindowOperations

Window Operations 可以设置窗口的大小和滑动窗口的间隔来动态的获取当前 Steaming 的允许
状态。所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动步长。
➢ 窗口时长:计算内容的时间范围;
➢ 滑动步长:隔多久触发一次计算。

注意:这两者都必须为采集周期大小的整数倍。

WordCount 第三版:3 秒一个批次,窗口 12 秒,滑步 6 秒。

object WorldCount {
 def main(args: Array[String]) {
 val conf = new 
SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
 val ssc = new StreamingContext(conf, Seconds(3))
 ssc.checkpoint("./ck")
 // Create a DStream that will connect to hostname:port, like localhost:9999
 val lines = ssc.socketTextStream("linux1", 9999)
 // Split each line into words
 val words = lines.flatMap(_.split(" "))
// Count each word in each batch
 val pairs = words.map(word => (word, 1))
 val wordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + 
b),Seconds(12), Seconds(6))
 // Print the first ten elements of each RDD generated in this DStream to the console
 wordCounts.print()
 ssc.start() // Start the computation
 ssc.awaitTermination() // Wait for the computation to terminate
 }
}

(1)window(windowLength, slideInterval): 基于对源 DStream 窗化的批次进行计算返回一个
新的 Dstream;
(2)countByWindow(windowLength, slideInterval): 返回一个滑动窗口计数流中的元素个数;
(3)reduceByWindow(func, windowLength, slideInterval): 通过使用自定义函数整合滑动区间
流元素来创建一个新的单元素流;
(4)reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]): 当在一个(K,V)
对的 DStream 上调用此函数,会返回一个新(K,V)对的 DStream,此处通过对滑动窗口中批次数
据使用 reduce 函数来整合每个 key 的 value 值。
(5)reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]): 这个函
数是上述函数的变化版本,每个窗口的 reduce 值都是通过用前一个窗的 reduce 值来递增计算。
通过 reduce 进入到滑动窗口数据并”反向 reduce”离开窗口的旧数据来实现这个操作。一个例
子是随着窗口滑动对 keys 的“加”“减”计数。通过前边介绍可以想到,这个函数只适用于”
可逆的 reduce 函数”,也就是这些 reduce 函数有相应的”反 reduce”函数(以参数 invFunc 形式
传入)。如前述函数,reduce 任务的数量通过可选参数来配置。

在这里插入图片描述
val ipDStream = accessLogsDStream.map(logEntry => (logEntry.getIpAddress(), 1))
val ipCountDStream = ipDStream.reduceByKeyAndWindow(
{(x, y) => x + y},
{(x, y) => x - y},
Seconds(30),
Seconds(10))
//加上新进入窗口的批次中的元素 //移除离开窗口的老批次中的元素 //窗口时长// 滑动步长
countByWindow()和 countByValueAndWindow()作为对数据进行计数操作的简写。

countByWindow()返回一个表示每个窗口中元素个数的 DStream,而 countByValueAndWindow()
返回的 DStream 则包含窗口中每个值的个数。

val ipDStream = accessLogsDStream.map{entry => entry.getIpAddress()}
val ipAddressRequestCount = ipDStream.countByValueAndWindow(Seconds(30),
Seconds(10))
val requestCount = accessLogsDStream.countByWindow(Seconds(30), Seconds(10))

DStream 输出
输出操作指定了对流数据经转化操作得到的数据所要执行的操作(例如把结果推入外部数据库
或输出到屏幕上)。与 RDD 中的惰性求值类似,如果一个 DStream 及其派生出的 DStream 都没
有被执行输出操作,那么这些 DStream 就都不会被求值。如果 StreamingContext 中没有设定输出
操作,整个 context 就都不会启动。

输出操作如下:
➢ print():在运行流程序的驱动结点上打印 DStream 中每一批次数据的最开始 10 个元素。这
用于开发和调试。在 Python API 中,同样的操作叫 print()。
➢ saveAsTextFiles(prefix, [suffix]):以 text 文件形式存储这个 DStream 的内容。每一批次的存
储文件名基于参数中的 prefix 和 suffix。”prefix-Time_IN_MS[.suffix]”。
➢ saveAsObjectFiles(prefix, [suffix]):以 Java 对象序列化的方式将 Stream 中的数据保存为
SequenceFiles . 每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]". Python
中目前不可用。
➢ saveAsHadoopFiles(prefix, [suffix]):将 Stream 中的数据保存为 Hadoop files. 每一批次的存
储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]"。Python API 中目前不可用。
➢ foreachRDD(func):这是最通用的输出操作,即将函数 func 用于产生于 stream 的每一个
RDD。其中参数传入的函数 func 应该实现将每一个 RDD 中数据推送到外部系统,如将
RDD 存入文件或者通过网络将其写入数据库。

通用的输出操作 foreachRDD(),它用来对 DStream 中的 RDD 运行任意计算。这和 transform()
有些类似,都可以让我们访问任意 RDD。在 foreachRDD()中,可以重用我们在 Spark 中实现的
所有行动操作。比如,常见的用例之一是把数据写到诸如 MySQL 的外部数据库中。

注意:

  1. 连接不能写在 driver 层面(序列化)
  2. 如果写在 foreach 则每个 RDD 中的每一条数据都创建,得不偿失;
  3. 增加 foreachPartition,在分区创建(获取)

优雅关闭

流式任务需要 7*24 小时执行,但是有时涉及到升级代码需要主动停止程序,但是分
布式程序,没办法做到一个个进程去杀死,所有配置优雅的关闭就显得至关重要了。
使用外部文件系统来控制内部程序关闭。

MonitorStop

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.streaming.{StreamingContext, StreamingContextState}
class MonitorStop(ssc: StreamingContext) extends Runnable {
 override def run(): Unit = {
 val fs: FileSystem = FileSystem.get(new URI("hdfs://linux1:9000"), new 
Configuration(), "atguigu")
 while (true) {
 try
 Thread.sleep(5000)
 catch {
 case e: InterruptedException =>
 e.printStackTrace()
 }
 val state: StreamingContextState = ssc.getState
 val bool: Boolean = fs.exists(new Path("hdfs://linux1:9000/stopSpark"))
 if (bool) {
 if (state == StreamingContextState.ACTIVE) {
 ssc.stop(stopSparkContext = true, stopGracefully = true)
 System.exit(0)
 }
 }
 }
 }
}

SparkTest

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object SparkTest {
  def createSSC(): _root_.org.apache.spark.streaming.StreamingContext = {
  val update: (Seq[Int], Option[Int]) => Some[Int] = (values: Seq[Int], status: 
Option[Int]) => {
  //当前批次内容的计算
  val sum: Int = values.sum
  //取出状态信息中上一次状态
   val lastStatu: Int = status.getOrElse(0)
  Some(sum + lastStatu)
  }
  val sparkConf: SparkConf = new 
SparkConf().setMaster("local[4]").setAppName("SparkTest")
  //设置优雅的关闭
  sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")
  val ssc = new StreamingContext(sparkConf, Seconds(5))
  ssc.checkpoint("./ck")
  val line: ReceiverInputDStream[String] = ssc.socketTextStream("linux1", 9999)
  val word: DStream[String] = line.flatMap(_.split(" "))
  val wordAndOne: DStream[(String, Int)] = word.map((_, 1))
  val wordAndCount: DStream[(String, Int)] = wordAndOne.updateStateByKey(update)
  wordAndCount.print()
  ssc
  }
  def main(args: Array[String]): Unit = {
  val ssc: StreamingContext = StreamingContext.getActiveOrCreate("./ck", () => 
createSSC())
  new Thread(new MonitorStop(ssc)).start()
  ssc.start()
  ssc.awaitTermination()
  }
}

SparkStreaming 案例实操

<dependencies>
  <dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.12</artifactId>
  <version>3.0.0</version>
  </dependency>
  <dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.12</artifactId>
  <version>3.0.0</version>
  </dependency>
  <dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
  <version>3.0.0</version>
  </dependency>
  <!-- https://mvnrepository.com/artifact/com.alibaba/druid -->
  <dependency>
  <groupId>com.alibaba</groupId>
  <artifactId>druid</artifactId>
  <version>1.1.10</version>
  </dependency>
  <dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>5.1.27</version>
</dependency>
<dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-core</artifactId>
  <version>2.10.1</version>
</dependency>
</dependencies>
import java.io.InputStreamReader
import java.util.Properties
object PropertiesUtil {
 def load(propertiesName:String): Properties ={
 val prop=new Properties()
 prop.load(new 
InputStreamReader(Thread.currentThread().getContextClassLoader.getResourceAsStream(propertiesName) , "UTF-8"))
 prop
 }
}

7.2 实时数据生成模块
➢ config.properties
#jdbc 配置
jdbc.datasource.size=10
jdbc.url=jdbc:mysql://linux1:3306/spark2020?useUnicode=true&characterEncoding=utf
8&rewriteBatchedStatements=true
jdbc.user=root
jdbc.password=000000

Kafka 配置

kafka.broker.list=linux1:9092,linux2:9092,linux3:909


➢ CityInfo
/**
  *
  * 城市信息表
  *
  * @param city_id 城市 id
  * @param city_name 城市名称
  * @param area 城市所在大区
  */
case class CityInfo (city_id:Long,
  city_name:String,
area:String)

➢ RandomOptions
import scala.collection.mutable.ListBuffer
import scala.util.Random
case class RanOpt[T](value: T, weight: Int)
object RandomOptions {
  def apply[T](opts: RanOpt[T]*): RandomOptions[T] = {
  val randomOptions = new RandomOptions[T]()
  for (opt <- opts) {
  randomOptions.totalWeight += opt.weight
  for (i <- 1 to opt.weight) {
  randomOptions.optsBuffer += opt.value
  }
  }
  randomOptions
  }
}
class RandomOptions[T](opts: RanOpt[T]*) {
  var totalWeight = 0
  var optsBuffer = new ListBuffer[T]
  def getRandomOpt: T = {
  val randomNum: Int = new Random().nextInt(totalWeight)
  optsBuffer(randomNum)

➢ MockerRealTime

import java.util.{Properties, Random}
import com.atguigu.bean.CityInfo
import com.atguigu.utils.{PropertiesUtil, RanOpt, RandomOptions}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, 
ProducerRecord}
import scala.collection.mutable.ArrayBuffer
object MockerRealTime {
  /**
  * 模拟的数据
  *
  * 格式 :timestamp area city userid adid
  * 某个时间点 某个地区 某个城市 某个用户 某个广告
  */
  def generateMockData(): Array[String] = {
  val array: ArrayBuffer[String] = ArrayBuffer[String]()
  val CityRandomOpt = RandomOptions(RanOpt(CityInfo(1, "北京", "华北"), 30),
  RanOpt(CityInfo(2, "上海", "华东"), 30),
  RanOpt(CityInfo(3, "广州", "华南"), 10),
  RanOpt(CityInfo(4, "深圳", "华南"), 20),
  RanOpt(CityInfo(5, "天津", "华北"), 10))
  val random = new Random()
  // 模拟实时数据:
  // timestamp province city userid adid
  for (i <- 0 to 50) {
  val timestamp: Long = System.currentTimeMillis()
  val cityInfo: CityInfo = CityRandomOpt.getRandomOpt
  val city: String = cityInfo.city_name
  val area: String = cityInfo.area
  val adid: Int = 1 + random.nextInt(6)
  val userid: Int = 1 + random.nextInt(6)
  // 拼接实时数据
  array += timestamp + " " + area + " " + city + " " + userid + " " + adid
  }
  array.toArray
  }
  def createKafkaProducer(broker: String): KafkaProducer[String, String] = {
  // 创建配置对象
  val prop = new Properties()
  // 添加配置
  prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker)
  prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
  prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
  // 根据配置创建 Kafka 生产者
  new KafkaProducer[String, String](prop)
  }
 def main(args: Array[String]): Unit = {
  // 获取配置文件 config.properties 中的 Kafka 配置参数
  val config: Properties = PropertiesUtil.load("config.properties")
  val broker: String = config.getProperty("kafka.broker.list")
  val topic = "test"
  // 创建 Kafka 消费者
  val kafkaProducer: KafkaProducer[String, String] = createKafkaProducer(broker)
  while (true) {
  // 随机产生实时数据并通过 Kafka 生产者发送到 Kafka 集群中
  for (line <- generateMockData()) {
  kafkaProducer.send(new ProducerRecord[String, String](topic, line))
  println(line)
  }
  Thread.sleep(2000)
  }
  }
}

7.3 需求一:广告黑名单
实现实时的动态黑名单机制:将每天对某个广告点击超过 100 次的用户拉黑。
注:黑名单保存到 MySQL 中。
7.3.1 思路分析
1)读取 Kafka 数据之后,并对 MySQL 中存储的黑名单数据做校验;
2)校验通过则对给用户点击广告次数累加一并存入 MySQL3)在存入 MySQL 之后对数据做校验,如果单日超过 100 次则将该用户加入黑名单。

创建库 spark2020
1)存放黑名单用户的表
CREATE TABLE black_list (userid CHAR(1) PRIMARY KEY);
2)存放单日各用户点击每个广告的次数
CREATE TABLE user_ad_count (
dt varchar(255),
userid CHAR (1),
adid CHAR (1),
count BIGINT,
PRIMARY KEY (dt, userid, adid)
);

➢ MyKafkaUtil
import java.util.Properties
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, 
LocationStrategies}
object MyKafkaUtil {
  //1.创建配置信息对象
  private val properties: Properties = PropertiesUtil.load("config.properties")
  //2.用于初始化链接到集群的地址
  val broker_list: String = properties.getProperty("kafka.broker.list")
  //3.kafka 消费者配置
  val kafkaParam = Map(
  "bootstrap.servers" -> broker_list,
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  //消费者组
  "group.id" -> "commerce-consumer-group",
  //如果没有初始化偏移量或者当前的偏移量不存在任何服务器上,可以使用这个配置属性
  //可以使用这个配置,latest 自动重置偏移量为最新的偏移量
  "auto.offset.reset" -> "latest",
  //如果是 true,则这个消费者的偏移量会在后台自动提交,但是 kafka 宕机容易丢失数据
  //如果是 false,会需要手动维护 kafka 偏移量
  "enable.auto.commit" -> (true: java.lang.Boolean)
  )
  // 创建 DStream,返回接收到的输入数据
  // LocationStrategies:根据给定的主题和集群地址创建 consumer
  // LocationStrategies.PreferConsistent:持续的在所有 Executor 之间分配分区
  // ConsumerStrategies:选择如何在 Driver 和 Executor 上创建和配置 Kafka Consumer
  // ConsumerStrategies.Subscribe:订阅一系列主题
  def getKafkaStream(topic: String, ssc: StreamingContext): 
InputDStream[ConsumerRecord[String, String]] = {
  val dStream: InputDStream[ConsumerRecord[String, String]] = 
KafkaUtils.createDirectStream[String, String](ssc, 
LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, 
String](Array(topic), kafkaParam))
  dStream
  }
}
}
➢ JdbcUtil
import java.sql.{Connection, PreparedStatement, ResultSet}
import java.util.Properties
import javax.sql.DataSource
import com.alibaba.druid.pool.DruidDataSourceFactory 尚硅谷大数据技术之 S————————————————————————object JdbcUtil {
  //初始化连接池
  var dataSource: DataSource = init()
  //初始化连接池方法
  def init(): DataSource = {
  val properties = new Properties()
  val config: Properties = PropertiesUtil.load("config.prope properties.setProperty("driverClassName", "com.mysql.jdbc. properties.setProperty("url", config.getProperty("jdbc.url properties.setProperty("username", config.getProperty("jdb properties.setProperty("password", config.getProperty("jdb properties.setProperty("maxActive", 
config.getProperty("jdbc.datasource.size"))
  DruidDataSourceFactory.createDataSource(properties)
  }
  //获取 MySQL 连接
  def getConnection: Connection = {
  dataSource.getConnection
  }
  //执行 SQL 语句,单条数据插入
  def executeUpdate(connection: Connection, sql: String, param= {
  var rtn = 0
  var pstmt: PreparedStatement = null
  try {
  connection.setAutoCommit(false)
  pstmt = connection.prepareStatement(sql)
  if (params != null && params.length > 0) {
  for (i <- params.indices) {
  pstmt.setObject(i + 1, params(i))
  }
  }
  rtn = pstmt.executeUpdate()
  connection.commit()
  pstmt.close()
  } catch {
  case e: Exception => e.printStackTrace()
  }
  rtn
  }
  //执行 SQL 语句,批量数据插入
  def executeBatchUpdate(connection: Connection, sql: String, Iterable[Array[Any]]): Array[Int] = {
  var rtn: Array[Int] = null
  var pstmt: PreparedStatement = null
  try {
  connection.setAutoCommit(false)
  pstmt = connection.prepareStatement(sql)
  for (params <- paramsList) {
  if (params != null && params.length > 0) {
  for (i <- params.indices) {
  pstmt.setObject(i + 1, params(i))
  }
  pstmt.addBatch()
  }
  } 尚硅谷大数据技术之 S———————————————————————— rtn = pstmt.executeBatch()
  connection.commit()
  pstmt.close()
  } catch {
  case e: Exception => e.printStackTrace()
  }
  rtn
  }
  //判断一条数据是否存在
  def isExist(connection: Connection, sql: String, params: Arr{
  var flag: Boolean = false
  var pstmt: PreparedStatement = null
  try {
  pstmt = connection.prepareStatement(sql)
  for (i <- params.indices) {
  pstmt.setObject(i + 1, params(i))
  }
  flag = pstmt.executeQuery().next()
  pstmt.close()
  } catch {
  case e: Exception => e.printStackTrace()
  }
  flag
  }
  //获取 MySQL 的一条数据
  def getDataFromMysql(connection: Connection, sql: String, paLong = {
  var result: Long = 0L
  var pstmt: PreparedStatement = null
  try {
  pstmt = connection.prepareStatement(sql)
  for (i <- params.indices) {
  pstmt.setObject(i + 1, params(i))
  }
  val resultSet: ResultSet = pstmt.executeQuery()
  while (resultSet.next()) {
  result = resultSet.getLong(1)
  }
  resultSet.close()
  pstmt.close()
  } catch {
  case e: Exception => e.printStackTrace()
  }
  result
  }
  //主方法,用于测试上述方法
  def main(args: Array[String]): Unit = {
  }
}
➢ Ads_log
case class Ads_log(timestamp: Long,
  area: String,
  city: String,
 userid: String,
 adid: String)

BlackListHandler
import java.sql.Connection
import java.text.SimpleDateFormat
import java.util.Date
import com.atguigu.bean.Ads_log
import com.atguigu.utils.JdbcUtil
import org.apache.spark.streaming.dstream.DStream
object BlackListHandler {
 //时间格式化对象
 private val sdf = new SimpleDateFormat("yyyy-MM-dd")
 def addBlackList(filterAdsLogDSteam: DStream[Ads_log]): Unit = {
 //统计当前批次中单日每个用户点击每个广告的总次数
 //1.将数据接转换结构 ads_log=>((date,user,adid),1)
 val dateUserAdToOne: DStream[((String, String, String), Long)] = 
filterAdsLogDSteam.map(adsLog => {
 //a.将时间戳转换为日期字符串
 val date: String = sdf.format(new Date(adsLog.timestamp))
 //b.返回值
 ((date, adsLog.userid, adsLog.adid), 1L)
 })
 //2.统计单日每个用户点击每个广告的总次数
((date,user,adid),1)=>((date,user,adid),count)
 val dateUserAdToCount: DStream[((String, String, String), Long)] = 
dateUserAdToOne.reduceByKey(_ + _)
 dateUserAdToCount.foreachRDD(rdd => {
 rdd.foreachPartition(iter => {
 val connection: Connection = JdbcUtil.getConnection
 iter.foreach { case ((dt, user, ad), count) =>
 JdbcUtil.executeUpdate(connection,
 """
 |INSERT INTO user_ad_count (dt,userid,adid,count)
 |VALUES (?,?,?,?)
 |ON DUPLICATE KEY
 |UPDATE count=count+?
 """.stripMargin, Array(dt, user, ad, count, count))
 val ct: Long = JdbcUtil.getDataFromMysql(connection, "select count from 
user_ad_count where dt=? and userid=? and adid =?", Array(dt, user, ad))
 if (ct >= 30) {
 JdbcUtil.executeUpdate(connection, "INSERT INTO black_list (userid) 
VALUES (?) ON DUPLICATE KEY update userid=?", Array(user, user))
 }
 }
 connection.close()
 })
 })
 }
 def filterByBlackList(adsLogDStream: DStream[Ads_log]): DStream[Ads_log] = {
 adsLogDStream.transform(rdd => {
 rdd.filter(adsLog => {
  val connection: Connection = JdbcUtil.getConnection
  val bool: Boolean = JdbcUtil.isExist(connection, "select * from black_list 
where userid=?", Array(adsLog.userid))
  connection.close()
  !bool
  })
  })
  }
}
import com.atguigu.bean.Ads_log
import com.atguigu.handler.BlackListHandler
import com.atguigu.utils.MyKafkaUtil
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
object RealTimeApp {
 def main(args: Array[String]): Unit = {
 //1.创建 SparkConf
 val sparkConf: SparkConf = new SparkConf().setAppName("RealTimeApp 
").setMaster("local[*]")
 //2.创建 StreamingContext
 val ssc = new StreamingContext(sparkConf, Seconds(3))
 //3.读取数据
 val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = 
MyKafkaUtil.getKafkaStream("ads_log", ssc)
 //4.将从 Kafka 读出的数据转换为样例类对象
 val adsLogDStream: DStream[Ads_log] = kafkaDStream.map(record => {
 val value: String = record.value()
 val arr: Array[String] = value.split(" ")
 Ads_log(arr(0).toLong, arr(1), arr(2), arr(3), arr(4))
 })
 //5.需求一:根据 MySQL 中的黑名单过滤当前数据集
 val filterAdsLogDStream: DStream[Ads_log] = 
BlackListHandler2.filterByBlackList(adsLogDStream)
 //6.需求一:将满足要求的用户写入黑名单
 BlackListHandler2.addBlackList(filterAdsLogDStream)
 //测试打印
 filterAdsLogDStream.cache()
 filterAdsLogDStream.count().print()
 //启动任务
 ssc.start()
 ssc.awaitTermination()
 }
}

广告点击量实时统计

描述:实时统计每天各地区各城市各广告的点击总流量,并将其存入 MySQL。
7.4.1 思路分析
1)单个批次内对数据进行按照天维度的聚合统计;
2)结合 MySQL 数据跟当前批次数据更新原有的数据。

CREATE TABLE area_city_ad_count (
dt VARCHAR(255),
area VARCHAR(255),
city VARCHAR(255),
adid VARCHAR(255),
count BIGINT,
PRIMARY KEY (dt,area,city,adid)
);

import java.sql.Connection
import java.text.SimpleDateFormat
import java.util.Date
import com.atguigu.bean.Ads_log
import com.atguigu.utils.JdbcUtil
import org.apache.spark.streaming.dstream.DStream
object DateAreaCityAdCountHandler {
 //时间格式化对象
 private val sdf: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
 /**
 * 统计每天各大区各个城市广告点击总数并保存至 MySQL 中
 *
 * @param filterAdsLogDStream 根据黑名单过滤后的数据集
 */
 def saveDateAreaCityAdCountToMysql(filterAdsLogDStream: DStream[Ads_log]): Unit 
= {
 //1.统计每天各大区各个城市广告点击总数
 val dateAreaCityAdToCount: DStream[((String, String, String, String), Long)] = 
filterAdsLogDStream.map(ads_log => {
 //a.取出时间戳
 val timestamp: Long = ads_log.timestamp
 //b.格式化为日期字符串
 val dt: String = sdf.format(new Date(timestamp))
 //c.组合,返回
 ((dt, ads_log.area, ads_log.city, ads_log.adid), 1L)
 }).reduceByKey(_ + _)
 //2.将单个批次统计之后的数据集合 MySQL 数据对原有的数据更新
dateAreaCityAdToCount.foreachRDD(rdd => {
 //对每个分区单独处理
 rdd.foreachPartition(iter => {
 //a.获取连接
 val connection: Connection = JdbcUtil.getConnection
 //b.写库
 iter.foreach { case ((dt, area, city, adid), count) =>
 JdbcUtil.executeUpdate(connection,
 """
 |INSERT INTO area_city_ad_count (dt,area,city,adid,count)
 |VALUES(?,?,?,?,?)
 |ON DUPLICATE KEY
 |UPDATE count=count+?;
 """.stripMargin,
 Array(dt, area, city, adid, count, count))
 }
 //c.释放连接
 connection.close()
 })
 })
 }
}

RealTimeApp

import java.sql.Connection
import com.atguigu.bean.Ads_log
import com.atguigu.handler.{BlackListHandler, DateAreaCityAdCountHandler, 
LastHourAdCountHandler}
import com.atguigu.utils.{JdbcUtil, MyKafkaUtil, PropertiesUtil}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object RealTimeApp {
 def main(args: Array[String]): Unit = {
 //1.创建 SparkConf
 val sparkConf: SparkConf = new 
SparkConf().setMaster("local[*]").setAppName("RealTimeApp")
 //2.创建 StreamingContext
 val ssc = new StreamingContext(sparkConf, Seconds(3))
 //3.读取 Kafka 数据 1583288137305 华南 深圳 4 3
 val topic: String = 
PropertiesUtil.load("config.properties").getProperty("kafka.topic")
 val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = 
MyKafkaUtil.getKafkaStream(topic, ssc)
 //4.将每一行数据转换为样例类对象
 val adsLogDStream: DStream[Ads_log] = kafkaDStream.map(record => {
 //a.取出 value 并按照" "切分
 val arr: Array[String] = record.value().split(" ")
 //b.封装为样例类对象
  Ads_log(arr(0).toLong, arr(1), arr(2), arr(3), arr(4))
  })
  //5.根据 MySQL 中的黑名单表进行数据过滤
  val filterAdsLogDStream: DStream[Ads_log] = adsLogDStream.filter(adsLog => {
  //查询 MySQL,查看当前用户是否存在。
  val connection: Connection = JdbcUtil.getConnection
  val bool: Boolean = JdbcUtil.isExist(connection, "select * from black_list 
where userid=?", Array(adsLog.userid))
  connection.close()
  !bool
  })
  filterAdsLogDStream.cache()
  //6.对没有被加入黑名单的用户统计当前批次单日各个用户对各个广告点击的总次数,
  // 并更新至 MySQL
  // 之后查询更新之后的数据,判断是否超过 100 次。
  // 如果超过则将给用户加入黑名单
  BlackListHandler.saveBlackListToMysql(filterAdsLogDStream)
  //7.统计每天各大区各个城市广告点击总数并保存至 MySQL 中
dateAreaCityAdCountHandler.saveDateAreaCityAdCountToMysql(filterAdsLogDStream)
  //10.开启任务
  ssc.start()
  ssc.awaitTermination()
  }
}

最近一小时广告点击量

结果展示:
1:List [15:50->10,15:51->25,15:52->30]
2:List [15:50->10,15:51->25,15:52->30]
3:List [15:50->10,15:51->25,15:52->30]
7.5.1 思路分析
1)开窗确定时间范围;
2)在窗口内将数据转换数据结构为((adid,hm),count);
3)按照广告 id 进行分组处理,组内按照时分排序。

import java.sql.Connection
import com.atguigu.bean.Ads_log
import com.atguigu.handler.{BlackListHandler, DateAreaCityAdCountHandler, 
LastHourAdCountHandler}
import com.atguigu.utils.{JdbcUtil, MyKafkaUtil, PropertiesUtil}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object RealTimeApp {
 def main(args: Array[String]): Unit = {
 //1.创建 SparkConf
 val sparkConf: SparkConf = new 
SparkConf().setMaster("local[*]").setAppName("RealTimeApp")
 //2.创建 StreamingContext
 val ssc = new StreamingContext(sparkConf, Seconds(3))
 //3.读取 Kafka 数据 1583288137305 华南 深圳 4 3
 val topic: String = 
PropertiesUtil.load("config.properties").getProperty("kafka.topic")
 val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = 
MyKafkaUtil.getKafkaStream(topic, ssc)
 //4.将每一行数据转换为样例类对象
 val adsLogDStream: DStream[Ads_log] = kafkaDStream.map(record => {
 //a.取出 value 并按照" "切分
 val arr: Array[String] = record.value().split(" ")
 //b.封装为样例类对象
 Ads_log(arr(0).toLong, arr(1), arr(2), arr(3), arr(4))
 })
 //5.根据 MySQL 中的黑名单表进行数据过滤
 val filterAdsLogDStream: DStream[Ads_log] = adsLogDStream.filter(adsLog => {
 //查询 MySQL,查看当前用户是否存在。
 val connection: Connection = JdbcUtil.getConnection
 val bool: Boolean = JdbcUtil.isExist(connection, "select * from black_list 
where userid=?", Array(adsLog.userid))
 connection.close()
 !bool
 })
 filterAdsLogDStream.cache()
 //6.对没有被加入黑名单的用户统计当前批次单日各个用户对各个广告点击的总次数,
 // 并更新至 MySQL
 // 之后查询更新之后的数据,判断是否超过 100 次。
 // 如果超过则将给用户加入黑名单
 BlackListHandler.saveBlackListToMysql(filterAdsLogDStream)
 //7.统计每天各大区各个城市广告点击总数并保存至 MySQL 中
 DateAreaCityAdCountHandler.saveDateAreaCityAdCountToMysql(filterAdsLogDStream)
 //8.统计最近一小时(2 分钟)广告分时点击总数
 val adToHmCountListDStream: DStream[(String, List[(String, Long)])] = 
LastHourAdCountHandler.getAdHourMintToCount(filterAdsLogDStream)
 //9.打印
 adToHmCountListDStream.print()
 //10.开启任务
 ssc.start()
 ssc.awaitTermination()
 }
}
 
import java.text.SimpleDateFormat
import java.util.Date
import com.atguigu.bean.Ads_log
import org.apache.spark.streaming.Minutes
import org.apache.spark.streaming.dstream.DStream
object LastHourAdCountHandler {

 //时间格式化对象
  private val sdf: SimpleDateFormat = new SimpleDateFormat("HH:mm")
  /**
  * 统计最近一小时(2 分钟)广告分时点击总数
  *
  * @param filterAdsLogDStream 过滤后的数据集
  * @return
  */
  def getAdHourMintToCount(filterAdsLogDStream: DStream[Ads_log]): 
DStream[(String, List[(String, Long)])] = {
  //1.开窗 => 时间间隔为 1 个小时 window()
  val windowAdsLogDStream: DStream[Ads_log] = 
filterAdsLogDStream.window(Minutes(2))
  //2.转换数据结构 ads_log =>((adid,hm),1L) map()
  val adHmToOneDStream: DStream[((String, String), Long)] = 
windowAdsLogDStream.map(adsLog => {
  val timestamp: Long = adsLog.timestamp
  val hm: String = sdf.format(new Date(timestamp))
  ((adsLog.adid, hm), 1L)
  })
  //3.统计总数 ((adid,hm),1L)=>((adid,hm),sum) reduceBykey(_+_)
  val adHmToCountDStream: DStream[((String, String), Long)] = 
adHmToOneDStream.reduceByKey(_ + _)
  //4.转换数据结构 ((adid,hm),sum)=>(adid,(hm,sum)) map()
  val adToHmCountDStream: DStream[(String, (String, Long))] = 
adHmToCountDStream.map { case ((adid, hm), count) =>
  (adid, (hm, count))
  }
  //5.按照 adid 分组 (adid,(hm,sum))=>(adid,Iter[(hm,sum),...]) groupByKey
  adToHmCountDStream.groupByKey()
  .mapValues(iter =>
  iter.toList.sortWith(_._1 < _._1)
  )
  }
 }
Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐