storm完全分布式搭建
1、配置JDK
2、配置zookeeper
https://blog.csdn.net/lpf787887/article/details/89323227
3、将storm解压到/opt

 tar -zxf apache-storm-0.10.0.tar.gz -C /opt/

4、配置:

vi /opt/apache-storm-0.10.0/conf/storm.yaml 

#配置zookeeper的地址
    storm.zookeeper.servers:
      - "node01"
      - "node02"
      - "node03"
    # nimbus的位置
    nimbus.host: "node01"
    # 指定每个supervisor上worker的通信端口
	# 有几个端口就有几个worker
    supervisor.slots.ports:
      - 6700
      - 6701
      - 6702
      - 6703

5、在storm的目录下创建logs文件夹记录日志信息

mkdir logs

6、在node02和node03上分发storm包

scp -r apache-storm-0.10.0/ node02:`pwd`
scp -r apache-storm-0.10.0/ node03:`pwd`

7、将storm的bin添加到PATH中

export STORM_HOME=/opt/apache-storm-0.10.0
export PATH=$PATH:$STORM_HOME/bin

8、启动zookeeper集群

zkServer.sh start     	启动
zkServer.sh stop		停止
zkServer.sh status		查看zk的状态

9、启动storm的进程

启动nimbus,将标准输出重定向到当前目录的logs中的nimbus.out
错误输出也重定向到该文件,最后的&表示后台运行

  node01: 	
    	nimbus
    		storm nimbus >> logs/nimbus.out 2>&1 &
    	supervisor
    		storm supervisor >> logs/supervisor.out 2>&1 &
    	ui
    		storm ui >> logs/ui.out 2>&1 &
    node02:
    	supervisor
    		storm supervisor >> logs/supervisor.out 2>&1 &
    node03:
    	supervisor
    		storm supervisor >> logs/supervisor.out 2>&1 &

10、关闭

killall java

附上代码
需要的jar包在apache-storm-0.10.0\apache-storm-0.10.0\lib下
在这里插入图片描述
main方法入口

package com.huawei.storm;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.AuthorizationException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

/**
 * @author Lpf.
 * @version 创建时间:2019年4月21日 下午4:32:35
 */
public class MainClass {

	/**
	 * Builds and submits the word-count topology:
	 * sentence spout -> split bolt -> count bolt -> report bolt.
	 *
	 * @param args unused
	 * @throws AlreadyAliveException    if a topology with this name is already running
	 * @throws InvalidTopologyException if the topology definition is rejected
	 * @throws AuthorizationException   if the submitter lacks cluster permissions
	 */
	public static void main(String[] args)
			throws InterruptedException, AlreadyAliveException, InvalidTopologyException, AuthorizationException {

		// Topology builder wires spouts and bolts into a DAG.
		TopologyBuilder builder = new TopologyBuilder();

		// Stream source: emits one sentence per tuple.
		builder.setSpout("sentence_spout", new SentenceSpout());

		// Shuffle grouping distributes sentences randomly across split tasks.
		builder.setBolt("split_bolt", new SplitBlot()).shuffleGrouping("sentence_spout");
		// Fields grouping hashes on "word" so tuples carrying the same word
		// always reach the same counting task, keeping per-word totals correct.
		builder.setBolt("count_bolt", new CountBlot()).fieldsGrouping("split_bolt", new Fields("word"));
		// Global grouping funnels every running total into a single reporter task.
		builder.setBolt("report_bolt", new ReportBlot()).globalGrouping("count_bolt");

		// For local testing, replace the submit call below with:
		//   LocalCluster cluster = new LocalCluster();
		//   cluster.submitTopology("wordcount", conf, builder.createTopology());
		//   Thread.sleep(3000);
		//   cluster.shutdown();

		// Submit to the real cluster (run via: storm jar wordcount.jar com.huawei.storm.MainClass).
		Config conf = new Config();
		StormSubmitter.submitTopology("wordCount", conf, builder.createTopology());
	}
}

SentenceSpout模拟线上环境读取数据信息

package com.huawei.storm;

import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

/**
 * @author Lpf.
 * @version 创建时间:2019年4月21日 下午4:38:42
 */
/**
 * Spout that simulates a live feed by cycling through a fixed list of
 * sentences, emitting one per call to {@link #nextTuple()} on the default
 * stream as a single-field tuple named "sentence".
 */
public class SentenceSpout extends BaseRichSpout {

	private SpoutOutputCollector collector;

	// Index of the next sentence to emit; always kept in [0, sentences.length).
	private int index = 0;

	private String[] sentences = { "Apache Storm is a free doing for realtime processing what Hadoop did ",
			"processing. Storm is simple, can be used with",
			"processed per second per node. It is scalable, fault-tolerant",
			"the streams between each stage of the computation however needed",
			"Storm users should send messages and subscribe to user@storm.apache.org.",
			"You can also browse the archives of the storm-user mailing list.",
			"You can also browse the archives of the storm-dev mailing list.",
			"You can view the archives of the mailing list here.", };

