1、RocketMQ的系统架构

1、Producer (生产者)

RocketMQ中的消息生产者都是以生产组(Producer Group)的形式出现的,生产组是同一类生产者的集合。
这类Producer发送相同的Topic类型的消息

2、Consumer(消费者)

RocketMQ中的消息消费者都是以消费组(Consumer Group)的形式出现的,消费组是同一类消费者的集合。
这类Consumer消费的是同一个Topic类型的消息,消费组使得在消息消费方面,
实现**负载均衡**(将一个Topic中的不同的Queue平均分配给同一个Consumer Group 中不同的Consumer)和
**容错**(一个Consumer挂了,该Consumer Group 中的其它Consumer可以接着消费原Consumer消费的Queue)。

3、Name Server(Broker与Topic路由的注册中心,支持Broker的动态注册与发现)

(1)Broker管理: 接受Broker集群的注册信息并且保存下来作为路由信息的基本数据:提供心跳检测机制,检查Broker是否还存活。

(2)路由信息管理: 每个NameServer中都保存着Broker集群的整个路由信息和用于客户端查询的队列信息。Producer和Consumer通过NameServer可以获取整个Broker集群的路由信息,从而进行消息的投递和消费

(3)路由注册:

NameServer通常也是以集群的方式部署,不过NameServer是无状态的,NameServer集群中的各个节点间是无差异的,各节点间相互不进行信息通讯,那各节点中的数据是如何进行数据同步的呢?再Broker节点启动时,轮询与每个NameServer节点建立长连接,发送注册请求,在NameServer内部维护着一个Broker列表,用来动态存储Broker的信息。(NameServer扩容是需要哦配置NameServer相关的信息不然Broker的数据同步不到新的NameServer上)Broker节点会每30秒发送一次心跳至NameServer,心跳包含BrokerId、Broker地址、Broker名称、Broker所属集群名称,NameServer收到心跳包会更新心跳时间戳记录Broker的最新存活时间。

(4)路由剔除:

当Broker宕机了,NameServer接受Broker心跳时没有接收到,会进行删除掉当前没有接收到心跳的Broker

(5)路由发现:

1、Push模型: 当Name Server的Topic路由数据发送变化时会主动推送消息给客户端。2、Pull模型:客户端会每隔30秒向NameServer拉取一次路由数据。3、LongPolling模型:长轮询模型,客户端和服务端保持长连接,比如占用30秒(无论30秒是否有数据更新都会建立连接),等30秒的链接时间到后链接取消。

(6)客户端NameServer选择策略:

进行随机取模然后取第一台

4、Broker(Broker充当着消息中转角色,负责存储信息、转发消息。Broker在RocketMq系统中负责接收并存储从生产者发送来的消息,同时为消费者的拉取请求做准备。Broker同时也存储着消息相关的元数据,包括消费组消费进度偏移offset、主题、队列等)。

(1)、Remoting Module:

整个Broker的实体,负责处理来自clients端的请求。而这个Broker实体则油一下模块构成。
(2)、Client Manger:
客户端管理器,负责接收、解析客户端(Producer/Consumer)请求,管理客户端。列如:维护Consumer的Topic订阅信息.

(3)、HA Service:

高可用服务,提供MasterBroker和SlaveBroker之间的数据同步功能。(部署一个Broker之后会产生两个Borker,一个为MasterBroker另一个为SlaveBroker,BrokerName相同但是BrokerId不同,SlaveBroker主要复制MasterBroker的数据)

(4)、IndexService:

索引服务,根据特定的MessageKey,对投递到Broker的信息进行索引服务,同时也提供根据MessageKey对消息进行快速查询的功能

5、同步刷盘和异步刷盘

(1)、同步刷盘增加了数据的可靠性

producer -> java(通过java发送消息)  ->memory(内存) ->  disk(写入磁盘) -> memory(写入完成)->java (写入完成)

(2)、异步刷盘增加了吞吐量

producer -> java(通过java发送消息)  ->memory(内存) ->  disk(写入磁盘的同时直接返回写入成功)

6、Broker

(1)、同步复制

当master和slave都成功写入数据之后才返回成功,会降低系统的吞吐量但是能够保持数据的可靠性

(2)、异步复制

当master成功写入数据之后就返回成功,降低了数据的可靠性提高了系统的吞吐量	

2、生产者发送消息

发送异步消息

/**
 * @author Suny
 * @data 2021/10/8 18:06
 * @description 发送异步消息
 */
