STORM整体分析

一、概念

  • Storm是个实时的、分布式以及具备高容错的计算系统
    • ​ Storm进程常驻内存
    • ​ Storm数据不经过磁盘,在内存中处理
  • Twitter开源的分布式实时大数据处理框架,最早开源于github

1、构成

  • Nimbus
  • Supervisor
  • Worker

2、编程模型

  • DAG(Topology)
  • SpBolt
  • out

3、序列化

  • **K r y o**序列化,高效,数据量小

4、数据传输

Z M Q(twitter早期产品)

Z e r o M Q开源的消息传递框架,并不是一个MessageQueue

N e t t y是基于NIO的网络框架,更加高效。(之所以Storm 0.9版本之后使用Netty,是因为ZMQ的license和Storm的license不兼容。)

5、storm与mapReduce对比

  • Storm : 进程、线程常驻内存运行,数据不进入磁盘,数据通过网络传递。

  • MapReduce: 为TB、PB级别数据设计的批处理计算框架。

    image-20190825194708831.png

6、Storm与Spark Streaming对比

  • Storm: 纯流式处理

    • 专门为流式处理设计
    • 数据传输模式更为简单,很多地方也更为高效
    • 并不是不能做批处理,它也可以来做微批处理,来提高吞吐
  • Spark Streaming:微批处理

    • 将RDD做的很小来用小的批处理来接近流式处理

    • 基于内存和DAG可以把处理任务做的很快

      image-20190825194954194.png

二、strom计算模型

1、Topology – DAG有向无环图的实现