	// Called once when the cluster initializes this spout task.
	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
		this.collector = collector;
	}

	// Called repeatedly by Storm; each call reads one sentence and emits it.
	public void nextTuple() {
		// Emit on the default stream with no message id (unreliable mode:
		// failed tuples are not replayed). Reliable emits would pass a
		// messageId: this.collector.emit(tuple, messageId).
		this.collector.emit(new Values(sentences[index]));
		// Advance with wrap-around instead of a bare index++ so the counter
		// never overflows int range on a long-running topology (a negative
		// index would make sentences[index % length] throw).
		index = (index + 1) % sentences.length;

		try {
			// Throttle emission to simulate data trickling in from production.
			Thread.sleep(10);
		} catch (InterruptedException e) {
			// Restore the interrupt flag so the worker thread can observe it.
			Thread.currentThread().interrupt();
		}
	}

	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		// Declare a one-field tuple on the default stream; downstream bolts
		// read it via getStringByField("sentence").
		declarer.declare(new Fields("sentence"));
	}

}

SplitBlot 切割每句话

package com.huawei.storm;

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * @author Lpf.
 * @version 创建时间:2019年4月21日 下午4:42:21
 */
/**
 * Bolt that splits each incoming sentence on single spaces and emits one
 * ("word", 1) two-field tuple per token.
 */
public class SplitBlot extends BaseRichBolt {

	private OutputCollector collector;

	// Called once after the worker initializes this bolt task; retains the
	// collector used to emit downstream tuples.
	public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
		this.collector = collector;
	}

	// Reads the "sentence" field of the upstream tuple and fans it out as
	// individual words, each paired with a count of 1.
	public void execute(Tuple input) {
		String sentence = input.getStringByField("sentence");
		for (String token : sentence.split(" ")) {
			collector.emit(new Values(token, 1));
		}
	}

	// Output is a pair: the word itself and its partial count (always 1 here).
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("word", "count"));
	}

}

CountBlot 统计单词

package com.huawei.storm;

import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * @author Lpf.
 * @version 创建时间:2019年4月21日 下午4:55:16
 */
/**
 * Bolt that keeps a running total per word and emits the updated
 * ("word", total) pair after every incoming partial count.
 *
 * State is local to each task, so the upstream fields grouping on "word"
 * is what makes the global totals correct.
 */
public class CountBlot extends BaseRichBolt {

	private OutputCollector collector;
	// Running total per word for this task.
	private Map<String, Long> mapCount;

	public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
		this.collector = collector;
		this.mapCount = new HashMap<>();
	}

	// Adds the incoming partial count to the word's running total and emits
	// the new total downstream.
	public void execute(Tuple input) {

		String word = input.getStringByField("word");
		Integer count = input.getIntegerByField("count");

		// Single map lookup instead of the original null-check/get/put triple.
		Long previous = mapCount.get(word);
		long sum = (previous == null ? 0L : previous) + count;
		mapCount.put(word, sum);

		this.collector.emit(new Values(word, sum));
	}

	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		// NOTE: "count" is emitted as a long here, so consumers must read it
		// with getLongByField (not getIntegerByField).
		declarer.declare(new Fields("word", "count"));
	}

}

ReportBlot 输出

package com.huawei.storm;

import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

/**
 * @author Lpf.
 * @version 创建时间:2019年4月21日 下午4:58:55
 */
/**
 * Terminal bolt that records the latest running total per word and dumps
 * the whole table to stderr when the topology is shut down.
 */
public class ReportBlot extends BaseRichBolt {

	// Latest total observed for each word.
	private Map<String, Long> mapCount;

	// Overwrites the stored total for the word with the newest value.
	// (The original leftover debug counter that printed to stdout on every
	// tuple past the 1000th has been removed.)
	public void execute(Tuple input) {
		String word = input.getStringByField("word");
		Long count = input.getLongByField("count");
		mapCount.put(word, count);
	}

	public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
		// Terminal bolt: emits nothing, so the collector is not retained.
		this.mapCount = new HashMap<>();
	}

	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		// No output stream is declared: this bolt ends the topology.
	}

	// NOTE(review): cleanup() is only reliably invoked in local-cluster mode;
	// on a real cluster the worker may be killed before it runs.
	public void cleanup() {
		for (Entry<String, Long> entry : mapCount.entrySet()) {
			System.err.println(entry.getKey() + "---------" + entry.getValue());
		}
	}
}

最后把项目打包,放到linux上的node01去执行

storm jar wordcount.jar com.huawei.storm.MainClass

项目就可以跑起来

http://node01:8080	启动了ui可以通过该网页查看计算信息

在这里插入图片描述

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