public class AsyncProducer {
    public static void main(String[] args) throws Exception {
        //1、创建消息生产者,并指定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        //2、指定NameServer,集群的NameServer用;号分隔
        producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876");
        //3、启动producer
        producer.start();

        for (int i = 0; i < 10; i++) {
            //4、创建消息对象,指定主题Topic、Tag和消息体
            Message mes = new Message("base", "tag1", ("Hello World").getBytes());
            //5、发送消息
            producer.send(mes, new SendCallback() {
                //发送成功回调函数
                @Override
                public void onSuccess(SendResult sendResult) {

                }
                //发送失败回调函数
                @Override
                public void onException(Throwable e) {

                }
            });

            //线程睡眠1秒
            TimeUnit.SECONDS.sleep(1L);
        }
        //6、关闭生产者producer
        producer.shutdown();
    }
}

发送同步消息

/**
 * @author Suny
 * @data 2021/10/8 17:45
 * @description 发送同步消息
 */
public class SyncProducer {
    public static void main(String[] args) throws  Exception{
        //1、创建消息生产者,并指定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        //2、指定NameServer,集群的NameServer用;号分隔
        producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876");
        //3、启动producer
        producer.start();

        for (int i = 0; i < 10; i++) {
            //4、创建消息对象,指定主题Topic、Tag和消息体
            Message mes = new Message("base", "tag1", ("Hello World").getBytes());
            //5、发送消息
            producer.send(mes);

            //线程睡眠1秒
            TimeUnit.SECONDS.sleep(1L);
        }
        //6、关闭生产者producer
        producer.shutdown();
    }
}

发送单向消息

/**
 * @author Suny
 * @data 2021/10/8 18:13
 * @description 发送单向消息
 */
public class OnewayProducer {
    public static void main(String[] args) throws Exception {
        //1、创建消息生产者,并指定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        //2、指定NameServer,集群的NameServer用;号分隔
        producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876");
        //3、启动producer
        producer.start();

        for (int i = 0; i < 10; i++) {
            //4、创建消息对象,指定主题Topic、Tag和消息体
            Message mes = new Message("base", "tag1", ("Hello World").getBytes());
            //5、发送消息
            producer.sendOneway(mes);

            //线程睡眠1秒
            TimeUnit.SECONDS.sleep(1L);
        }
        //6、关闭生产者producer
        producer.shutdown();
    }
}

发送顺序消息

/**
 * @author Suny
 * @data 2021/10/12 15:11
 * @description 按照顺序发送消息
 */
public class Producer {
    public static void main(String[] args) throws Exception {
        //1、创建消息生产者,并指定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        //2、指定NameServer,集群的NameServer用;号分隔
        producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876");
        //3、启动producer
        producer.start();
        //4、模拟需要按照顺序发送消息的集合
        ArrayList<OrderStep> list = new ArrayList<OrderStep>();
        for (int i = 0; i < list.size(); i++) {
            String result = list.get(i).toString();
            /**
             * 参数1:topic
             * 参数2:tag
             * 参数3: key(唯一标识)
             * 参数4:需要发送的消息体
             */
            Message mes = new Message("orderTopic", "orderTag", "i" + i, result.getBytes());
            /**
             * 参数1:消息
             * 参数2:顺序消费的匿名内部类
             * 参数3:发送消息的唯一标识
             */
            producer.send(mes, new MessageQueueSelector() {
                /**
                 * @param mqs 队列集合
                 * @param msg 消息
                 * @param arg send方法的参数3,发送消息的唯一标识
                 * @return
                 */
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    Long orderId  = (Long) arg;
                    long index = orderId % mqs.size();
                    //选择第几个队列按照顺序发送消息
                    return mqs.get((int)index);
                }
            },list.get(i).getOrderId());
        }

    }
}

发送事物消息

/**
 * @author Suny
 * @data 2021/10/12 16:53
 * @description 事物的生产者
 */
public class Producer {
    public static void main(String[] args) throws Exception {
        //1、创建消息生产者,并指定生产者组名
        TransactionMQProducer producer = new TransactionMQProducer("group1");
        //2、指定NameServer,集群的NameServer用;号分隔
        producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876");
        //3、启动producer
        producer.start();

        //设置事物队列的监听器
        producer.setTransactionListener(new TransactionListener() {
            /**
             * 在该方法执行的本地事物
             * @param msg
             * @param arg
             * @return
             */
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                /**
                 * 根据tag进行判断消息是否发送、回滚、丢弃
                 */
                String tags = msg.getTags();
                switch (tags) {
                    case "TagA":
                        return LocalTransactionState.COMMIT_MESSAGE;
                    case "TagB":
                        return LocalTransactionState.ROLLBACK_MESSAGE;
                    case "TagC":
                        return LocalTransactionState.UNKNOW;
                }
                return LocalTransactionState.UNKNOW;
            }

            /**
             * 该方法的MQ进行消息事物状态的回查
             * @param msg
             * @return
             */
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                //如果是丢弃的消息再在这里进行处理
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });

