Step-by-step: building a distributed Storm cluster and driving it from Java. Stream processing is simpler than it looks!
Fully distributed Storm setup
1. Install and configure the JDK
2. Configure ZooKeeper (see https://blog.csdn.net/lpf787887/article/details/89323227)
3. Extract Storm to /opt:
tar -zxf apache-storm-0.10.0.tar.gz -C /opt/
4. Edit the configuration:
vi /opt/apache-storm-0.10.0/conf/storm.yaml
# ZooKeeper ensemble addresses
storm.zookeeper.servers:
- "node01"
- "node02"
- "node03"
# Host that runs nimbus
nimbus.host: "node01"
# Worker communication ports on each supervisor:
# one worker slot per listed port (see the sketch below)
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
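Each listed port is one worker slot, so with supervisors on all three nodes this cluster exposes 12 slots. A topology requests worker processes with Config.setNumWorkers; a minimal sketch (the class name is just for illustration):
import backtype.storm.Config;
public class WorkerCountSketch {
    public static void main(String[] args) {
        Config conf = new Config();
        // Ask for 2 worker processes (2 slots) for this topology;
        // they are drawn from the supervisor.slots.ports pool above
        conf.setNumWorkers(2);
    }
}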
5. Create a logs directory under the Storm home to hold the log output:
mkdir logs
6. Distribute the Storm package to node02 and node03:
scp -r apache-storm-0.10.0/ node02:`pwd`
scp -r apache-storm-0.10.0/ node03:`pwd`
7. Add Storm's bin directory to PATH:
export STORM_HOME=/opt/apache-storm-0.10.0
export PATH=$PATH:$STORM_HOME/bin
8. Start the ZooKeeper cluster:
zkServer.sh start    # start
zkServer.sh stop     # stop
zkServer.sh status   # check ZooKeeper's status
9. Start the Storm daemons.
Start nimbus with stdout redirected to logs/nimbus.out under the current directory; stderr is redirected to the same file (2>&1), and the trailing & keeps the process running in the background. The supervisor and UI commands follow the same pattern; a quick jps check (shown after the commands) confirms everything is up.
node01:
nimbus
storm nimbus >> logs/nimbus.out 2>&1 &
supervisor
storm supervisor >> logs/supervisor.out 2>&1 &
ui
storm ui >> logs/ui.out 2>&1 &
node02:
supervisor
storm supervisor >> logs/supervisor.out 2>&1 &
node03:
supervisor
storm supervisor >> logs/supervisor.out 2>&1 &
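To verify that everything came up, run jps on node01. A typical listing (exact names can vary by Storm version):
jps
# nimbus          <- Storm nimbus
# supervisor      <- Storm supervisor
# core            <- Storm UI (its main class is named core)
# QuorumPeerMain  <- ZooKeeper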
10. Shutdown:
killall java
(Note: this kills every Java process on the node, ZooKeeper included; a gentler storm kill alternative is shown at the end of this post.)
The code
The jars you need are under apache-storm-0.10.0\apache-storm-0.10.0\lib.
The main entry point:
package com.huawei.storm;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.AuthorizationException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
/**
 * @author Lpf.
 * @version created 2019-04-21 16:32:35
 */
public class MainClass {
public static void main(String[] args)
throws InterruptedException, AlreadyAliveException, InvalidTopologyException, AuthorizationException {
// Topology builder
TopologyBuilder builder = new TopologyBuilder();
/**
 * Register the spout ("water tap"): arg 1 is the spout's id, arg 2 is the spout implementation
 */
builder.setSpout("sentence_spout", new SentenceSpout());
/**
 * Register a bolt ("lightning"): arg 1 is the bolt's id, arg 2 is the bolt implementation
 */
// Receive tuples from sentence_spout with shuffle (random) grouping
builder.setBolt("split_bolt", new SplitBlot()).shuffleGrouping("sentence_spout");
// Tuples with the same word value go to the same bolt task:
// fields grouping hashes the word value and takes it modulo the task count
builder.setBolt("count_bolt", new CountBlot()).fieldsGrouping("split_bolt", new Fields("word"));
builder.setBolt("report_bolt", new ReportBlot()).globalGrouping("count_bolt");
// // Simulate a Storm cluster locally
// LocalCluster cluster = new LocalCluster();
// // Configuration object
// Config conf = new Config();
// // Enable debug mode
// conf.setDebug(true);
// /**
// * Submit the topology to the local cluster: arg 1 is the job name,
// * arg 2 is the configuration, arg 3 is the topology itself
// */
// cluster.submitTopology("wordcount", conf, builder.createTopology());
//
// Thread.sleep(3000);
// // Shut down the local cluster
// cluster.shutdown();
// Submit the topology to the real cluster (a combined local/cluster sketch follows this class)
// Configuration object
Config conf = new Config();
StormSubmitter.submitTopology("wordCount", conf, builder.createTopology());
}
}
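A common variant (a sketch, not in the original code) keeps both submission paths and picks one from the command line: pass a topology name to submit to the real cluster, or pass nothing to test locally. This would replace the last two lines of main above:
Config conf = new Config();
if (args.length > 0) {
    // Cluster mode: the first argument is the topology name
    StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
} else {
    // Local mode: run in-process for a few seconds, then shut down
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("wordcount", conf, builder.createTopology());
    Thread.sleep(10000);
    cluster.shutdown();
}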
SentenceSpout simulates data streaming in from a live source:
package com.huawei.storm;
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
/**
 * @author Lpf.
 * @version created 2019-04-21 16:38:42
 */
public class SentenceSpout extends BaseRichSpout {
private SpoutOutputCollector collector;
private int index = 0;
private String[] sentences = { "Apache Storm is a free system, doing for realtime processing what Hadoop did ",
"processing. Storm is simple, can be used with",
"processed per second per node. It is scalable, fault-tolerant",
"the streams between each stage of the computation however needed",
"Storm users should send messages and subscribe to user@storm.apache.org.",
"You can also browse the archives of the storm-user mailing list.",
"You can also browse the archives of the storm-dev mailing list.",
"You can view the archives of the mailing list here.", };
// Called once when the cluster initializes this spout
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
}
// Each call reads one sentence from the data source and emits it
public void nextTuple() {
// Emit the tuple on the default stream
// this.collector.emit(tuple);
// Emit with a message id; if the tuple fails in a downstream bolt, this spout replays it
// (a reliable-spout sketch follows this class)
// this.collector.emit(tuple, messageId);
// Emit on a named stream; streamId is the stream name declared in declareOutputFields
// this.collector.emit(streamId, tuple);
// this.collector.emit(streamId, tuple, messageId);
// Direct grouping: send the tuple straight to the task identified by taskId
// this.collector.emitDirect(taskId, tuple);
// Same, with a message id for guaranteed processing
// this.collector.emitDirect(taskId, tuple, messageId);
// Direct grouping on a named stream
// this.collector.emitDirect(taskId, streamId, tuple);
// Same, with a message id for guaranteed processing
// this.collector.emitDirect(taskId, streamId, tuple, messageId);
this.collector.emit(new Values(sentences[index % sentences.length]));
index++;
try {
// Simulate data trickling in from a live source
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// Declare the tuple structure on the default stream
// declarer.declare(fields);
// Same, optionally marking the default stream as a direct stream
// declarer.declare(direct, fields);
// Declare a named stream with the given tuple structure
// declarer.declareStream(streamId, fields);
// Named stream that can also be marked as a direct stream
// declarer.declareStream(streamId, direct, fields);
// Declare the tuple structure: a single field named "sentence";
// the next bolt reads the tuple contents by this field name
declarer.declare(new Fields("sentence"));
}
}
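The commented-out emit overloads above are Storm's hooks for guaranteed processing. A hedged sketch of what a reliable version of this spout could look like (the pending map and message-id scheme are illustrative, not part of the original): emit each tuple with a message id, then override the ack and fail callbacks that BaseRichSpout inherits:
private java.util.Map<Integer, String> pending = new java.util.HashMap<>();
public void nextTuple() {
    String sentence = sentences[index % sentences.length];
    // Emit with a message id so Storm tracks the tuple through the topology
    this.collector.emit(new Values(sentence), index);
    pending.put(index, sentence);
    index++;
}
@Override
public void ack(Object msgId) {
    // Fully processed downstream: forget the sentence
    pending.remove(msgId);
}
@Override
public void fail(Object msgId) {
    // Failed or timed out downstream: replay the sentence with the same id
    this.collector.emit(new Values(pending.get(msgId)), msgId);
}
For this to work end to end, the downstream bolts must anchor their emits and ack each input, as sketched after SplitBlot below.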
SplitBlot splits each sentence into words:
package com.huawei.storm;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
/**
 * @author Lpf.
 * @version created 2019-04-21 16:42:21
 */
public class SplitBlot extends BaseRichBolt {
private OutputCollector collector;
public void execute(Tuple input) {
// Read the upstream tuple's value by its field name
String sentence = input.getStringByField("sentence");
// Business logic: split the sentence into words and emit each with a count of 1
String[] words = sentence.split(" ");
for (String word : words) {
this.collector.emit(new Values(word, 1));
}
}
// Called when the worker initializes this bolt
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// Declare a two-field tuple: the word and its count (always 1 at this stage)
declarer.declare(new Fields("word", "count"));
}
}
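SplitBlot emits unanchored tuples, which matches the spout above (no message ids). If the spout emitted with message ids as in the earlier sketch, execute would need to anchor and ack, roughly like this:
public void execute(Tuple input) {
    String sentence = input.getStringByField("sentence");
    for (String word : sentence.split(" ")) {
        // Anchor the new tuple to its input so a downstream failure reaches the spout
        this.collector.emit(input, new Values(word, 1));
    }
    // Tell Storm the input tuple has been fully handled here
    this.collector.ack(input);
}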
CountBlot tallies the word counts:
package com.huawei.storm;
import java.util.HashMap;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
/**
 * @author Lpf.
 * @version created 2019-04-21 16:55:16
 */
public class CountBlot extends BaseRichBolt {
private OutputCollector collector;
private Map<String, Long> mapCount;
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
this.mapCount = new HashMap<>();
}
public void execute(Tuple input) {
String word = input.getStringByField("word");
Integer count = input.getIntegerByField("count");
// First time we see this word: start its running total at zero
if (mapCount.get(word) == null) {
mapCount.put(word, 0L);
}
// Update the task-local running total and emit the new count downstream
long sum = count + mapCount.get(word);
mapCount.put(word, sum);
this.collector.emit(new Values(word, sum));
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}
}
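CountBlot can keep its totals in a plain task-local HashMap only because fieldsGrouping guarantees that every tuple carrying the same word lands on the same task. Conceptually the routing is hash-modulo, roughly like this simplified illustration (not Storm's actual internals):
public class FieldsGroupingSketch {
    // The same word always maps to the same downstream task index
    static int taskIndex(String word, int numTasks) {
        return Math.floorMod(word.hashCode(), numTasks);
    }
    public static void main(String[] args) {
        System.out.println(taskIndex("storm", 4)); // always the same task for "storm"
    }
}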
ReportBlot prints the final results:
package com.huawei.storm;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
/**
 * @author Lpf.
 * @version created 2019-04-21 16:58:55
 */
public class ReportBlot extends BaseRichBolt {
private Map<String, Long> mapCount;
private int i = 0;
public void execute(Tuple input) {
// Crude progress indicator: once more than 1000 tuples have arrived, print the counter
if (i > 1000) {
System.out.println(i);
}
String word = input.getStringByField("word");
Long count = input.getLongByField("count");
mapCount.put(word, count);
i++;
}
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
// Terminal bolt: no collector needed, just the result map
this.mapCount = new HashMap<>();
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// Emits nothing downstream
}
// Note: cleanup() is only reliably invoked in local mode; on a real cluster
// the worker process may be killed without this method running
public void cleanup() {
for (Entry<String, Long> entry : mapCount.entrySet()) {
System.err.println(entry.getKey() + "---------" + entry.getValue());
}
}
}
Finally, package the project, copy it to node01, and run it:
storm jar wordcount.jar com.huawei.storm.MainClass
The topology is now running.
With the UI started, http://node01:8080 shows the cluster and topology statistics.
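To stop just this topology instead of killing every Java process, Storm's CLI can kill it by the name it was submitted under:
storm kill wordCount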