RocketMQ介绍

一.消息中间件
1.应用场景
	1.异步解耦 
	例:注册 发短信 发邮件的操作,我们以前要先注册,再等着发短信,最后等着发邮件,
	   而使用消息件就直接注册将其封装成一个对象放到消息中间件中,然后由消息中间件去监控发短信与发邮件,
       然后接着去干其他事情,不用等着
	2.削峰填谷 
    //请求先到消息中间件  然后服务器一点一点的获取中间件中的请求,避免一次性所有请求全部到达服务器
	3.分布式缓存同步/消息分发
    //微服务中,每个服务都放到消息中间件中,当一个服务需要调用另一个服务时,直接到消息中间件中获取
未使用消息中间件

在这里插入图片描述

使用消息中间件(异步解耦)

在这里插入图片描述

削峰填谷

在这里插入图片描述

2.消息中间件
1.ActiveMQ
2.KafKa
3.RabbitMQ
4.RocketMQ
消息中间件对比

在这里插入图片描述

二.RocketMQ的核心概念
RocketMQ主要有四大核心组成部分:NameServerBrokerProducer以及Consumer四部分
1.名字服务NameServer
主要负责对于源数据的管理,包括了对于Topic和路由信息的管理
1.生产者或消费者能够通过名字服务查找各主题相应的Broker IP列表。
2.多个Namesrv实例组成集群,但相互独立,没有信息交换。
注意:Broker向NameServer发心跳时,会带上当前自己所负责的所有Topic信息,如果Topic个数太多(万级别),
会导致一次心跳中,Topic的数据就几十M,网络情况差的话,网络传输失败,心跳失败,导致NameServer误认为Broker心跳失败
2.代理服务器Broker Server
消息中转角色,负责存储消息、转发消息
1.Broker是具体提供业务的服务器,单个Broker节点与所有的NameServer节点保持长连接及心跳,
  并会定时将Topic信息注册到NameServer
2.代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备
3.生产者Producer
消息生产者,负责产生消息,一般由业务系统负责产生消息
1.一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。
2.RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。
3.同步和异步方式均需要Broker返回确认信息,单向发送不需要。
4.消费者Consumer
消息消费者,负责消费消息,一般是后台系统负责异步消费
Consumer也由用户部署,支持PUSH和PULL两种消费模式,支持集群消费和广播消息,提供实时的消息订阅机制
5.消息内容Message
消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题
1.RocketMQ中每个消息拥有唯一的Message ID,且可以携带具有业务标识的Key。
2.系统提供了通过Message ID和Key查询消息的功能。
6.消息主题Topic
表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题
是RocketMQ进行消息订阅的基本单位
7.标签Tag
为消息设置的标志,用于同一主题下区分不同类型的消息
1.标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统
2.消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。
8.消息队列MessageQueue
主题被划分为一个或多个子主题,即消息队列
对于每个Topic都可以设置一定数量的消息队列用来进行数据的读取

在这里插入图片描述

三.发送消息方式
1.同步发送(默认同步)
必须要等消息持久化到磁盘中以后,rocketmq给生产者返回一个消息,持久化完成
2.异步发送
不需要等消息持久化到磁盘中,就可以执行后面的业务逻辑
3.单向发送
一次性发送,把消息发送到消息中间件中,不需要确认返回结果
四.消费模式
1.集群模式(默认模式)
MessageModel.CLUSTERING  多个消费者分担一个消费者的压力, 一个消息只会给一个消费者消费(订单只会支付一次)
//默认集群模式
//设置集群模式
 consumer.setMessageModel(MessageModel.CLUSTERING);//不设置默认也可以
2.广播模式
MessageModel.BROADCASTING  需要对同一个消息进行不同处理的时候, 比如对同一个消息, 需要同时发送短信和发送邮件,
 一个消息会发送给所有的消费者(同时发送短信,邮箱)
//在消费者端加一个设置广播模式就好
//设置广播模式
 consumer.setMessageModel(MessageModel.BROADCASTING);