        for (int i = 0; i < 10; i++) {
            String[] arr = {"TagA", "TagB", "TagC"};

            //4、创建消息对象,指定主题Topic、Tag和消息体
            Message mes = new Message("base", arr[i], ("Hello World").getBytes());
            //5、发送消息
            producer.sendMessageInTransaction(mes, null);

            //线程睡眠1秒
            TimeUnit.SECONDS.sleep(1L);
        }
        //6、关闭生产者producer
        producer.shutdown();
    }
}

发送延时消息

/**
 * @author Suny
 * @data 2021/10/12 15:49
 * @description 延时生产者
 */
public class Producer {
    public static void main(String[] args) throws Exception{
        //1、创建消息生产者,并指定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        //2、指定NameServer,集群的NameServer用;号分隔
        producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876");
        //3、启动producer
        producer.start();

        for (int i = 0; i < 10; i++) {
            //4、创建消息对象,指定主题Topic、Tag和消息体
            Message mes = new Message("base", "tag1", ("Hello World").getBytes());

            //设置发送延时时间
            mes.setDelayTimeLevel(5);

            //5、发送消息
            producer.send(mes);

            //线程睡眠1秒
            TimeUnit.SECONDS.sleep(1L);
        }
        //6、关闭生产者producer
        producer.shutdown();
    }
}

批量发送消息

/**
 * @author Suny
 * @data 2021/10/12 16:02
 * @description 批量发送消息
 */
public class Producer {
    public static void main(String[] args) throws Exception{
        //1、创建消息生产者,并指定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        //2、指定NameServer,集群的NameServer用;号分隔
        producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876");
        //3、启动producer
        producer.start();

        List<Message> msgList = new ArrayList<Message>();

        Message mes1 = new Message("base", "tag1", ("Hello World").getBytes());
        Message mes2 = new Message("base", "tag2", ("Hello World").getBytes());
        Message mes3 = new Message("base", "tag3", ("Hello World").getBytes());
        msgList.add(mes1);
        msgList.add(mes2);
        msgList.add(mes3);

        SendResult send = producer.send(msgList);

        producer.shutdown();
    }
}

3、消费者

普通消费

/**
 * @author Suny
 * @data 2021/10/12 16:07
 * @description 批量消费消息
 */
public class Consumer {
    public static void main(String[] args) throws Exception {
        //1、创建消费者Consumer,指定消费组
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        //2、指定NameServer地址
        consumer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876");
        //3、订阅Topic和Tag
        consumer.subscribe("base","*");

        //4、设置监听器、处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //5、启动消费者Consumer
        consumer.start();
    }
}

顺序消费

/**
 * @author Suny
 * @data 2021/10/12 15:39
 * @description 按顺序消费
 */
public class Consumer {
    public static void main(String[] args) throws Exception {
        //1、创建消费者Consumer,指定消费组
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        //2、指定NameServer地址
        consumer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876");
        //3、订阅Topic和Tag
        consumer.subscribe("orderTopic","*");

        //4、设置监听器、处理消息
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(msg);
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        consumer.start();
    }
}

3、整合SpringBoot

//配置文件
# nameserver
rocketmq.name-server=192.168.25.135:9576;192.168.25.138:9876
# 发送者组名
rocketmq.producer.group=my-group
# 消费者组名
rocketmq.consumer.group=my-group
//生产者
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Test
    void testSendMessage() {
        rocketMQTemplate.convertAndSend("topic","Hello SpringBoot RocketMq");
    }
//消费者
 /**
 * @author Suny
 * @data 2021/10/15 16:43
 * @description 消费者监听器
 */
@RocketMQMessageListener(topic = "topic",consumerGroup = "${rocketmq.consumer.group}",consumeMode = ConsumeMode.CONCURRENTLY )
@Component
public class Consumer implements RocketMQListener<String> {
    /**
     * @Component 加入到Bean容器
     * RocketMQMessageListener 主题、消费组、消费模式
     * implements RocketMQListener<String> 发送是什么数据类型接收就用什么数据类型
     * @param s
     */
    @Override
    public void onMessage(String s) {

    }
}   
Logo

开源、云原生的融合云平台

更多推荐