1.RocketMq安装部署

Linux 安装 RocketMq-CSDN博客

2.添加依赖包

        <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client-java -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client-java</artifactId>
            <version>5.0.5</version>
        </dependency>

3.生产者相关代码

package com.lhy.demo.rocketMqGRPC;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.ProducerBuilder;
import org.apache.rocketmq.client.apis.producer.TransactionChecker;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that exposes a RocketMQ 5.x (gRPC client) {@link Producer} bean.
 *
 * <p>The producer is built once per application context by the {@code @Bean} method; the
 * container owns the singleton, so no additional static caching is kept here. Caching the
 * instance in a static field would keep handing out a producer that the container may have
 * already closed when a context is shut down and a new one is started in the same JVM.
 *
 * <p>To expose a transactional producer, declare a second {@code @Bean} method that calls
 * {@link #buildProducer(TransactionChecker, String...)} with a non-null checker.
 *
 * @author ocean
 * @version 1.0.0
 * @since 2023/11/27 15:33
 */
@Slf4j
@Configuration
public class MyProducerConfig {

    // Credentials are only referenced by the optional (commented-out) ACL setup in buildProducer.
    private static final String ACCESS_KEY = "yourAccessKey";
    private static final String SECRET_KEY = "yourSecretKey";
    // Address the client connects to; must match the gRPC port configured for the proxy.
    private static final String ENDPOINTS = "127.0.0.1:9875";
    // Topic whose route the normal producer prefetches.
    private static final String NORMAL_TOPIC = "normalTopic";

    /**
     * Creates the producer used for normal (non-transactional) messages.
     *
     * @return a producer bound to {@code normalTopic}
     * @throws ClientException if the producer cannot be built
     */
    @Bean(name = "producer")
    public Producer producer() throws ClientException {
        return buildProducer(null, NORMAL_TOPIC);
    }

    /**
     * Builds a producer for the given topics.
     *
     * @param checker transaction checker to register, or {@code null} for a normal producer
     * @param topics  topics to bind to the producer
     * @return the newly built producer
     * @throws ClientException if the producer cannot be built
     */
    private Producer buildProducer(TransactionChecker checker, String... topics) throws ClientException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();

        // The credentials provider is optional for the client configuration. It is only
        // required when ACL is enabled on the server; otherwise it can be left unset.
        //SessionCredentialsProvider sessionCredentialsProvider = new StaticSessionCredentialsProvider(ACCESS_KEY, SECRET_KEY);

        final ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(ENDPOINTS)
                // On some Windows platforms SSL compatibility issues may occur; SSL is
                // switched off here because it is not essential for this demo.
                .enableSsl(false)
                // .setCredentialProvider(sessionCredentialsProvider)
                .build();
        final ProducerBuilder builder = provider.newProducerBuilder()
                .setClientConfiguration(clientConfiguration)
                // Setting the topics is optional but recommended: it lets the producer
                // prefetch the topic route before any message is published.
                .setTopics(topics);
        if (checker != null) {
            // Register the transaction checker for transactional producers only.
            builder.setTransactionChecker(checker);
        }
        return builder.build();
    }

}
package com.lhy.demo.rocketMqGRPC;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;

/**
 * <p>
 * MyProducerController
 * </p>
 *
 * @author ocean
 * @version 1.0.0
 * @since 2023/11/27 16:21
 */
@RestController
@Slf4j
public class MyProducerController {

    @Resource
    private Producer producer;

    @GetMapping("sendNormalMsg/{messageBody}")
    public String sendNormalMsg(@PathVariable("messageBody") String messageBody) {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        String topic = "normalTopic";
        byte[] body = messageBody.getBytes(StandardCharsets.UTF_8);
        String key = "key1";
        String tag = "tag1";
        final Message message = provider.newMessageBuilder()
                // 消息主题.
                .setTopic(topic)
                // 消息标签.
                .setTag(tag)
                // 消息的密钥,除了消息id之外的另一种标记消息的方式.
                .setKeys(key)
                .setBody(body)
                .build();
        try {
            final SendReceipt sendReceipt = producer.send(message);
            log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
        } catch (Throwable t) {
            log.error("Failed to send message", t);
        }
        // 当你不再需要的时候,关闭制作人。您可以手动关闭它,也可以将其添加到JVM关闭挂钩中。
        // producer.close();
        return "OK";
    }

}

 