五.消费方式
1.推送消费(push)
//主动将信息推送给消费者
// DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer");
2.拉取消费(pul)
//消费者主动broker中获取,前提是broker中要存在生产者已经发送过来的信息,否则报错
关键类
//1.DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("consumer");
//2.PullResult pullResult = consumer.pull(new MessageQueue("pull", "broker-a", 0), "*", 0, 1);
六.延时消息
比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。
//在生产端设置延时等级2, 5秒后发送
message.setDelayTimeLevel(2);
//延迟等级
延时消息的使用限制
现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级118
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
七.消息过滤
1.Tag标签过滤(默认这个)
生产者
  Message message = 
    new Message("06-filter", "tagC", "hello,rocketmq".getBytes("utf-8"));
消费者
 consumer.subscribe("06-filter", "tagA||tagC");
//生产者第二个参数tag与消费者第二个参数相对应就都能匹配,如果消费者使用*就都能接受
2.SQL92过滤
只有使用push模式的消费者才能用使用SQL92标准的sql语句,接口如下:
public void subscribe(finalString topic, final MessageSelector messageSelector)
//注意: 在使用SQL过滤的时候, 需要在conf中broker.conf配置参数enablePropertyFilter=true
 步骤:
	1.在生产者中设置条件 message.putUserProperty("age", "20");
	2.在消费者中设置匹配条件
	consumer.subscribe("06-filter", MessageSelector.bySql("age>18"));
八.使用api实现
生产者
public class Producer {
    public static void main(String[] args) throws Exception{
        //1 创建一个生产者对象, 并且指定一个生产者组
        DefaultMQProducer producer = new DefaultMQProducer("wolfcode-producer");
        //2 设置名字服务的地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        //3 启动生产者
        producer.start();
        //4 创建一个消息
        Message message = new Message("01-hello", "tagA", "hello,rocketmq".getBytes("utf-8"));
        //5 使用生产者发送消息
        producer.send(message);
        //6 关闭连接
        producer.shutdown();
    }
}
消费者
public class Consumer {
    public static void main(String[] args) throws Exception{
        //创建一个拉取消息的消费者对象
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("wolfcode-consumer");
        //设置名字地址
        consumer.setNamesrvAddr("127.0.0.1:9876");
        //绑定消息的主题
        consumer.subscribe("01-hello", "*");
        //消费者监听处理消息方法
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("消费线程:"+Thread.currentThread().getName()+",消息ID:"+msg.getMsgId()+",消息内容:"+new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //启动消费者
        consumer.start();
    }
}
九.SpringBoot集成RocketMQ
1>同步/异步/一次性消息
1.同步消息(生产者)
RestController
public class SendMsgController {
    @Autowired
    private RocketMQTemplate template;

    @RequestMapping("/sendMsg")
    public String sendMsg(String msg) {
        //第一个参数目的地(主题)
        //第二个参数:发送的消息
        //同步发送的方式
        template.syncSend("01-boot-hello", msg);
        return "发送成功";
    }
}
2.异步消息(生产者)
   @RequestMapping("/asyncMsg")
    public String asyncMsg(String msg) {
        //异步发送的方式
        template.asyncSend("01-boot-hello", msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("发送成功");
            }

            @Override
            public void onException(Throwable throwable) {
                System.out.println("发送失败");
            }
        });
        return "发送成功";
    }
3.一次性消息(生产者)
  @RequestMapping("/sendOneWay")
    public String sendOneWay(String msg) 
        //一次性发送的方式
        template.sendOneWay("01-boot-hello", msg);
        return "发送成功";
    }
同步/异步/一次性(消费者代码不变)
@Component
@RocketMQMessageListener(
        consumerGroup = "boot-consumer-demo",  //组
        topic="01-boot-hello"  //主题
)
public class Consumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String msg) {
        System.out.println("消息的内容" + msg);
    }
}
2>集群模式/广播模式
//生产者
@RestController
public class SendMsgController {
    @Autowired
    private RocketMQTemplate template;
 @RequestMapping("/sendMsg")
    public String sendMsg(String msg) 
        template.syncSend("02-consumer-model", msg);
        return "发送成功";
    }
}
//消费者
//集群模式
@Component
@RocketMQMessageListener(
        consumerGroup = "boot-consumer-demo",
        topic="02-consumer-model",
        messageModel = MessageModel.CLUSTERING //集群模式
)
public class Consumer1 implements RocketMQListener<String> {

