K8s安装RabbitMq以及基本使用
rabbitmq属于有状态的服务,即每个服务上存储的内容都不一样,对于有状态的服务,k8s推荐我们使用StatefulSet控制器。rabbitmq中的部分信息需要持久化,持久化内容使用nfs进行存储,并使用storageclass动态分配pv。......
K8s安装RabbitMq
rabbitmq属于有状态的服务,即每个服务上存储的内容都不一样,对于有状态的服务,k8s推荐我们使用StatefulSet控制器。
rabbitmq中的部分信息需要持久化,持久化内容使用nfs进行存储,并使用storageclass动态分配pv。
教程
部署教程
参考文章:K8S部署RabbitMQ集群 (镜像模式) - 部署笔记 - 腾讯云开发者社区-腾讯云 (tencent.com)
rabbitmq教程
RabbitMQ 基础核心配置文件介绍丨慕课网教程 (imooc.com)
安装步骤:
1. NFS安装以及storageclass创建
参考文章:[mystudy/大数据/k8s/8. 存储与配置.md · Zhang-HaoQi/Knowledge - 码云 - 开源中国 (gitee.com)](https://gitee.com/zhang-haoqi/knowledge/blob/develop/mystudy/大数据/k8s/8. 存储与配置.md#1-安装nfs)
2. 创建storageclass
需要先为NFS安装存储分配器,再创建NFS存储分配器。
1. 安装NFS存储分配器
第一步:设置存储分配器的权限
- 创建nfs-client-provisioner-authority.yaml文件
# nfs-client-provisioner-authority.yaml
apiVersion: v1
kind: ServiceAccount
metadata:
name: nfs-client-provisioner
# replace with namespace where provisioner is deployed
namespace: default
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: nfs-client-provisioner-runner
rules:
- apiGroups: [""]
resources: ["persistentvolumes"]
verbs: ["get", "list", "watch", "create", "delete"]
- apiGroups: [""]
resources: ["persistentvolumeclaims"]
verbs: ["get", "list", "watch", "update"]
- apiGroups: ["storage.k8s.io"]
resources: ["storageclasses"]
verbs: ["get", "list", "watch"]
- apiGroups: [""]
resources: ["events"]
verbs: ["create", "update", "patch"]
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: run-nfs-client-provisioner
subjects:
- kind: ServiceAccount
name: nfs-client-provisioner
# replace with namespace where provisioner is deployed
namespace: default
roleRef:
kind: ClusterRole
name: nfs-client-provisioner-runner
apiGroup: rbac.authorization.k8s.io
---
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: leader-locking-nfs-client-provisioner
# replace with namespace where provisioner is deployed
namespace: default
rules:
- apiGroups: [""]
resources: ["endpoints"]
verbs: ["get", "list", "watch", "create", "update", "patch"]
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: leader-locking-nfs-client-provisioner
# replace with namespace where provisioner is deployed
namespace: default
subjects:
- kind: ServiceAccount
name: nfs-client-provisioner
# replace with namespace where provisioner is deployed
namespace: default
roleRef:
kind: Role
name: leader-locking-nfs-client-provisioner
apiGroup: rbac.authorization.k8s.io
- kubectl apply -f nfs-client-provisioner-authority.yaml
第二步:安装NFS存储分配器
-
下载NFS存储分配器:https://raw.githubusercontent.com/Kubernetes-incubator/external-storage/master/nfs-client/deploy/deployment.yaml
-
是一个yaml文件,可以浏览器打开直接复制
-
修改相关的卷信息 文件名:nfs-client-provisioner.yaml
-
apiVersion: apps/v1 kind: Deployment metadata: name: nfs-client-provisioner labels: app: nfs-client-provisioner # replace with namespace where provisioner is deployed namespace: default spec: replicas: 1 strategy: type: Recreate selector: matchLabels: app: nfs-client-provisioner template: metadata: labels: app: nfs-client-provisioner spec: serviceAccountName: nfs-client-provisioner containers: - name: nfs-client-provisioner image: quay.io/external_storage/nfs-client-provisioner:latest volumeMounts: - name: nfs-client-root mountPath: /persistentvolumes env: - name: PROVISIONER_NAME # 存储分配器的默认名称 value: fuseim.pri/ifs - name: NFS_SERVER # NFS服务器地址 value: xx.xx.236.113 - name: NFS_PATH # NFS共享目录地址 value: /data/k8snfs volumes: - name: nfs-client-root nfs: server: xx.xx.236.113 # NFS服务器地址 path: /data/k8snfs # NFS共享目录
-
-
kubectl apply -f nfs-client-provisioner.yaml
-
查看创建的pod:
第三步: 创建NFS存储分配器
-
创建nfs-storage-class.yml
apiVersion: storage.k8s.io/v1 kind: StorageClass metadata: name: rabbitmq-nfs-storage namespace: rabbit-mq provisioner: fuseim.pri/ifs reclaimPolicy: Retain allowVolumeExpansion: True
-
执行文件:
kubectl apply -f nfs-storage-class.yml
-
查看状态:
kubectl get storageclass
-
查看详情:
kubectl describe storageclass nfs-storage-class.yml
-
StorageClass 创建完成后就可以创建 PVC 了。
2. 创建service
---
apiVersion: v1
kind: Service
metadata:
name: rabbitmq-management
namespace: rabbit-mq
labels:
app: rabbitmq
spec:
ports:
- port: 15672
name: http
selector:
app: rabbitmq
type: NodePort # 外界可以访问
---
apiVersion: v1
kind: Service
metadata:
name: rabbitmq
namespace: rabbit-mq
labels:
app: rabbitmq
spec:
ports:
- port: 5672
name: amqp
- port: 4369
name: epmd
- port: 25672
name: rabbitmq-dist
clusterIP: None # 无头service,外界不可以访问,只能由pod内部访问
selector:
app: rabbitmq
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
namespace: rabbit-mq
name: rabbitmq
spec:
serviceName: "rabbitmq"
replicas: 1 # 设置节点数量为1(磁盘有限)
selector:
matchLabels:
app: rabbitmq
template:
metadata:
labels:
app: rabbitmq
spec:
affinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchExpressions:
- key: "app"
operator: In
values:
- rabbitmq
topologyKey: "kubernetes.io/hostname"
containers:
- name: rabbitmq
image: rabbitmq:3.7-rc-management
lifecycle:
postStart:
exec:
command:
- /bin/sh
- -c
- >
if [ -z "$(grep rabbitmq /etc/resolv.conf)" ]; then
sed "s/^search \([^ ]\+\)/search rabbitmq.\1 \1/" /etc/resolv.conf > /etc/resolv.conf.new;
cat /etc/resolv.conf.new > /etc/resolv.conf;
rm /etc/resolv.conf.new;
fi;
until rabbitmqctl node_health_check; do sleep 1; done;
if [ -z "$(rabbitmqctl cluster_status | grep rabbitmq-0)" ]; then
touch /gotit
rabbitmqctl stop_app;
rabbitmqctl reset;
rabbitmqctl join_cluster rabbit@rabbitmq-0;
rabbitmqctl start_app;
else
touch /notget
fi;
env:
- name: MY_POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: RABBITMQ_ERLANG_COOKIE
value: "YZSDHWMFSMKEMBDHSGGZ"
- name: RABBITMQ_NODENAME
value: "rabbit@$(MY_POD_NAME)"
ports:
- name: http
protocol: TCP
containerPort: 15672
- name: amqp
protocol: TCP
containerPort: 5672
livenessProbe:
tcpSocket:
port: amqp
initialDelaySeconds: 5
timeoutSeconds: 5
periodSeconds: 10
readinessProbe:
tcpSocket:
port: amqp
initialDelaySeconds: 15
timeoutSeconds: 5
periodSeconds: 20
volumeMounts:
- name: rabbitmq-data
mountPath: /var/lib/rabbitmq
volumeClaimTemplates:
- metadata:
name: rabbitmq-data
annotations:
volume.beta.kubernetes.io/storage-class: "rabbitmq-nfs-storage"
spec:
accessModes:
- ReadWriteMany
resources:
requests:
storage: 10Gi
应用yaml:kubectl apply-f rabbitmq.yaml -n rabbit-mq
控制台对外暴露的端口是31719
访问控制台:有一个mq节点。因为空间有限,因此只设置了一个节点。
查看nfs存储的配置文件:
3. 使用rabbitmq服务
-
进入服务:
kubectl exec -it rabbitmq-0 -n rabbit-mq /bin/bash
-
虚拟主机操作vhost:
-
vhost 是 AMQP 概念的基础,客户端在连接的时候必须制定一个 vhost。RabbitMQ 默认创建的 vhost 为“/”,
vhost类似服务中的一台虚拟主机,提供逻辑隔离,不同的业务使用mq时最好创建不同的vhost,对业务进行归类划分
-
查看虚拟主机vhost:
rabbitmqctl list_vhosts
-
创建虚拟主机vhost:
rabbitmqctl add_vhost compile-code
-
删除虚拟主机vhost:
rabbitmqctl delete_vhost {vhost}
删除一个 vhost 同时也会删除其下所有的队列、交换器、绑定关系、用户权限、参数和策略等信息
-
查看主机是否使用了 RabbitMQ 的 trace 功能
rabbitmqctl list_vhosts name tracing
-
在 RabbitMQ 中,权限控制则是以 vhost 为单位的。当创建一个用户时,用户通常会被指派给至少一个 vhost,并且只能访问被指派的 vhost 内的队列,交换器和绑定关系等。
-
rabbitmqctl set_permissions [-p vhost] {user} {conf}{write}{read}
-
可配置指的是队列和交换器的创建及删除之类的操作;可写指的是发布消息;可读指与消息有关的操作,包括读取消息及清空整个队列等。
-
赋予用户权限
-
查看用户:
rabbitmqctl list_users
-
创建用户:
rabbitmqctl add_user coder train@coder
-
创建coder用户并赋予compile-coding所有权限:
rabbitmqctl set_permissions -p compile-code coder ".*" ".*" ".*"
-
清除权限:
rabbitmqctl clear_permissions -p compile-code coder
-
显示宿主机权限:
rabbitmqctl list_permissions -p compile-code
-
显示用户权限:
rabbitmqctl list_user_permissions coder
-
使用coder用户登录web控制台,发现登录不进去。原因:没有给用户赋予角色
-
用户的角色分为 5 种类型。
-
设置角色类型为management:
rabbitmqctl set_user_tags coder management
什么都没有
-
设置角色类型为monitoring:
rabbitmqctl set_user_tags coder monitoring
-
经过测试,management,poliymaker,monitoring能进去web页面,但是什么内容也没有,none和administartor登录不进去。
-
-
-
-
4. 将 headless service 映射到外网
我们使用的是StatefulSet控制器创建的无头的rabbitmq服务,服务地址是不暴露给外界的,只能在pod中访问到rabbitmq服务。这样很不利于我们开发测试。
查看服务信息
将 rabbitmq 通过负载均衡映射到外网
kubectl expose service rabbitmq -n rabbit-mq --type=LoadBalancer --name=rabbitmq-external-lb
此时就可以进行链接测试,如果不负载均衡到外网,mq服务是一直链接不上的。
SpringBoot测试
引入依赖
<!--消息队列-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
# spring配置
spring:
rabbitmq:
host: XX.XX.91.15
port: 32116
username: coder
password: train@coder
virtual-host: compile-code
测试代码
我们往key为hello的交换机中放一条hello world的消息。
注意:需要先创建交换机和队列。
交换机
队列:注意,要与交换机绑定
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class MqTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testHello(){
// System.out.println(123);
System.out.println(rabbitTemplate);
rabbitTemplate.convertAndSend("hello","hello world");
}
}
rabbitmq实战
1. 配置
springboot配置文件:(112条消息) springboot整合rabbitmq的配置文件详解_xiaoweiwei99的博客-CSDN博客_rabbitmq配置springboot
-
创建枚举,列取需要的交换机,队列,路由key
@Getter public enum CompileCodeEnum { /** * 在线编程 */ PYTHON_CODE("compile.python.direct", "compile.python", "compile.python"), R_CODE("compile.r.direct", "compile.r", "compile.r"), SCALA_CODE("compile.scala.direct", "compile.scala", "compile.scala"); /** * 交换名称 */ private String exchange; /** * 队列名称 */ private final String name; /** * 路由键 */ private String routeKey; CompileCodeEnum(String exchange,String name,String routeKey) { this.exchange = exchange; this.name = name; this.routeKey = routeKey; } }
-
创建交换机,创建队列,交换机通过routingkey与队列绑定
@Configuration public class CompileCodeConfig { //springboot 会为我们自动初始化这个CachingConnectionFactory @Autowired private CachingConnectionFactory connectionFactory; @Bean public DirectExchange examComputedExchange() { // Map<String, Object> arguments = new HashMap<>(); // //设置自定义交换机的类型。 // arguments.put("x-delayed-type", "direct"); //1.交换机名称 //2.交换机的类型 //3.是否需要持久化 //4.是否需要自动删除 //5.其他参数 return ExchangeBuilder.directExchange(CompileCodeEnum.PYTHON_CODE.getExchange()).durable(true).build(); } @Bean public Queue examComputedQueue() { return QueueBuilder .durable(CompileCodeEnum.PYTHON_CODE.getName()) .build(); } @Bean public Binding bindingExamComputedQueueToExchange() { return BindingBuilder .bind(examComputedQueue()) .to(examComputedExchange()) .with(CompileCodeEnum.PYTHON_CODE.getRouteKey()); } }
-
消息发送到交换机,发消息需要指定交换机和routingkey,说明消息进入哪个队列
@Component @Slf4j public class CompileCodeSender { @Autowired private RabbitTemplate rabbitTemplate; public void sendMessage(Long recordId) { rabbitTemplate.convertAndSend(CompileCodeEnum.PYTHON_CODE.getExchange(), CompileCodeEnum.PYTHON_CODE.getRouteKey(), recordId); } }
-
消费者选择队列进行监听,消费队列中的消息
@Component @Slf4j public class CompileCodeCustomer { @RabbitListener(queues = "compile.python") public void handleExamProcessComputed(Long recordId) throws InterruptedException { log.info("进入考试消费队列, id:{}", recordId); Thread.sleep(10000); } }
-
测试
消费:每条消费相差10s消费
-
问题
消费者一次只能消费一条消息,效率比较低,希望能够消费多条
2. 配置多个消费者
-
创建消费工厂:在CompileCodeConfig中配置
@Autowired private CachingConnectionFactory connectionFactory; @Bean public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(){ SimpleRabbitListenerContainerFactory listenerContainerFactory = new SimpleRabbitListenerContainerFactory(); listenerContainerFactory.setConnectionFactory(connectionFactory); listenerContainerFactory.setConcurrentConsumers(15); //并发消费者数量 listenerContainerFactory.setMaxConcurrentConsumers(15);//最大的并发消费者数量 listenerContainerFactory.setPrefetchCount(10);//预处理消息个数 默认轮询发给各个消费者,但是可能会造成有的消费者任务繁重,来不及消费,而有的消费者可能处理较快,设置prefetchCount,允许限制信道上的消费者所能保持的最大未确认消息的数量,如果达到上限,则不再向此消费者发送。直到消费者消费了消息后,才继续分配。 // listenerContainerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);//开启消息确认机制 return listenerContainerFactory; }
-
在消费者的rabbitmqlistener设置工厂
@RabbitListener(queues = "compile.python", containerFactory = "simpleRabbitListenerContainerFactory") public void handleExamProcessComputed(Long recordId) throws InterruptedException { log.info("进入考试消费队列, id:{}", recordId); Thread.sleep(10000); }
-
结果:同时消费多条消息
-
还可以设置工厂
/** * 个人理解为SimpleMessageListenerContainer就是一个消费者的容器配置类 * @return */ @Bean(name="simpleMessageListenerContainer") public SimpleMessageListenerContainer productMessageListenerContainerFactory(){ SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer(); listenerContainer.setConnectionFactory(connectionFactory); listenerContainerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);//设置为手动确认,在yaml配置后,初始化bean时默认的自动确认会覆盖掉yaml中的配置,收到消息时,会先自动确认一次,再手动确认。手动确认时,此时chanel已经关闭,会出现错误日志。 listenerContainer.setConcurrentConsumers(10); listenerContainer.setMaxConcurrentConsumers(20); listenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL); listenerContainer.setQueues(productQueue()); listenerContainer.setMessageListener(productConsumerListener);//这里设置了消息的消费者 listenerContainer.setPrefetchCount(5); return listenerContainer; }
3. 消息发送常用参数
@Component
@Slf4j
public class CompileCodeSender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage(Long recordId) {
rabbitTemplate.convertAndSend(CompileCodeEnum.PYTHON_CODE.getExchange(), CompileCodeEnum.PYTHON_CODE.getRouteKey(), recordId, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
String content = new String(message.getBody());
System.out.println("消息内容:"+content);
MessageProperties messageProperties = message.getMessageProperties();
//设置持久化
messageProperties.setDeliveryMode(MessageProperties.DEFAULT_DELIVERY_MODE);
//优先级
messageProperties.setPriority(2);
//过期时间,默认设置为一天
final int expireTime =3600000;
messageProperties.setExpiration(String.valueOf(expireTime));
return message;
}
});
}
}
4. 发布确认机制
交换机/队列确认
介绍
-
生产者发送消息到Broker(交换机)
生产者到 Broker有一个ConfirmCallback确认模式,当消息被Broker接收到就会触发ConfirmCallback回调,因此,通过此回调函数就可以知道有没有到达Broker。
-
Broker上的交换机是否成功将消息投放到队列
消息从交换机到 队列 投递失败有一个ReturnCallback回退模式。
注意:在RedisTemplate中可以通过setMandatory(boolean mandatory)
方法或者在yml配置文件中通过template.mandatory: true
来配置当消息没能路由到指定队列时消息是重回生产者还是丢弃,当参数mandatory=false表示消息会被丢弃,当mandatory=true消息会返回给生产者,返回的消息可以从ReturnCallback回调中获取。
使用这两个消息回调,还需要在application.yaml中配置是否开启回调:
publisher-confirm-type:
表示确认消息的类型,分别有none、correlated、simple这三种类型。
- publisher-confirm-type: none:表示禁用发布确认模式,默认值,使用此模式之后,不管消息有没有发送到Broker都不会触发ConfirmCallback回调。
- publisher-confirm-type: correlated:表示消息成功到达Broker后触发ConfirmCalllBack回调
- publisher-confirm-type: simple:simple模式下如果消息成功到达Broker后一样会触发ConfirmCalllBack回调,发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,注意:waitForConfirmsOrDie方法如果返回false则会关闭channel信道,则接下来无法发送消息到broker。
publisher-returns: true
,true表示开启失败回调,开启后当消息无法路由到指定队列时会触发ReturnCallback回调。
代码实现
yaml
# spring配置
spring:
rabbitmq:
host: xx.xx.91.15
port: 32116
username: coder
password: train@coder
virtual-host: compile-code
publisher-confirm-type: correlated # ConfirmCalllBack回调
publisher-returns: true # ReturnCallback回调
template:
mandatory: true
config
@Component
@Slf4j
public class CompileCodeCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
//将创建的消息接收的回调对象添加到rabbitTemplate中。
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
//设置当消息灭有路由到指定队列时的处理方案。 //true:重回生产者 //false:丢失(默认) 返回的消息可以从ReturnCallback回调中获取
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnsCallback(this);
}
/**
* 交换机确定是否收到消息的回调方法
* 1.发消息 交换机成功接受到了 回调
* 1.1CorrelationData保存回调消息的ID及相关信息
* 1.2交换机收到消息 ack:true
* 1.3cause 失败的原因 cause:null
* 2.发消息 交换机没有成功接收 回调
* 2.1CorrelationData保存回调消息的ID及相关信息
* 2.2交换机收到消息 ack:false
* 2.3 cause:失败的原因
*/
//消息未到达交换机
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
Message message = correlationData.getReturned().getMessage();
try {
System.out.println(new String(message.getBody(),"utf-8"));
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
System.out.println(correlationData);
String id = correlationData == null ? "" : correlationData.getId();
if (ack) {
log.info("交换机已经收到 id 为:{}的消息", id);
} else {
log.info("交换机还未收到 id 为:{}消息,由于原因:{}", id, cause);
}
}
//消息到达交换机,但是没有对应的队列。
@Override
public void returnedMessage(ReturnedMessage returned) {
Message message = returned.getMessage();
String replyText = returned.getReplyText();
String routingKey = returned.getRoutingKey();
String exchange = returned.getExchange();
log.error(" 消 息 {}, 被 交 换 机 {} 退 回 , 退 回 原 因 :{}, 路 由 key:{}", new
String(message.getBody()), exchange, replyText, routingKey);
}
}
消费者确认
-
消费者在订阅队列时,可以指定 autoAck 参数
- 当 autoAck 等于 false时,RabbitMQ 会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移去消息(实质上是先打上删除标记,之后再删除)。
- 当 autoAck 等于 true 时,RabbitMQ 会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正地消费到了这些消息
- 只要设置 autoAck 参数为 false,消费者就有足够的时间处理消息(任务),不用担心处理消息过程中消费者进程挂掉后消息丢失的问题,因为 RabbitMQ 会一直等待持有消息直到消费者显式调用 Basic.Ack 命令为止。
-
rabbimq消息重新入队条件
- rabbitmq不会设置未确认消息的过期时间,判断消息是否需要重新投递给消费的唯一依据是消费该消息的消费者链接是否已经断开,这么设计的原因是 RabbitMQ 允许消费者消费一条消息的时间可以很久很久。
- 如果 RabbitMQ 一直没有收到消费者的确认信号,并且消费此消息的消费者已经断开连接,则 RabbitMQ 会安排该消息重新进入队列,等待投递给下一个消费者,也有可能还是之前的消费者。
-
队列中的消息分成了两个部分:一部分是等待投递给消费者的消息;一部分是已经投递给消费者,但是还没有收到消费者确认信号的消息。
yaml配置
自动确认:acknowledge=“none” :当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。如果代码执行过程中出现异常,会造成消息丢失
手动确认:acknowledge=“manual” :设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。
根据异常情况确认:acknowledge=“auto”:如果代码在执行过程中出现异常,则自动提交。
spring:
rabbitmq:
host: xx.xx.91.15
port: 32116
username: coder
password: train@coder
virtual-host: compile-code
publisher-confirm-type: correlated
publisher-returns: true
template:
mandatory: true
listener:
simple: # 容器类型 简单模式
acknowledge-mode: manual # 表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto manual为手动
direct: #
acknowledge-mode: manual
消费者配置
channel basicAck(long deliveryTag, boolean multiple);
deliveryTag:该消息的index
multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息。
void basicNack(long deliveryTag, boolean multiple, boolean requeue);
deliveryTag:该消息的index
multiple:是否批量.true:将一次性拒绝所有小于deliveryTag的消息。
requeue:被拒绝的是否重新入队列
channel.basicReject(long deliveryTag, boolean requeue);
deliveryTag:该消息的index
requeue:被拒绝的是否重新入队列
channel.basicNack 与 channel.basicReject 的区别在于basicNack可以拒绝多条消息,而basicReject一次只能拒绝一条消息
package com.train.algorithm.rabbitmq;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class CompileCodeCustomer{
@RabbitListener(queues = "compile.python", containerFactory = "simpleRabbitListenerContainerFactory")
public void handlePythonCode(Message message, Channel channel) {
//消息的唯一标识
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//1.获取消息体
String content = new String(message.getBody(),"UTF-8");
//2.业务逻辑
//................
System.out.println(content);
int a = 1/0;
//3. 手动签收
channel.basicAck(deliveryTag,true);
} catch (Exception e) {
try {
//批量拒绝签收 deliveryTag:消息唯一id multiple:是否批量.true:将一次性拒绝所有小于deliveryTag的消息。 requeue:被拒绝的是否重新入队列
channel.basicNack(deliveryTag,true,true);
}catch (Exception exception){
log.info("消息确认异常");
}
}
}
}
注意:此处模拟了出现异常的情况,当出现异常时,拒绝接收消息,将消息重新打回队列。
此时会有一个问题,重新进入队列的消息,还会继续被投放给消费者进行消费,会一直消费出错,导致该消息被循环的消费拒绝。
分析:此处循环的主要原因是因为消息被拒绝入队,进入的是队头,因此下一次消费的还是该消息,一直出错。
配置重置次数,和重试时间,即消息消费失败后,隔一段时间再进行消费,最多重试次数为3次。(有问题)
设置重试次数
# spring配置
spring:
rabbitmq:
host: xx.xx.91.15
port: 32116
username: coder
password: train@coder
virtual-host: compile-code
publisher-confirm-type: correlated
publisher-returns: true
template:
mandatory: true
listener:
simple:
acknowledge-mode: manual
retry:
enabled: true # 开启消费者进行重试
max-attempts: 5 # 最大重试次数
initial-interval: 3000 # 重试时间间隔
direct:
acknowledge-mode: manual
retry:
enabled: true # 开启消费者进行重试
max-attempts: 5 # 最大重试次数
initial-interval: 3000 # 重试时间间隔 3s
此时出现了一个问题,如果代码出现错误,只是捕捉了异常,并没有异常抛出,那么相当于该消息没有被确认,也没有被拒绝,队列会一直发送该消息,那么会一直消费此消息,控制台一直打印该消息的消费信息。
自动确认和手动确认不同情况测试
手动确认
情况一:确认前有异常抛出,捕获后,不拒绝消息,不抛出异常
yaml配置:
# spring配置
spring:
rabbitmq:
host: xx.xx.91.15
port: 32116
username: coder
password: train@coder
virtual-host: compile-code
publisher-confirm-type: correlated
publisher-returns: true
template:
mandatory: true
listener:
simple:
concurrency: 1
max-concurrency: 20
prefetch: 50
default-requeue-rejected: false # 注意,此处设置的是false
acknowledge-mode: manual
# retry:
# enabled: true
# max-attempts: 10
# max-interval: 2000ms
# initial-interval: 1000ms
# multiplier: 1
direct:
concurrency: 1
max-concurrency: 20
prefetch: 50
acknowledge-mode: manual
default-requeue-rejected: false # 注意,此处设置的是false
# retry:
# enabled: true
# max-attempts: 10
# max-interval: 2000ms
# initial-interval: 1000ms
# multiplier: 1
default-requeue-rejected: false # 注意,此处设置的是false 表示被拒绝的消息不重新入队。
消费者:
发送消息入队:http://localhost:9208/alg/send/20
,一共发了20条消息,均产生异常
效果:控制台打印一次信息,队列中有20条未确认的消息。
分析:消息消息后,如果一直没有消费,那么消费者会一直消费此消息,直到断开与消费者的链接或确认消息。
直接终止后台服务器:(模拟消费者断开链接)
消息状态由Unacked变为Ready,说明此时消息重新入队。
启动后台服务器:(模拟有消费者)
服务器启动成功后,20条消息会重新入队列进行消费,但是因为有异常产生,因此还是Unacked状态。
总结:
对于手动确认来说,发出的消息必须要给出明确的回复,要么确认,要么拒绝。具体拒绝后的业务可以自定义。但是如果即没有确认,也没有拒绝,那么消息会一直等待消费,如果此时断开链接,不管怎么样消息都会重新入队,等有消费者的时候继续消费。
注意: default-requeue-rejected: false 我设置的是fasle,这里不管设置false还是true都是一样,原因在于我没有主动的拒绝消息。拒绝消息的情况在之后展示。
情况二:确认前有异常抛出,捕获后,不拒绝消息,抛出异常
与情况一相比,就在消费者扔出了异常
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-crBng0sF-1657963658180)(C:\Users\DELL\AppData\Roaming\Typora\typora-user-images\image-20220714110055648.png)]
结果:不扔出异常效果一样。
情况三:确认前有异常抛出,捕获后,拒绝消息,不重回队列
其余配置和之前一样。
这样,消息就直接被拒绝,被拒绝的消息拒绝重新入队。此时消息就被移除了。
分析:如果同意消息重新入队,那么消息会直接放入消息头,下次还是消费这条消息,死循环。
效果:
问题:此处的requeue和default-requeue-rejected: false的关系
- 情况一:default-requeue-rejected: false,requeue:true 消息被拒绝后重新入队
- 情况二:default-requeue-rejected: true,requeue:false 消息拒绝后不会重新入队
- 总结:requeue的优先级要大于default-requeue-rejected的优先级,更准确的说,requeue和default-requeue-rejected没有太大的关系。
情况四:添加重试机制
# spring配置
spring:
rabbitmq:
host: xx.xx.91.15
port: 32116
username: coder
password: train@coder
virtual-host: compile-code
publisher-confirm-type: correlated
publisher-returns: true
template:
mandatory: true
listener:
simple:
acknowledge-mode: manual
retry:
enabled: true # 开启消费者进行重试
max-attempts: 5 # 最大重试次数
initial-interval: 10000000ms # 重试时间间隔
direct:
acknowledge-mode: manual
retry:
enabled: true # 开启消费者进行重试
max-attempts: 5 # 最大重试次数
initial-interval: 10000000ms # 重试时间间隔
重试机制表示消息被拒绝后,重新消费消息的策略。(分析:手动ack时,表示该消息的业务逻辑出错,理应进行其他处理,而不应重新返回队列,这里只是做一次展示)
如果设置重试后,过了重试次数之后,扔未成功处理消息,可以拒绝消息放入死信队列。
消费者
其他配置完全一样。
发现,在手动ack模式下,重试机制没有效果,并没有间隔和次数的限制,拒绝的消息一直重发。
总结
- 综上,手动ack时,yaml中default-requeue-rejected的配置以及retry机制不起作用。
- 手动ack时,拒绝消息,说明这条消息的业务逻辑有问题,需要进行其他处理,而非重新入队。
- 在消费者端一定要进行ack,或者是nack,可以放在try方法块的finally中执行
- 可以对消费者的异常状态进行捕捉,根据异常类型选择ack,或者nack抛弃消息,nack再次尝试
- 对于nack的再次尝试,是进入到队列头的,如果一直是失败的状态,将会造成阻塞。所以最好是专门投递到“死信队列”,具体分析。
自动确认
情况一:确认前有异常抛出,捕获后,不抛出异常
yaml配置
# spring配置
spring:
rabbitmq:
host: xx.xx.91.15
port: 32116
username: coder
password: train@coder
virtual-host: compile-code
publisher-confirm-type: correlated
publisher-returns: true
template:
mandatory: true
listener:
simple:
concurrency: 1
max-concurrency: 20
prefetch: 50
acknowledge-mode: manual
# default-requeue-rejected: true
# retry:
# enabled: true
# max-attempts: 10
# max-interval: 2000000ms
# initial-interval: 1000000ms
# multiplier: 1
direct:
concurrency: 1
max-concurrency: 20
prefetch: 50
acknowledge-mode: auto
# default-requeue-rejected: true
# retry:
# enabled: true
# max-attempts: 10
# max-interval: 2000000ms
# initial-interval: 1000000ms
# multiplier: 1
消费者
结果:
队列中无消息,消息被确认。
情况二:确认前有异常抛出,捕获后,抛出异常
注意:这里的异常有三种。
-
当抛出AmqpRejectAndDontRequeueException异常的时候,则消息会被拒绝,且requeue=false(不重新入队列)
-
当抛出ImmediateAcknowledgeAmqpException异常,则消息会被确认(消息未被接收到)
-
immediate 参数告诉服务器,如果该消息关联的队列上有消费者,则立刻投递;
如果所有匹配的队列上都没有消费者,则直接将消息返还给生产者,不用将消息存入队列而等待消费者了。
-
Immediate表示该队列无消费者,消息不会入队列,返回个消费者。
-
-
其他的异常,则消息会被拒绝,且requeue=true(如果此时只有一个消费者监听该队列,则有发生死循环的风险,多消费端也会造成资源的极大浪费,这个在开发过程中一定要避免的)。可以通过setDefaultRequeueRejected(默认是true)来设置拒绝消息重新入队。
- 如果有死信队列,消息会进入死信队列
- 如果有重试次数,重试后仍无法消费,进入死信队列
此时会抛出异常,并且消息会返回队列队头,队头继续消费错误消息,导致程序死循环。
默认情况下default-requeue-rejected=true,可能是这个原因,导致消息一直重回队列,设置成false查看状态。
测试后,发现并不是这样,false之后,消息被消费一次之后,不再重新入队。消息丢失,控制台不再打印内容。
情况三:确认前有异常抛出,捕获后,开启重试机制
- default-requeue-rejected: false 开启重试
- 相当于发生异常后,拒绝消息重新入队,但是开启失败消息重试机制
# spring配置
spring:
rabbitmq:
host: xx.xx.91.15
port: 32116
username: coder
password: train@coder
virtual-host: compile-code
publisher-confirm-type: correlated
publisher-returns: true
template:
mandatory: true
listener:
simple:
concurrency: 5
max-concurrency: 20
prefetch: 50
acknowledge-mode: auto
default-requeue-rejected: false
retry:
enabled: true
max-attempts: 10
max-interval: 10000ms
initial-interval: 2000ms
multiplier: 1
失败后,每隔两秒,尝试重复消费,重试了十次。消息还未成功消息,消息会被确认掉,消息不再重新入队。
尝试在重试过程中,模拟于消费者断开链接,查看数据是否重回队列。
结果消息重回了队列
设置:default-requeue-rejected: true
效果还和false一样,重试达到次数后消息不会再入队。
源码分析:
当消费者中有异常抛出时,rabbitmq会尝试判断该消费能否再尝试消费,如果可以,就继续尝试消费,如果不成功,则丢失该消息。
大坑:yaml与配置类配置
在rabbitmq的配置类中,我配置了SimpleRabbitListenerContainerFactory,没有设置他的default-requeue-rejected,默认为true。但是我在yaml中设置了default-requeue-rejected为false,发现配置类中的设置覆盖掉了yaml中的配置,导致我在yaml中配置default-requeue-rejected=false无效,消息一直重发。
配置文件配置
# spring配置
spring:
rabbitmq:
host: xx.xx.91.15
port: 32116
username: coder
password: train@coder
virtual-host: compile-code
publisher-confirm-type: correlated
publisher-returns: true
template:
mandatory: true
listener:
simple:
concurrency: 1
max-concurrency: 20
prefetch: 50
acknowledge-mode: auto
default-requeue-rejected: false # 注意,此处是false
消费者处debug配置类详情
这样配置后,我在yaml中的配置都不会生效了。
5. 失败的消息入死信队列
过期的消息,拒绝后不再入队的消息,超过重试次数的消息,都会进入死信队列。
步骤:
- 创建死信队列,死信交换机,并让普通队列绑定到死信交换机
- 创建死信消费者
//普通队列,普通队列绑定死信交换机
@Bean
public Queue CompileCodeQueue() {
Map<String, Object> args = new HashMap<>(2);
//声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", CompileCodeConfig.COMPILE_CODE_DEAD_EXCHANGE);
//声明当前队列的死信路由 key
args.put("x-dead-letter-routing-key", CompileCodeConfig.COMPILE_CODE_DEAD_KEY);
//druable 持久化 后面输入队列的名称
return QueueBuilder
.durable(CompileCodeConfig.COMPILE_CODE_QUEUE)
.withArguments(args).build();
}
//死信交换机
@Bean("compileCodeDeadDirect")
public DirectExchange compileCodeDeadDirect() {
System.out.println("创建交换机");
return (DirectExchange) ExchangeBuilder
.directExchange(CompileCodeConfig.COMPILE_CODE_DEAD_EXCHANGE)
.durable(true)
.build();
}
//死信队列
@Bean("compileCodeDeadQueue")
public Queue compileCodeDeadQueue() {
return new Queue(CompileCodeConfig.COMPILE_CODE_DEAD_QUEUE);
}
//死信队列与死信交换机进行绑定
@Bean
public Binding bindingExamProcessQueueToExchange(@Qualifier("compileCodeDeadQueue") Queue queue, @Qualifier("compileCodeDeadDirect") DirectExchange customExchange) {
return BindingBuilder
.bind(queue)
.to(customExchange)
.with(CompileCodeConfig.COMPILE_CODE_DEAD_KEY);
}
配置消费者
//死信队列,存储过期或者消费失败的消息
@RabbitListener(queues = CompileCodeConfig.COMPILE_CODE_DEAD_QUEUE)
public void handleExamProcessComputed(Message message, Channel channel) {
log.info("进入死信队列, recordId:{}");
}
知识点
消息发送
基本配置
# spring配置
spring:
rabbitmq:
host: xx.xx.91.15
port: 32116
username: coder
password: train@coder
virtual-host: compile-code
publisher-confirm-type: correlated
publisher-returns: true
template:
mandatory: true
listener:
simple:
acknowledge-mode: manual
retry:
enabled: true # 开启消费者进行重试
max-attempts: 5 # 最大重试次数
initial-interval: 3000 # 重试时间间隔
direct:
acknowledge-mode: manual
retry:
enabled: true # 开启消费者进行重试
max-attempts: 5 # 最大重试次数
initial-interval: 3000 # 重试时间间隔
消息消费
消息分发:prefetchCount
当 RabbitMQ 队列拥有多个消费者时,队列收到的消息将以轮询(round-robin)的分发方式发送给消费者。每条消息只会发送给订阅列表里的一个消费者。
默认情况下,使用轮询的方式发送,有n个消费者,会将第m条消息发送给第m%n个消费者。此时会遇到一个问题就是,如果某些消息处理特别慢,某些消息处理特别慢,那么会造成某些消费者压力较大,等待消费的消息较多,而部分消费者压力较小,进程空闲,造成整体应用吞吐量下降。
使用prefetchCount设置消费者所能保持最大未确认消费的数量,如果消费者未确认的消息达到该数量,那么消息将不再给这个消费者发送,转发给其他消费者,等压力大的消费者确认消费了某条消息后,对应的prefetchCount-1,继续接收消息。
消息分发使用的是chanel,当设置prefetchCount为15时,会生成15个channel。
问题
1 . direct模式下,向不存在的交换机发送消息,消息去了哪里?
2. 在一个消费者的情况下,消费者是一次消费一条还是一次消费多条?
默认情况下,消费者一次消费一条。
生产者:生产10000条数据到队列
消费者:假设处理消息的时间为10s
日志:
消息:
消费:每条消费相差10s消费
3. Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=
设置为手动确认,在yaml配置后,初始化bean时默认的自动确认会覆盖掉yaml中的配置,收到消息时,会先自动确认一次,再手动确认。手动确认时,此时chanel已经关闭,会出现错误日志。
/**
* 个人理解为SimpleMessageListenerContainer就是一个消费者的容器配置类
* @return
*/
@Bean(name="simpleMessageListenerContainer")
public SimpleMessageListenerContainer productMessageListenerContainerFactory(){
SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
listenerContainer.setConnectionFactory(connectionFactory);
listenerContainerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);//设置为手动确认,在yaml配置后,初始化bean时默认的自动确认会覆盖掉yaml中的配置,收到消息时,会先自动确认一次,再手动确认。手动确认时,此时chanel已经关闭,会出现错误日志。
listenerContainer.setConcurrentConsumers(10);
listenerContainer.setMaxConcurrentConsumers(20);
listenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
listenerContainer.setQueues(productQueue());
listenerContainer.setMessageListener(productConsumerListener);//这里设置了消息的消费者
listenerContainer.setPrefetchCount(5);
return listenerContainer;
}
4.rabbitmq的重试机制,并没有按照配置走(参照实战中的发布确认机制)
rabbitmq配置:
# spring配置
spring:
rabbitmq:
host: xx.xx.91.15
port: 32116
username: coder
password: train@coder
virtual-host: compile-code
publisher-confirm-type: correlated
publisher-returns: true
template:
mandatory: true
listener:
simple:
acknowledge-mode: auto
retry:
enabled: true
max-attempts: 10
max-interval: 2000ms
initial-interval: 1000ms
multiplier: 1
default-requeue-rejected: false
direct:
acknowledge-mode: auto # 发布确认机制 自动
retry:
enabled: true
max-attempts: 10 # 最大的重试次数
max-interval: 2000ms # 最大的重试间隔
initial-interval: 1000ms # 重试间隔
multiplier: 1 # 重试时间比
default-requeue-rejected: false # 消息被拒绝后,重新放入队列
5. 乱码问题
消费者接收消息乱码,即使使用utf-8转化后。
解决,发送消息时,使用JSON将对象序列化。
发送消息
public void sendMessage(TaskRecord taskRecord) {
rabbitTemplate.convertAndSend(CompileCodeConfig.COMPILE_CODE_EXCHANGE, CompileCodeConfig.COMPILE_CODE_ROUTEKEY, JSON.toJSONString(taskRecord), new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
MessageProperties messageProperties = message.getMessageProperties();
//设置持久化
messageProperties.setDeliveryMode(MessageProperties.DEFAULT_DELIVERY_MODE);
//优先级
messageProperties.setPriority(2);
//过期时间,默认设置为一天
final int expireTime =3600000;
messageProperties.setExpiration(String.valueOf(expireTime));
return message;
}
},new CorrelationData(UUID.randomUUID().toString()));
}
接收消息:
String str = new String(body,"UTF-8");
log.info("收到消息:{}"+str);
参考文章:
(115条消息) RabbitMQ消息确定机制(自动ACK和手动ACK)_小胖学编程的博客-CSDN博客_rabbitmq手动ack
(112条消息) RabbitMQ 消费者确认auto 和 manual 模式对异常的处理区别(含重试、requeue的影响)_DatDreamer的博客-CSDN博客_requeue
【RabbitMQ-6】MQ中间件-rabbitmq-消费者消息获取及异常处理的实现(SpringBoot2.0环境下) - 简书 (jianshu.com)
更多推荐
所有评论(0)