2021SC@SDUSC

螺栓源码解析(三)

2021SC@SDUSC

本文主要介绍螺栓接口

Storm 中定义的 Bolt 接口主要有 IBolt、IRichBolt、IBasicBolt 和 IBatchBolt

关系如下:

IBolt.java

IBolt定义了Bolt的函数集,其代码如下:

公共接口 IBolt 扩展了可序列化 {

void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector 收集器);

无效执行(元组输入);

无效清理();

}

螺栓是 Storm 中的基本运行单位。当它启动并有消息输入时,会调用execute方法进行处理。与 ISpout 类似,IBolt 对象在提交时也会被序列化为字节数组。具体执行节点通过反序列化方法获取对象,并调用prepare回调方法。 c用户应在prepare回调方法中实现复杂对象的初始化,以保证每个具体对象都能正确初始化。

当对象被销毁时,会调用清理回调方法,但 Storm 不保证该方法会被执行。

通常,输入的消息会在execute方法的实现中进行处理,可能会生成一条新的消息,需要发送给下游节点。最后,确认输入消息。如果消息处理失败,需要对输入的消息进行Fail,这是保证Ack消息系统正常运行的基础。

IRichBolt.java

包 org.apache.storm.topology;

导入 org.apache.storm.task.IBolt;

公共接口 IRichBolt 扩展 IBolt,IComponent {

}

IRichBolt 需要同时实现 IComponent 和 IBolt 接口,即具有 Bolt 功能的组件。在实际使用中,IRichBolt 是实现 Topology 组件的主要接口。

IBasicBolt.java

公共接口 IBasicBolt 扩展 IComponent {

void prepare(Map<String, Object> topoConf, TopologyContext context);

无效执行(元组输入,BasicOutputCollector 收集器);

无效清理();

}

IBasicBolt 接口的定义与 IBolt 基本相同,具体实现要求与 IBolt 相同。 IBasicBolt 接口与 IBolt 的区别在于以下两点:

1\。它的输出收集器使用basicooutputcollector,参数放在execute方法中,而不是prepare。

2\。它实现了 IComponent 接口,这表明它可以用来定义拓扑组件。

该接口存在的原因:

ibasic Bolt 的主要功能是为用户提供更简单的 Bolt

写作方法。基于 ibasic Bolt 编写的好处是 Storm 框架本身可以帮你处理发送消息的 Ack、Fail 和 Anchor 操作,由执行器 BasicBoltExecutor 实现。

BasicBoltExecutor 实现了 IRichBolt 接口,还包含一个用于转发调用的 IBasciBolt 成员变量。它是基于装饰模式实现的,其定义如下:

公共类 BasicBoltExecutor 实现 IRichBolt {

公共静态最终 Logger LOG u003d LoggerFactory.getLogger(BasicBoltExecutor.class);

专用 IBasicBolt 螺栓;

私有瞬态 BasicOutputCollector 收集器;

公共基本螺栓执行器(IBasicBolt 螺栓){

this.bolt u003d 螺栓;

}

公共无效 declareOutputFields(OutputFieldsDeclarer 声明者){

bolt.declareOutputFields(declarer);

}

@覆盖

public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector 收集器) {

bolt.prepare(topoConf, 上下文);

this.collector u003d new Basic OutputCollector(collector);

}

公共无效执行(元组输入){

收集器.setContext(输入);

试试 {

bolt.execute(输入,收集器);

收集器.getOutputter().ack(输入);

} 捕捉(失败异常 e){

if (e instanceof ReportedFailedException) {

收集器.reportError(e);

}

收集器.getOutputter().fail(输入);

}

}

公共无效清理(){

螺栓清理();

}

公共地图<字符串,对象> getComponentConfiguration() {

返回 bolt.getComponent 配置();

}

}

BasicBoltExecutor 需要实现 IRichBolt 接口的原因:

用户实现IBasicBolt接口的Bolt对象后,Storm在构建Topology时会调用TopologyBuilder的setbolt方法设置Bolt对象。 setbolt方法用BasicBoltExecutor封装了用户的实现类,Storm自动为用户实现,同时调用了可以接收IRichBolt参数的重载方法完成Bolt设置。

IBatchBolt.java

公共接口 IBatchBolt<T> 扩展可序列化,IComponent {

void prepare(Map<String, Object> conf, TopologyContext context, BatchOutputCollector 收集器, T id);

无效执行(元组元组);

无效完成批次();

}

与IBasicBolt接口不同,IBatchBolt主要用于storm中的批处理。目前storm主要使用这个接口来实现消息的可靠传输。在这种情况下,批处理将比单个消息处理更有效。 Storm 的事务 Topology 和 Trident 主要基于 IBatchBolt。

与之前的 IBolt、IBasicBolt 和 IRichBolt 相比,IBatchBolt 多了一个 finishBatch 方法,在批次结束时调用。

制备方法:

用于初始化批处理。在prepare方法中,最后一个参数是泛型类型T,可以作为Batch的唯一标识。在从 IBatchBolt 派生的 BaseTransactionalBolt 中,t 将被实例化为 transactionalatemp。

在当前的 Storm 实现中,每个事务都会对应一个 Batch,每个 Batch 的数据都会被一个新的 IBatchBolt 对象处理。因此,在prepare方法中,需要传入一个用于标识Batch的变量T。在事务拓扑中,Storm 使用 TransactionAttempt 作为标识符。一个 Batch 处理成功后,该 Batch 对应的 IBatchBolt 对象将被销毁。因此,用户无法通过 IBatchBolt 对象本身保存要在多个批次之间共享的数据。

执行方法:

用于处理属于此 Batch 的消息。

完成批处理方法:

只有在处理完这批消息时才会调用该方法。如果同时实现 BatchBolt

使用 iconitter 接口,只有在成功处理完 Batch 之前的所有批次后,才会调用 finishBatch 方法。这既保证了强顺序关系,也保证了Storm中事务拓扑的实现基础。

Logo

华为、百度、京东云现已入驻,来创建你的专属开发者社区吧!

更多推荐