Topology: 是对于Storm实时计算逻辑的封装,它由一系列通过数据流相互关联的Spout、Bolt所组成的拓扑结构

  • 生命周期: 此拓扑只要启动就会一直在集群中运行,直到手动将其kill,否则不会终止 (区别于MapReduce当中的Job,MR当中的Job在计算执行完成就会终止Stream – 数据流。


2、Stream – 数据流

Spout中源源不断传递数据给Bolt、以及上一个Bolt传递数据给下一个Bolt,所形成的这些数据通道即叫做Stream

Stream声明时需给其指定一个Id**(默认为Default)**

实际开发场景中,多使用单一数据流,此时不需要单独指定StreamId

[外链图片转存失败(img-C1Jjxcfp-1566826226800)(https://i.loli.net/2019/08/25/BULPDm58qe36aRA.png)]


3、Tuple – 元组

​ Stream中最小数据组成单元image-20190825202121950.png


4、Spout – 数据源

  • 拓扑中数据流的来源。一般会从指定外部的数据源读取元组(Tuple)发送到拓扑(Topology)中

  • 一个Spout可以发送多个数据流(Stream

  • 可先通过OutputFieldsDeclarer中的declare方法声明定义的不同数据流,发送数据时通过SpoutOutputCollector中的emit方法指定数据流Id(streamId)参数将数据发送出去

  • Spout中最核心的方法是nextTuple,该方法会被Storm线程不断调用、主动从数据源拉取数据,再通过emit方法将数据生成元组(Spout – 数据源

  • 拓扑中数据流的来源。一般会从指定外部的数据源读取元组(Tuple)发送到拓扑(Topology)中

  • 一个Spout可以发送多个数据流(Stream)可先通过OutputFieldsDeclarer中的declare方法声明定义的不同数据流,发送数据时通过SpoutOutputCollector中的emit方法指定数据流**Id(streamId)**参数将数据发送出去

  • Spout中最核心的方法是nextTuple,该方法会被Storm线程不断调用、主动从数据源拉取数据,再通过emit方法将数据生成元组(Tuple)发送给之后的Bolt计算


5、Bolt – 数据流处理组件

  • 拓扑中数据处理均有Bolt完成。对于简单的任务或者数据流转换,单个Bolt可以简单实现;更加复杂场景往往需要多个Bolt分多个步骤完成
  • 一个Bolt可以发送多个数据流(Stream)
  • 可先通过OutputFieldsDeclarer中的declare方法声明定义的不同数据流,发送数据时通过SpoutOutputCollector中的emit方法指定数据流Id(streamId)参数将数据发送出去
  • Bolt中最核心的方法是execute方法,该方法负责接收到一个元组(Tuple)数据、真正实现核心的业务逻辑

6、Stream Grouping – 数据流分组(即数据分发策略)

  • Shuffle grouping(随机分组):
    • 这种方式会随机分发tuple给bolt的各个task,每个bolt实例接收到的相同数量的tuple。
  • Fields grouping(按字段分组):
    • 根据指定字段的值进行分组。比如说,一个数据流根据“word”字段进行分组,所有具有相同“word”字段值的tuple会路由到同一个bolt的task中。
  • All grouping(全复制分组):
    • 将所有的tuple复制后分发给所有bolt task。每个订阅数据流的task都会接收到tuple的拷贝。
  • Globle grouping(全局分组):
    • 这种分组方式将所有的tuples路由到唯一一个task上。Storm按照最小的task ID来选取接收数据的task。
  • None grouping(不分组):
    • 在功能上和随机分组相同,是为将来预留的。
  • Direct grouping(指向型分组):
    • 数据源会调用emitDirect()方法来判断一个tuple应该由哪个Storm组件来接收。只能在声明了是指向型的数据流上使用。
  • Local or shuffle grouping(本地或随机分组):
    • 和随机分组类似,但是,会将tuple分发给同一个worker内的bolt task(如果worker内有接收数据的bolt task)。其他情况下,采用随机分组的方式。取决于topology的并发度,本地或随机分组可以减少网络传输,从而提高topology性能。

三、Storm整体架构

image-20190825203830559.png


Nimbus

  1. 资源调度
  2. 任务分配
  3. 接收jar包

Supervisor

  1. Supervisor
  2. 接收nimbus分配的任务
  3. 启动、停止自己管理的worker进程(当前supervisor上worker数量由配置文件设定)

Worker

  1. 、运行具体处理运算组件的进程(每个Worker对应执行一个Topology的子集)

  2. worker任务类型,即spout任务、bolt任务两种

  3. 启动executor

    ​ (executor即worker JVM进程中的一个java线程,一般默认每个executor负责执行一个task任务)


Zookeeper


hadoop与storm架构对比

HadoopStorm
主节点ResourceManagerNimbus
从节点NodeManagerSupervisor
应用程序JobTopology
工作进程ChildWorker
计算模型Map/Reduce(split,map,shuffle,reduce)Spout/Bolt

storm作业提交流程

[外链图片转存失败(img-6FQYxr9R-1566826226802)(https://i.loli.net/2019/08/25/NGAdamMChoB2szH.png)]

image-20190825210309875.png

image-20190825210323343.png

四、Storm容错机制

1、集群节点宕机

Nimbus服务器

单点故障**?**

非Nimbus服务器

故障时,该节点上所有Task任务都会超时,Nimbus会将这些Task任务重新分配到其他服务器上运行

2、进程挂掉

Worker

挂掉时,Supervisor会重新启动这个进程。如果启动过程中仍然一直失败,并且无法向Nimbus发送心跳,Nimbus会将该Worker重新分配到其他服务器上

Supervisor

无状态(所有的状态信息都存放在Zookeeper中来管理)

快速失败(每当遇到任何异常情况,都会自动毁灭)

Nimbus

无状态(所有的状态信息都存放在Zookeeper中来管理)

快速失败(每当遇到任何异常情况,都会自动毁灭)

3、消息的完整性

**概念:**从Spout中发出的Tuple,以及基于他所产生Tuple(例如上个例子当中Spout发出的句子,以及句子当中单词的tuple等)由这些消息就构成了一棵tuple树 ,当这棵tuple树发送完成,并且树当中每一条消息都被正确处理,就表明spout发送消息被“完整处理”,即消息的完整性

Storm提供了几种不同级别的保证消息处理,包括尽力而为(best effort)至少一次(at least once),以及通过Trident保证只完全处理一次(exactly once)

此处指的是至少完全处理一次。

image-20190825211351226.png

当元组树已经用完并且树中的每条消息都已处理完毕时,Storm会认为从水龙头发射的元组是“完全处理”的。如果在指定的超时内无法完全处理其消息树,则认为该元组失败。可以使用Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS对指定的拓扑进行配置,默认为30秒。

public interface ISpout extends Serializable {
    void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
    void close();
    void nextTuple();
    void ack(Object msgId);
    void fail(Object msgId);
}

首先,storm调用spout的nextTuple方法发射一个元组。spout使用SpoutOutputCollector在声明的流中发射元组。当发射元组的时候,spout使用messageId标记该元组。

_collector.emit(new Values("field1", "field2", 3) , msgId);

其次,当向消费闪电发送元组的时候,strom追踪该消息树。如果storm发现一个元组被完全处理了,storm就会调用spout的ack方法并将该元组的messageId传送给它。如果元组处理超时,storm就调用spout的fail方法。只会在发射该元组的spout中调用它的ack或者fail方法。

好处是:

首先,当创建了新的元组树边的时候,要通知storm。其次,当完成了一个元组的处理之后要告诉storm。通过这种方式,storm就可以知道元组被完全处理了,然后调用ack方法,或者调用fail方法,如果处理失败。

在元组树中创建一条边,称为锚点。当发送一个新元组的时候就会创建一条边。

public class SplitSentence extends BaseRichBolt {
        OutputCollector _collector;

        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            _collector = collector;
        }

        public void execute(Tuple tuple) {
            String sentence = tuple.getString(0);
            for(String word: sentence.split(" ")) {
                _collector.emit(tuple, new Values(word));
            }

            _collector.ack(tuple);
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }        
    }

每个元组通过在emit中指定输入的元组进行锚点标记。由于被标记锚点,如果在下游处理中处理失败了,元组树顶点的元组会重新发送。

_collector.emit(new Values(word));

上述方式没有对元组进行锚点标记。当元组在下游处理失败了,根元组不会重发。输出的元组也可以标记多个锚点。如果流中有聚合或者join,就比较有用。标记了多个锚点的元组如果处理失败,就会触发多个元组树根元组重发。通过list集合为元组标记多个锚点:

List<Tuple> anchors = new ArrayList<Tuple>();
anchors.add(tuple1);
anchors.add(tuple2);
_collector.emit(anchors, new Values(1, 2, 3));

很明显,手动调用fail方法比通过元组的处理超时从而由storm调用fail方法要更快地重发根元组。由于storm使用内存来存储元组树,如果不及时的ack或者fail有可能导致内存溢出。

storm的拓扑中提供了一组acker用于追踪spout发射的每个元组及其衍生的元组,一旦发现DAG处理完了,就同创建该元组的spout进行确认。

Config.TOPOLOGY_ACKERS用于设置acker的数量。默认情况下一个worker一个acker任务。

当拓扑创建了元组,就会为其分配一个随机的64bit的id,acker使用该ID追踪spout发送的每个元组。元组树上的元组都知道这个ID。当在闪电中创建了新的元组,该ids会拷贝给新的元组。当元组确认后,元素发送消息给acker任务,以改变元组树。

image-20190825211841070.png

当通过C衍生出D和E并且确认之后,就会从元组树中移除C。这样可以保证元组树不会过早地完成。

如果拓扑中包含多个acker,当一个元组确认后,如何知道向哪个acker发送确认消息?

storm使用hash取模的方式将一个spout的元组id跟一个acker任务绑定。

acker任务如何知道该向哪个spout任务发送确认消息?spout发射元组的时候会给合适的acker发送一个消息表示对哪个spout的元组负责。acker发现元组树完成了,就知道向哪个spout任务发送完成的消息。

acker不显式地追踪元组。如果有数十万的节点,追踪所有的元组会有耗尽acker内存的风险。acker使用一个定长的空间20字节做这个工作。这个是storm的主要创新。

acker将spout发射的元组id和一个64bit的数字(ack val)相关联。ack val代表了该元组树的状态,不管spout发射的元组及其衍生的元组有多少,它仅仅是对所有创建的元组以及确认的元组id求异或xor操作。如果最终这个值ack val变成0,表示元组树已经被完全处理。某个元组的id和该64bit的数字异或结果是0的情况极其少见,比如每秒处理10k的元组,需要5000万年才会产生一个错误,造成数据丢失。

异常情况

1、 由于任务死掉,没有确认元组:元组树根节点的元组和丢掉的元组确认会超时,重新发送根元组。

2、 acker死掉:所有的元组确认都会超时,根节点元组重发

3、 spout死掉:spout获取数据的数据源负责重新发送消息。例如:MQ等会将所有打开的消息放回到队列中,之后重新处理。

由此可见,strom的可靠性机制完全是分布式的,可扩展的以及容错的。


Acker – 消息完整性的实现机制

Storm的拓扑当中特殊的一些任务

负责跟踪每个Spout发出的Tuple的DAG(有向无环图)

默认每个worker一个acker

4、DRPC

概念

分布式RPC(DRPC)背后的想法是使用Storm在运行中并行计算真正强大的函数。Storm拓扑接收函数参数流作为输入,并为每个函数调用发出结果的输出流。

DRPC并不是Storm的一个特征,因为它是Storm的原始流,spouts,bolts和拓扑表示的模式。DRPC本可以打包成Storm独立的库,但是它与Storm捆绑在一起非常有用。

  • 分布式远程过程调用DRPC (Distributed RPC) remote procedure call
  • DRPC是通过一个 DRPC 服务端(DRPC server)来实现分布式 RPC 功能的。
  • DRPC Server 负责接收RPC 请求,并将该请求发送到 Storm中运行的 Topology,等待接收 Topology 发送的处理结果,并将该结果返回给发送请求的客户端。
  • (其实,从客户端的角度来说,DPRC 与普通的 RPC 调用并没有什么区别。)
DRPC设计目的:

为了充分利用Storm的计算能力实现高密度的并行实时计算。

(Storm接收若干个数据流输入,数据在Topology当中运行完成,然后通过DRPC将结果进行输出。)

处理过程

客户端通过向DRPC 服务器发送待执行函数的名称以及该函数的参数来获取处理结果。实现该函数的拓扑使用一个DRPCSpout 从 DRPC 服务器中接收一个函数调用流。DRPC 服务器会为每个函数调用都标记了一个唯一的 id。随后拓扑会执行函数来计算结果,并在拓扑的最后使用一个名为 ReturnResults 的 bolt 连接到 DRPC 服务器,根据函数调用的 id 来将函数调用的结果返回。

image-20190825210755427.png

DRPC实现方式
定义DRPC拓扑:
方法1:

通过LinearDRPCTopologyBuilder(该方法也过期,不建议使用)

该方法会自动为我们设定Spout、将结果返回给DRPC Server等,我们只需要将Topology实现

image-20190825210950663.png

方法2:

直接通过普通的拓扑构造方法TopologyBuilder来创建DRPC拓扑

需要手动设定好开始的DRPCSpout以及结束的ReturnResults

image-20190825211022771.png

运行模式:
1、模式本地

image-20190825211102637.png

2、远程模式(集群模式)

修改配置文件conf/storm.yaml

drpc.servers:

​    \- "node1“

启动DRPC Server

bin/stormdrpc&

通过StormSubmitter.submitTopology提交拓扑
LinearDRPCTopologyBuilder

Storm附带了一个名为LinearDRPCTopologyBuilder的拓扑构建器,它可以自动执行几乎所有涉及DRPC的步骤。 这些包括:

​ 1、设置spout

​ 2、将结果返回给DRPC服务器

​ 3、为bolt提供功能,以便在tuple(元组)组上进行有限聚合

Eg:

public static class ExclaimBolt extends BaseBasicBolt {
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String input = tuple.getString(1);
        collector.emit(new Values(tuple.getValue(0), input + "!"));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("id", "result"));
    }
}

public static void main(String[] args) throws Exception {
    LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");
    builder.addBolt(new ExclaimBolt(), 3);
    // ...
}

创建LinearDRPCTopologyBuilder时,可以告诉它拓扑的DRPC函数的名称。 单个DRPC服务器可以协调许多功能,函数名称可以区分各个函数。 声明的第一个bolt将2元组作为输入,其中第一个字段是请求ID,第二个字段是该请求的参数。LinearDRPCTopologyBuilder期望最后一个bolt发出一个输出流,其中包含[id,result]形式的2元组。 最后,所有中间元组都必须包含请求ID作为第一个字段。

在这个例子中,ExclaimBolt只是附加一个“!” 到元组的第二个字段。LinearDRPCTopologyBuilder处理连接到DRPC服务器并返回结果的其余协调。

本地模式的DRPC

DRPC可以在本地模式运行。下面的例子说明了如何运行本地模式的DRPC:

LocalDRPC drpc = new LocalDRPC();
LocalCluster cluster = new LocalCluster();

cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc));

System.out.println("Results for 'hello':" + drpc.execute("exclamation", "hello"));

cluster.shutdown();
drpc.shutdown();

首先,创建一个LocalDRPC对象。 此对象模拟正在进行的DRPC服务器,就像LocalCluster在进程中模拟Storm集群一样。 然后创建LocalCluster以在本地模式下运行拓扑。LinearDRPCTopologyBuilder具有用于创建本地拓扑和远程拓扑的单独方法。 在本地模式下,LocalDRPC对象不会绑定到任何端口,因此拓扑需要知道要与之通信的对象。 这就是createLocalTopology将LocalDRPC对象作为输入接收的原因。

启动拓扑后,您可以使用LocalDRPC上的execute方法执行DRPC调用。

远程模式的DRPC

在实际集群上使用DRPC也很简单。 有三个步骤:

​ 1、启动DRPC服务器

​ 2、配置DRPC服务器的位置

​ 3、将DRPC拓扑提交给Storm集群

启动DRPC服务器可以使用storm脚本完成,就像启动Nimbus或UI一样:

bin/storm drpc

接下来,您需要配置Storm群集以了解DRPC服务器的位置。 这就是DRPCSpout如何知道从何处读取函数调用。 这可以通过storm.yaml文件或拓扑配置来完成。 通过storm.yaml配置这个看起来像这样:

drpc.servers:
  - "drpc1.foo.com"
  - "drpc2.foo.com"

最后,像启动任何一个其他的拓扑一样,使用StormSubmitter启动DRPC拓扑。要在远程模式运行上述的示例,操作如下:

StormSubmitter
.submitTopology("exclamation-drpc", conf, builder.createRemoteTopology());

createRemoteTopology用于为storm集群创建合适的拓扑。

稍微复杂的示例

感叹号DRPC示例是用于说明DRPC概念的玩具示例。让我们看一个更复杂的例子,它真正需要Storm集群为计算DRPC函数提供的并行性。我们将看到的示例是在Twitter上计算URL的范围。

URL的范围是在Twitter上暴露给URL的唯一人数。要计算覆盖面,您需要:

​ 1、获取推文网址的所有人

​ 2、获得所有这些人的所有粉丝

​ 3、独特的追随者

​ 4、统计一组独特的粉丝

在计算过程中,单个到达计算可能涉及数千个数据库调用和数千万个跟随者记录。这是一个非常非常密集的计算。正如您将要看到的那样,在Storm之上实现此功能非常简单。在一台计算机上,达到计算可能需要几分钟;在Storm集群中,您可以在几秒钟内计算最难的URL的覆盖率。

此处的storm-starter中定义了样本范围拓扑。以下是定义范围拓扑的方法:

LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("reach");
builder.addBolt(new GetTweeters(), 3);
builder.addBolt(new GetFollowers(), 12)
        .shuffleGrouping();
builder.addBolt(new PartialUniquer(), 6)
        .fieldsGrouping(new Fields("id", "follower"));
builder.addBolt(new CountAggregator(), 2)
        .fieldsGrouping(new Fields("id"));

拓扑执行为四个步骤:

  1. GetTweeters获取推文URL的用户。它将[id,url]的输入流转换为[id,tweeter]的输出流。每个url元组将映射到许多tweeter元组。
  2. **GetFollowers获得推特的追随者。**它将[id,tweeter]的输入流转换为[id,follower]的输出流。在所有任务中,当有人跟随多个发布相同URL的人时,可能会有重复的跟随元组。
  3. **PartialUniquer通过关注者ID对关注者流进行分组。**这具有相同的跟随者执行相同任务的效果。因此,PartialUniquer的每项任务都将获得相互独立的追随者。一旦PartialUniquer收到针对请求ID的所有针对它的关注元组,它就会发出其关注者子集的唯一计数。
  4. 最后,CountAggregator接收来自每个PartialUniquer任务的部分计数,并将它们相加以完成到达计算。
PartialUniquer代码:
public class PartialUniquer extends BaseBatchBolt {
    BatchOutputCollector _collector;
    Object _id;
    Set<String> _followers = new HashSet<String>();

    @Override
    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
        _collector = collector;
        _id = id;
    }

    @Override
    public void execute(Tuple tuple) {
        _followers.add(tuple.getString(1));
    }

    @Override
    public void finishBatch() {
        _collector.emit(new Values(_id, _followers.size()));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("id", "partial-count"));
    }
}