    @Override
    public void onMessage(String msg) {
        System.out.println("消费者一,消息的内容" + msg);
    }
}
//消费者
//广播模式
@Component
@RocketMQMessageListener(
        consumerGroup = "boot-consumer-demo",
        topic="02-consumer-model",
        messageModel = MessageModel.BROADCASTING //广播模式
)
public class Consumer2 implements RocketMQListener<String> {

    @Override
    public void onMessage(String msg) {
        System.out.println("消费者二,消息的内容" + msg);
    }
}
3>延时消息
//生产者
@RestController
public class SendMsgController {

    @Autowired
    private RocketMQTemplate template;
    @RequestMapping("/sendDelayMsg")
    public String sendDelayMsg(String msg) {  
        //延时发送的方式
        MessageBuilder<String> builder = MessageBuilder.withPayload(msg);
        template.syncSend("03-boot-delay", builder.build(),5000,2);
        return "发送成功";
    }
}
//消费者
@Component
@RocketMQMessageListener(
        consumerGroup = "boot-consumer-demo1",
        topic="03-boot-delay"
)
public class Consumer3 implements RocketMQListener<String> {
    @Override
    public void onMessage(String msg) {
        System.out.println("消息的内容" + msg);
    }
}
4>tag过滤
//生产者
@RestController
public class SendMsgController {
  @RequestMapping("/filter")
    public String filter(String msg,String tag) {
        template.syncSend("04-boot-filter:"+tag, msg);
        return "发送成功";
    }
}
//消费者
@Component
@RocketMQMessageListener(
        consumerGroup = "boot-consumer-demo",
        topic = "04-boot-filter",
        selectorType = SelectorType.TAG,   //tag过滤方式
        selectorExpression = "TagA||TagC"   //过滤格式
)
public class Consumer4 implements RocketMQListener<String> {

    @Override
    public void onMessage(String msg) {
        System.out.println("消息的内容" + msg);
    }
}
//测试:http://localhost:8089/filter?msg=hello&tag=TagA
5>SQL92过滤
//生产者
@RestController
public class SendMsgController {
    @RequestMapping("/filter")
    public String filter(String msg,String age) {
        HashMap map = new HashMap();
        map.put("age",age );
        template.convertAndSend("04-boot-filter:", msg,map);
        return "发送成功";
    }
}
//消费者
@Component
@RocketMQMessageListener(
        consumerGroup = "boot-consumer-demo",
        topic = "04-boot-filter",
      /*  selectorType = SelectorType.TAG,
        selectorExpression = "TagA||TagC"*/
       selectorType = SelectorType.SQL92,  //SQL92过滤
        selectorExpression = "age>20"   //过滤格式
)
public class Consumer4 implements RocketMQListener<String> {

    @Override
    public void onMessage(String msg) {
        System.out.println("消息的内容" + msg);
    }
}
//测试:http://localhost:8089/filter?msg=hello&age=25
pom.xml依赖
<dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.3</version>
</dependency>
application.properties
//生产者
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=my-group
//设置发送消息时间,默认3秒,设置10秒,防止网络延迟导致发送失败
rocketmq.producer.send-message-timeout=10000  
server.port=8089
//消费者
rocketmq.name-server=127.0.0.1:9876
server.port=8088
十.小结
1.RcoketMQ 是一款低延迟、高可靠、可伸缩、易于使用的消息中间件
2.RcoketMQ使用场景有异步解耦 削峰填谷 分布式缓存同步/消息分发等
3.生产者提供三种发送方式:1.同步发送 2.异步发送 3.单向发送
4.消费者支持PUSH和PULL两种消费模式,支持集群消费和广播消息 
5.消费者的集群消费和广播消息只需要修改消费者端的代码即可
6.2种消息过滤都只要更改消费端注解上的类型和过滤方式即可
Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