4.消费者相关代码

package com.lhy.demo.rocketMqGRPC;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Collections;

/**
 * Spring configuration that registers a RocketMQ 5.x (gRPC client) {@link PushConsumer}
 * subscribed to {@code normalTopic}.
 *
 * @author ocean
 * @version 1.0.0
 * @since 2023/11/27 15:33
 */
@Slf4j
@Configuration
public class MyPushConsumerConfig {

    // Credentials are only referenced by the optional (commented-out) ACL setup below.
    private static final String ACCESS_KEY = "accessKey";
    private static final String SECRET_KEY = "secretKey";
    // Address the client connects to.
    private static final String ENDPOINTS = "127.0.0.1:9875";

    /**
     * Builds the push consumer for group {@code consumerGroup1}, subscribed to
     * {@code normalTopic} and filtered by tag {@code tag1}. Every received message is
     * logged and acknowledged as successfully consumed.
     *
     * @return the built push consumer
     * @throws ClientException if the consumer cannot be built
     */
    @Bean(name = "normalTopicPushConsumer")
    public PushConsumer normalTopicPushConsumer() throws ClientException {
        final ClientServiceProvider services = ClientServiceProvider.loadService();

        // A credentials provider is optional; it is only required when ACL is enabled on the server.
        //SessionCredentialsProvider sessionCredentialsProvider = new StaticSessionCredentialsProvider(ACCESS_KEY, SECRET_KEY);

        final ClientConfiguration configuration = ClientConfiguration.newBuilder()
                .setEndpoints(ENDPOINTS)
                // SSL is disabled because it is not essential here and can cause
                // compatibility issues on some Windows platforms.
                .enableSsl(false)
                // .setCredentialProvider(sessionCredentialsProvider)
                .build();

        final FilterExpression tagFilter = new FilterExpression("tag1", FilterExpressionType.TAG);
        final String group = "consumerGroup1";
        final String subscribedTopic = "normalTopic";

        // One consumer is normally enough; it is exposed as a single bean.
        return services.newPushConsumerBuilder()
                .setClientConfiguration(configuration)
                // Consumer group this consumer belongs to.
                .setConsumerGroup(group)
                // Subscription: topic -> filter expression.
                .setSubscriptionExpressions(Collections.singletonMap(subscribedTopic, tagFilter))
                // Log each received message and report it as consumed.
                .setMessageListener(messageView -> {
                    log.info("Consume message={}", messageView);
                    return ConsumeResult.SUCCESS;
                })
                .build();
    }

}

 

5.注意事项

使用 gRPC 协议时,服务端至少需要升级到 5.0 版本,并启用 gRPC Proxy 才可兼容。

5.1 启动Broker+Proxy

### 先启动broker
$ nohup sh bin/mqbroker -n localhost:9876 --enable-proxy &

### 验证broker是否启动成功, 比如, broker的ip是192.168.1.2 然后名字是broker-a
$ tail -f ~/logs/rocketmqlogs/proxy.log 
The broker[broker-a,192.168.1.2:10911] boot success...

5.2 指定代理端口

修改rocketmq5.1.4/conf/rmq-proxy.json文件

{
  "grpcServerPort": 9875,
  "rocketMQClusterName": "DefaultCluster"
}

 

6.官方Demo地址

https://github.com/apache/rocketmq-clients/tree/master/java/client/src/main/java/org/apache/rocketmq/client/java/example

 

7. Remoting 协议集成

Springboot 集成 RocketMq(入门)-CSDN博客

Logo

瓜分20万奖金 获得内推名额 丰厚实物奖励 易参与易上手

更多推荐