PartialUniquer通过扩展BaseBatchBolt实现IBatchBolt。批处理bolt提供了一个第一类API来处理一批元组作为具体单元。为每个请求ID创建一个新的批处理bolt实例,Storm会在适当的时候负责清理实例。

PartialUniquerexecute方法中收到一个跟随元组时,它会将它添加到内部HashSet中的请求ID的集合中。

批处理bolt提供finishBatch方法,该方法在处理了针对此任务的此批处理的所有元组之后调用。在回调中,PartialUniquer会发出一个元组,其中包含其跟随者id子集的唯一计数。

在底层,CoordinatedBolt用于检测给定的bolt何时收到任何给定请求ID的所有元组。CoordinatedBolt利用直接流来管理这种协调。

拓扑的其余部分应该是不言自明的。如您所见,到达计算的每一步都是并行完成的,定义DRPC拓扑非常简单。

非线性DRPC拓扑

LinearDRPCTopologyBuilder仅处理“线性”DRPC拓扑,其中计算表示为一系列步骤(如覆盖范围)。 不难想象函数需要更复杂的拓扑结构,包括bolt的分支和合并。 现在,要做到这一点,你需要直接使用CoordinatedBolt。 请务必在邮件列表中讨论非线性DRPC拓扑的用例,以便为DRPC拓扑构建更一般的抽象。

LinearDRPCTopologyBuilder工作流程:

DRPCSpout发射[args, return-info]。return-info是DRPC服务器的主机名和端口号,以及DRPC服务器生成的id。

创建一个拓扑包括:

1、 DRPCSpout

2、 PrepareRequest(生成请求ID,为返回信息创建一个流,为参数创建一个流)

3、 CoordinatedBolt

4、 JoinResult(使用return info合并结果)

5、 ReturnResult(连接DRPC服务器以及返回结果)

LinearDRPCTopologyBuilder是在storm原语之上构建高级别抽象的一个很好的例子。

Logo

瓜分20万奖金 获得内推名额 丰厚实物奖励 易参与易上手

更多推荐