K8s安装RabbitMq

文章目录


rabbitmq属于有状态的服务,即每个服务上存储的内容都不一样,对于有状态的服务,k8s推荐我们使用StatefulSet控制器。

rabbitmq中的部分信息需要持久化,持久化内容使用nfs进行存储,并使用storageclass动态分配pv。

教程

部署教程

参考文章:K8S部署RabbitMQ集群 (镜像模式) - 部署笔记 - 腾讯云开发者社区-腾讯云 (tencent.com)

rabbitmq教程

RabbitMQ 基础核心配置文件介绍丨慕课网教程 (imooc.com)

安装步骤:

1. NFS安装以及storageclass创建

参考文章:[mystudy/大数据/k8s/8. 存储与配置.md · Zhang-HaoQi/Knowledge - 码云 - 开源中国 (gitee.com)](https://gitee.com/zhang-haoqi/knowledge/blob/develop/mystudy/大数据/k8s/8. 存储与配置.md#1-安装nfs)

2. 创建storageclass

需要先为NFS安装存储分配器,再创建NFS存储分配器。

1. 安装NFS存储分配器
第一步:设置存储分配器的权限
  1. 创建nfs-client-provisioner-authority.yaml文件
# nfs-client-provisioner-authority.yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: nfs-client-provisioner
  # replace with namespace where provisioner is deployed
  namespace: default
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: nfs-client-provisioner-runner
rules:
  - apiGroups: [""]
    resources: ["persistentvolumes"]
    verbs: ["get", "list", "watch", "create", "delete"]
  - apiGroups: [""]
    resources: ["persistentvolumeclaims"]
    verbs: ["get", "list", "watch", "update"]
  - apiGroups: ["storage.k8s.io"]
    resources: ["storageclasses"]
    verbs: ["get", "list", "watch"]
  - apiGroups: [""]
    resources: ["events"]
    verbs: ["create", "update", "patch"]
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: run-nfs-client-provisioner
subjects:
  - kind: ServiceAccount
    name: nfs-client-provisioner
    # replace with namespace where provisioner is deployed
    namespace: default
roleRef:
  kind: ClusterRole
  name: nfs-client-provisioner-runner
  apiGroup: rbac.authorization.k8s.io
---
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: leader-locking-nfs-client-provisioner
  # replace with namespace where provisioner is deployed
  namespace: default
rules:
  - apiGroups: [""]
    resources: ["endpoints"]
    verbs: ["get", "list", "watch", "create", "update", "patch"]
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: leader-locking-nfs-client-provisioner
  # replace with namespace where provisioner is deployed
  namespace: default
subjects:
  - kind: ServiceAccount
    name: nfs-client-provisioner
    # replace with namespace where provisioner is deployed
    namespace: default
roleRef:
  kind: Role
  name: leader-locking-nfs-client-provisioner
  apiGroup: rbac.authorization.k8s.io
  1. kubectl apply -f nfs-client-provisioner-authority.yaml
第二步:安装NFS存储分配器
  1. 下载NFS存储分配器:https://raw.githubusercontent.com/Kubernetes-incubator/external-storage/master/nfs-client/deploy/deployment.yaml

    1. 是一个yaml文件,可以浏览器打开直接复制

    2. 修改相关的卷信息 文件名:nfs-client-provisioner.yaml

    3. apiVersion: apps/v1
      kind: Deployment
      metadata:
        name: nfs-client-provisioner
        labels:
          app: nfs-client-provisioner
        # replace with namespace where provisioner is deployed
        namespace: default
      spec:
        replicas: 1
        strategy:
          type: Recreate
        selector:
          matchLabels:
            app: nfs-client-provisioner
        template:
          metadata:
            labels:
              app: nfs-client-provisioner
          spec:
            serviceAccountName: nfs-client-provisioner
            containers:
              - name: nfs-client-provisioner
                image: quay.io/external_storage/nfs-client-provisioner:latest
                volumeMounts:
                  - name: nfs-client-root
                    mountPath: /persistentvolumes
                env:
                  - name: PROVISIONER_NAME  # 存储分配器的默认名称
                    value: fuseim.pri/ifs  
                  - name: NFS_SERVER    # NFS服务器地址
                    value: xx.xx.236.113
                  - name: NFS_PATH       # NFS共享目录地址
                    value: /data/k8snfs
            volumes:
              - name: nfs-client-root
                nfs:
                  server: xx.xx.236.113  # NFS服务器地址
                  path: /data/k8snfs     # NFS共享目录
      
      
  2. kubectl apply -f nfs-client-provisioner.yaml

  3. 查看创建的pod:image-20220628101650408

第三步: 创建NFS存储分配器
  1. 创建nfs-storage-class.yml

    apiVersion: storage.k8s.io/v1
    kind: StorageClass
    metadata:
      name: rabbitmq-nfs-storage
      namespace: rabbit-mq
    provisioner: fuseim.pri/ifs
    reclaimPolicy: Retain
    allowVolumeExpansion: True
    
  2. 执行文件: kubectl apply -f nfs-storage-class.yml

  3. 查看状态: kubectl get storageclass

    image-20220628102959457

  4. 查看详情:kubectl describe storageclass nfs-storage-class.yml

    image-20220628103148575

  5. StorageClass 创建完成后就可以创建 PVC 了。

2. 创建service
---
apiVersion: v1
kind: Service
metadata:
  name: rabbitmq-management
  namespace: rabbit-mq
  labels:
    app: rabbitmq
spec:
  ports:
  - port: 15672
    name: http
  selector:
    app: rabbitmq
  type: NodePort     # 外界可以访问
---
apiVersion: v1
kind: Service
metadata:
  name: rabbitmq
  namespace: rabbit-mq
  labels:
    app: rabbitmq
spec:
  ports:
  - port: 5672
    name: amqp
  - port: 4369
    name: epmd
  - port: 25672
    name: rabbitmq-dist
  clusterIP: None      # 无头service,外界不可以访问,只能由pod内部访问
  selector:
    app: rabbitmq
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  namespace: rabbit-mq
  name: rabbitmq
spec:
  serviceName: "rabbitmq"
  replicas: 1  # 设置节点数量为1(磁盘有限)
  selector:
    matchLabels:
      app: rabbitmq
  template:
    metadata:
      labels:
        app: rabbitmq
    spec:
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            - labelSelector:
                matchExpressions:
                  - key: "app"
                    operator: In
                    values:
                      - rabbitmq
              topologyKey: "kubernetes.io/hostname"
      containers:
      - name: rabbitmq
        image: rabbitmq:3.7-rc-management
        lifecycle:
          postStart:
            exec:
              command:
              - /bin/sh
              - -c
              - >
                if [ -z "$(grep rabbitmq /etc/resolv.conf)" ]; then
                  sed "s/^search \([^ ]\+\)/search rabbitmq.\1 \1/" /etc/resolv.conf > /etc/resolv.conf.new;
                  cat /etc/resolv.conf.new > /etc/resolv.conf;
                  rm /etc/resolv.conf.new;
                fi;
                until rabbitmqctl node_health_check; do sleep 1; done;
                if [ -z "$(rabbitmqctl cluster_status | grep rabbitmq-0)" ]; then
                  touch /gotit
                  rabbitmqctl stop_app;
                  rabbitmqctl reset;
                  rabbitmqctl join_cluster rabbit@rabbitmq-0;
                  rabbitmqctl start_app;
                else
                  touch /notget
                fi;
        env:
        - name: MY_POD_NAME
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        - name: RABBITMQ_ERLANG_COOKIE
          value: "YZSDHWMFSMKEMBDHSGGZ"
        - name: RABBITMQ_NODENAME
          value: "rabbit@$(MY_POD_NAME)"
        ports:
        - name: http
          protocol: TCP
          containerPort: 15672
        - name: amqp
          protocol: TCP
          containerPort: 5672
        livenessProbe:
          tcpSocket:
            port: amqp
          initialDelaySeconds: 5
          timeoutSeconds: 5
          periodSeconds: 10
        readinessProbe:
          tcpSocket:
            port: amqp
          initialDelaySeconds: 15
          timeoutSeconds: 5
          periodSeconds: 20
        volumeMounts:
        - name: rabbitmq-data
          mountPath: /var/lib/rabbitmq
  volumeClaimTemplates:
  - metadata:
      name: rabbitmq-data
      annotations:
        volume.beta.kubernetes.io/storage-class: "rabbitmq-nfs-storage"
    spec:
      accessModes:
        - ReadWriteMany
      resources:
        requests:
          storage: 10Gi

应用yaml:kubectl apply-f rabbitmq.yaml -n rabbit-mq

image-20220712104824825

控制台对外暴露的端口是31719

image-20220712105002906

访问控制台:有一个mq节点。因为空间有限,因此只设置了一个节点。

image-20220712105031437

查看nfs存储的配置文件:

image-20220712105425884

3. 使用rabbitmq服务

  1. 进入服务:kubectl exec -it rabbitmq-0 -n rabbit-mq /bin/bash

  2. 虚拟主机操作vhost:

    1. vhost 是 AMQP 概念的基础,客户端在连接的时候必须制定一个 vhost。RabbitMQ 默认创建的 vhost 为“/”,

      vhost类似服务中的一台虚拟主机,提供逻辑隔离,不同的业务使用mq时最好创建不同的vhost,对业务进行归类划分

    2. 查看虚拟主机vhost:rabbitmqctl list_vhosts

    image-20220712112114314

    1. 创建虚拟主机vhost:rabbitmqctl add_vhost compile-code

    2. 删除虚拟主机vhost:rabbitmqctl delete_vhost {vhost}

      删除一个 vhost 同时也会删除其下所有的队列、交换器、绑定关系、用户权限、参数和策略等信息

    3. 查看主机是否使用了 RabbitMQ 的 trace 功能 rabbitmqctl list_vhosts name tracing

      image-20220712112948522

    4. 在 RabbitMQ 中,权限控制则是以 vhost 为单位的。当创建一个用户时,用户通常会被指派给至少一个 vhost,并且只能访问被指派的 vhost 内的队列,交换器和绑定关系等。

      1. rabbitmqctl set_permissions [-p vhost] {user} {conf}{write}{read}

      2. image-20220712141747035

      3. 可配置指的是队列和交换器的创建及删除之类的操作;可写指的是发布消息;可读指与消息有关的操作,包括读取消息及清空整个队列等。

      4. 赋予用户权限

        1. 查看用户:rabbitmqctl list_users

          image-20220712144704065

        2. 创建用户:rabbitmqctl add_user coder train@coder

        3. 创建coder用户并赋予compile-coding所有权限: rabbitmqctl set_permissions -p compile-code coder ".*" ".*" ".*"

        4. 清除权限:rabbitmqctl clear_permissions -p compile-code coder

        5. 显示宿主机权限:rabbitmqctl list_permissions -p compile-code

          image-20220712142944277

        6. 显示用户权限:rabbitmqctl list_user_permissions coder

          image-20220712143135145

        7. 使用coder用户登录web控制台,发现登录不进去。原因:没有给用户赋予角色

          1. 用户的角色分为 5 种类型。

            image-20220712144752791

          2. 设置角色类型为management:rabbitmqctl set_user_tags coder management

            image-20220712150142649什么都没有

          3. 设置角色类型为monitoring:rabbitmqctl set_user_tags coder monitoring

          4. 经过测试,management,poliymaker,monitoring能进去web页面,但是什么内容也没有,none和administartor登录不进去。

4. 将 headless service 映射到外网

我们使用的是StatefulSet控制器创建的无头的rabbitmq服务,服务地址是不暴露给外界的,只能在pod中访问到rabbitmq服务。这样很不利于我们开发测试。

查看服务信息

image-20220712171313284

将 rabbitmq 通过负载均衡映射到外网

kubectl expose service rabbitmq -n rabbit-mq --type=LoadBalancer --name=rabbitmq-external-lb

image-20220712171449301

此时就可以进行链接测试,如果不负载均衡到外网,mq服务是一直链接不上的。

SpringBoot测试

引入依赖

        <!--消息队列-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
# spring配置
spring: 
  rabbitmq:
    host: XX.XX.91.15
    port: 32116
    username: coder
    password: train@coder
    virtual-host: compile-code

测试代码

我们往key为hello的交换机中放一条hello world的消息。

注意:需要先创建交换机和队列。

交换机

image-20220712170840646

队列:注意,要与交换机绑定

image-20220712170851360

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class MqTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testHello(){
//        System.out.println(123);
        System.out.println(rabbitTemplate);
      rabbitTemplate.convertAndSend("hello","hello world");

    }
}

image-20220712172123949

rabbitmq实战

image-20220712194719128

1. 配置

springboot配置文件:(112条消息) springboot整合rabbitmq的配置文件详解_xiaoweiwei99的博客-CSDN博客_rabbitmq配置springboot

  1. 创建枚举,列取需要的交换机,队列,路由key

    @Getter
    public enum CompileCodeEnum {
        /**
         * 在线编程
         */
        PYTHON_CODE("compile.python.direct", "compile.python", "compile.python"),
        R_CODE("compile.r.direct", "compile.r", "compile.r"),
        SCALA_CODE("compile.scala.direct", "compile.scala", "compile.scala");
    
    
        /**
         * 交换名称
         */
        private String exchange;
        /**
         * 队列名称
         */
        private final String name;
        /**
         * 路由键
         */
        private String routeKey;
    
        CompileCodeEnum(String exchange,String name,String routeKey) {
            this.exchange = exchange;
            this.name = name;
            this.routeKey = routeKey;
        }
    }
    
  2. 创建交换机,创建队列,交换机通过routingkey与队列绑定

    @Configuration
    public class CompileCodeConfig {
        //springboot 会为我们自动初始化这个CachingConnectionFactory
        @Autowired
        private CachingConnectionFactory connectionFactory;
    
        @Bean
        public DirectExchange examComputedExchange() {
    //        Map<String, Object> arguments = new HashMap<>();
    //        //设置自定义交换机的类型。
    //        arguments.put("x-delayed-type", "direct");
            //1.交换机名称
            //2.交换机的类型
            //3.是否需要持久化
            //4.是否需要自动删除
            //5.其他参数
            return ExchangeBuilder.directExchange(CompileCodeEnum.PYTHON_CODE.getExchange()).durable(true).build();
        }
    
        @Bean
        public Queue examComputedQueue() {
            return QueueBuilder
                    .durable(CompileCodeEnum.PYTHON_CODE.getName())
                    .build();
        }
    
        @Bean
        public Binding bindingExamComputedQueueToExchange() {
            return BindingBuilder
                    .bind(examComputedQueue())
                    .to(examComputedExchange())
                    .with(CompileCodeEnum.PYTHON_CODE.getRouteKey());
        }
    }
    
  3. 消息发送到交换机,发消息需要指定交换机和routingkey,说明消息进入哪个队列

    @Component
    @Slf4j
    public class CompileCodeSender {
        @Autowired
        private RabbitTemplate rabbitTemplate;
        public void sendMessage(Long recordId) {
            rabbitTemplate.convertAndSend(CompileCodeEnum.PYTHON_CODE.getExchange(), CompileCodeEnum.PYTHON_CODE.getRouteKey(), recordId);
        }
    }
    
  4. 消费者选择队列进行监听,消费队列中的消息

    @Component
    @Slf4j
    public class CompileCodeCustomer {
        @RabbitListener(queues = "compile.python")
        public void handleExamProcessComputed(Long recordId) throws InterruptedException {
    
            log.info("进入考试消费队列, id:{}", recordId);
            Thread.sleep(10000);
        }
    }
    
  5. 测试

    消费:每条消费相差10s消费

image-20220713095228620

  1. 问题

    消费者一次只能消费一条消息,效率比较低,希望能够消费多条

2. 配置多个消费者

  1. 创建消费工厂:在CompileCodeConfig中配置

        
        @Autowired
        private CachingConnectionFactory connectionFactory;
    
    	@Bean
        public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(){
            SimpleRabbitListenerContainerFactory listenerContainerFactory = new SimpleRabbitListenerContainerFactory();
            listenerContainerFactory.setConnectionFactory(connectionFactory);
            listenerContainerFactory.setConcurrentConsumers(15); //并发消费者数量
            listenerContainerFactory.setMaxConcurrentConsumers(15);//最大的并发消费者数量
            listenerContainerFactory.setPrefetchCount(10);//预处理消息个数  默认轮询发给各个消费者,但是可能会造成有的消费者任务繁重,来不及消费,而有的消费者可能处理较快,设置prefetchCount,允许限制信道上的消费者所能保持的最大未确认消息的数量,如果达到上限,则不再向此消费者发送。直到消费者消费了消息后,才继续分配。
    //        listenerContainerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);//开启消息确认机制
            return listenerContainerFactory;
        }
    
    
  2. 在消费者的rabbitmqlistener设置工厂

      @RabbitListener(queues = "compile.python", containerFactory = "simpleRabbitListenerContainerFactory")
        public void handleExamProcessComputed(Long recordId) throws InterruptedException {
    
            log.info("进入考试消费队列, id:{}", recordId);
            Thread.sleep(10000);
        }
    
  3. 结果:同时消费多条消息

    image-20220713102620412

  4. 还可以设置工厂

    /**
     * 个人理解为SimpleMessageListenerContainer就是一个消费者的容器配置类
     * @return
     */
    @Bean(name="simpleMessageListenerContainer")
    public SimpleMessageListenerContainer productMessageListenerContainerFactory(){
        SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
        listenerContainer.setConnectionFactory(connectionFactory);
        listenerContainerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);//设置为手动确认,在yaml配置后,初始化bean时默认的自动确认会覆盖掉yaml中的配置,收到消息时,会先自动确认一次,再手动确认。手动确认时,此时chanel已经关闭,会出现错误日志。
        listenerContainer.setConcurrentConsumers(10);
        listenerContainer.setMaxConcurrentConsumers(20);
        listenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        listenerContainer.setQueues(productQueue());
        listenerContainer.setMessageListener(productConsumerListener);//这里设置了消息的消费者
        listenerContainer.setPrefetchCount(5);
        return listenerContainer;
    }
    

3. 消息发送常用参数

@Component
@Slf4j
public class CompileCodeSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;


    public void sendMessage(Long recordId) {
        rabbitTemplate.convertAndSend(CompileCodeEnum.PYTHON_CODE.getExchange(), CompileCodeEnum.PYTHON_CODE.getRouteKey(), recordId, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                String content = new String(message.getBody());
                System.out.println("消息内容:"+content);
                MessageProperties messageProperties = message.getMessageProperties();
                //设置持久化
                messageProperties.setDeliveryMode(MessageProperties.DEFAULT_DELIVERY_MODE);
                //优先级
                messageProperties.setPriority(2);
                //过期时间,默认设置为一天
                final int  expireTime =3600000;
                messageProperties.setExpiration(String.valueOf(expireTime));
                return message;
            }
        });
    }

}

4. 发布确认机制

交换机/队列确认
介绍

image-20220713144555095

  1. 生产者发送消息到Broker(交换机)

    生产者到 Broker有一个ConfirmCallback确认模式,当消息被Broker接收到就会触发ConfirmCallback回调,因此,通过此回调函数就可以知道有没有到达Broker。

  2. Broker上的交换机是否成功将消息投放到队列

    消息从交换机到 队列 投递失败有一个ReturnCallback回退模式。

注意:在RedisTemplate中可以通过setMandatory(boolean mandatory)方法或者在yml配置文件中通过template.mandatory: true来配置当消息没能路由到指定队列时消息是重回生产者还是丢弃,当参数mandatory=false表示消息会被丢弃,当mandatory=true消息会返回给生产者,返回的消息可以从ReturnCallback回调中获取。

使用这两个消息回调,还需要在application.yaml中配置是否开启回调:

publisher-confirm-type: 表示确认消息的类型,分别有none、correlated、simple这三种类型。

  • publisher-confirm-type: none:表示禁用发布确认模式,默认值,使用此模式之后,不管消息有没有发送到Broker都不会触发ConfirmCallback回调。
  • publisher-confirm-type: correlated:表示消息成功到达Broker后触发ConfirmCalllBack回调
  • publisher-confirm-type: simple:simple模式下如果消息成功到达Broker后一样会触发ConfirmCalllBack回调,发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,注意:waitForConfirmsOrDie方法如果返回false则会关闭channel信道,则接下来无法发送消息到broker。

publisher-returns: true ,true表示开启失败回调,开启后当消息无法路由到指定队列时会触发ReturnCallback回调。

代码实现

yaml

# spring配置
spring: 
  rabbitmq:
    host: xx.xx.91.15
    port: 32116
    username: coder
    password: train@coder
    virtual-host: compile-code
    publisher-confirm-type: correlated  # ConfirmCalllBack回调
    publisher-returns: true    # ReturnCallback回调
    template:
      mandatory: true

config

@Component
@Slf4j
public class CompileCodeCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;


    //将创建的消息接收的回调对象添加到rabbitTemplate中。
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        //设置当消息灭有路由到指定队列时的处理方案。   //true:重回生产者   //false:丢失(默认) 返回的消息可以从ReturnCallback回调中获取
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnsCallback(this);
    }

    /**
     * 交换机确定是否收到消息的回调方法
     * 1.发消息  交换机成功接受到了  回调
     * 1.1CorrelationData保存回调消息的ID及相关信息
     * 1.2交换机收到消息   ack:true
     * 1.3cause 失败的原因  cause:null
     * 2.发消息  交换机没有成功接收   回调
     * 2.1CorrelationData保存回调消息的ID及相关信息
     * 2.2交换机收到消息   ack:false
     * 2.3 cause:失败的原因
     */

    //消息未到达交换机
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        Message message = correlationData.getReturned().getMessage();
        try {
            System.out.println(new String(message.getBody(),"utf-8"));
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException(e);
        }
        System.out.println(correlationData);
        String id = correlationData == null ? "" : correlationData.getId();
        if (ack) {
            log.info("交换机已经收到 id 为:{}的消息", id);
        } else {
            log.info("交换机还未收到 id 为:{}消息,由于原因:{}", id, cause);
        }
    }



    //消息到达交换机,但是没有对应的队列。
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        Message message = returned.getMessage();
        String replyText = returned.getReplyText();
        String routingKey = returned.getRoutingKey();
        String exchange = returned.getExchange();
        log.error(" 消 息 {}, 被 交 换 机 {} 退 回 , 退 回 原 因 :{}, 路 由 key:{}", new
                String(message.getBody()), exchange, replyText, routingKey);

    }
}
消费者确认
  1. 消费者在订阅队列时,可以指定 autoAck 参数

    1. 当 autoAck 等于 false时,RabbitMQ 会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移去消息(实质上是先打上删除标记,之后再删除)。
    2. 当 autoAck 等于 true 时,RabbitMQ 会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正地消费到了这些消息
    3. 只要设置 autoAck 参数为 false,消费者就有足够的时间处理消息(任务),不用担心处理消息过程中消费者进程挂掉后消息丢失的问题,因为 RabbitMQ 会一直等待持有消息直到消费者显式调用 Basic.Ack 命令为止。
  2. rabbimq消息重新入队条件

    1. rabbitmq不会设置未确认消息的过期时间,判断消息是否需要重新投递给消费的唯一依据是消费该消息的消费者链接是否已经断开,这么设计的原因是 RabbitMQ 允许消费者消费一条消息的时间可以很久很久。
    2. 如果 RabbitMQ 一直没有收到消费者的确认信号,并且消费此消息的消费者已经断开连接,则 RabbitMQ 会安排该消息重新进入队列,等待投递给下一个消费者,也有可能还是之前的消费者。
  3. 队列中的消息分成了两个部分:一部分是等待投递给消费者的消息;一部分是已经投递给消费者,但是还没有收到消费者确认信号的消息。

yaml配置

自动确认:acknowledge=“none” :当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。如果代码执行过程中出现异常,会造成消息丢失

手动确认:acknowledge=“manual” :设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。

根据异常情况确认:acknowledge=“auto”:如果代码在执行过程中出现异常,则自动提交。

spring: 
  rabbitmq:
    host: xx.xx.91.15
    port: 32116
    username: coder
    password: train@coder
    virtual-host: compile-code
    publisher-confirm-type: correlated
    publisher-returns: true    
    template:
      mandatory: true
    listener:
      simple: # 容器类型 简单模式
        acknowledge-mode: manual # 表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto  manual为手动
      direct: # 
        acknowledge-mode: manual

消费者配置

channel basicAck(long deliveryTag, boolean multiple);
deliveryTag:该消息的index
multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息。
void basicNack(long deliveryTag, boolean multiple, boolean requeue);
deliveryTag:该消息的index
multiple:是否批量.true:将一次性拒绝所有小于deliveryTag的消息。
requeue:被拒绝的是否重新入队列
channel.basicReject(long deliveryTag, boolean requeue);
deliveryTag:该消息的index
requeue:被拒绝的是否重新入队列
channel.basicNack 与 channel.basicReject 的区别在于basicNack可以拒绝多条消息,而basicReject一次只能拒绝一条消息

package com.train.algorithm.rabbitmq;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class CompileCodeCustomer{
    @RabbitListener(queues = "compile.python", containerFactory = "simpleRabbitListenerContainerFactory")
    public void handlePythonCode(Message message, Channel channel) {
        //消息的唯一标识
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            //1.获取消息体
            String content = new String(message.getBody(),"UTF-8");
            //2.业务逻辑
            //................
            System.out.println(content);
            int a = 1/0;
            //3. 手动签收
            channel.basicAck(deliveryTag,true);
        } catch (Exception e) {
            try {
                //批量拒绝签收   deliveryTag:消息唯一id   multiple:是否批量.true:将一次性拒绝所有小于deliveryTag的消息。    requeue:被拒绝的是否重新入队列
                channel.basicNack(deliveryTag,true,true);
            }catch (Exception exception){
                log.info("消息确认异常");
            }
        }
    }
}

注意:此处模拟了出现异常的情况,当出现异常时,拒绝接收消息,将消息重新打回队列。

此时会有一个问题,重新进入队列的消息,还会继续被投放给消费者进行消费,会一直消费出错,导致该消息被循环的消费拒绝。

分析:此处循环的主要原因是因为消息被拒绝入队,进入的是队头,因此下一次消费的还是该消息,一直出错。

配置重置次数,和重试时间,即消息消费失败后,隔一段时间再进行消费,最多重试次数为3次。(有问题)

设置重试次数
# spring配置
spring: 
  rabbitmq:
    host: xx.xx.91.15
    port: 32116
    username: coder
    password: train@coder
    virtual-host: compile-code
    publisher-confirm-type: correlated
    publisher-returns: true    
    template:
      mandatory: true
    listener:
      simple:
        acknowledge-mode: manual
        retry:
          enabled: true # 开启消费者进行重试
          max-attempts: 5 # 最大重试次数
          initial-interval: 3000 # 重试时间间隔
      direct:
        acknowledge-mode: manual
        retry:
          enabled: true # 开启消费者进行重试
          max-attempts: 5 # 最大重试次数
          initial-interval: 3000 # 重试时间间隔 3s

此时出现了一个问题,如果代码出现错误,只是捕捉了异常,并没有异常抛出,那么相当于该消息没有被确认,也没有被拒绝,队列会一直发送该消息,那么会一直消费此消息,控制台一直打印该消息的消费信息。

自动确认和手动确认不同情况测试
手动确认
情况一:确认前有异常抛出,捕获后,不拒绝消息,不抛出异常

yaml配置:

# spring配置
spring: 
  rabbitmq:
    host: xx.xx.91.15
    port: 32116
    username: coder
    password: train@coder
    virtual-host: compile-code
    publisher-confirm-type: correlated
    publisher-returns: true    
    template:
      mandatory: true
    listener:
      simple:
        concurrency: 1 
        max-concurrency: 20 
        prefetch: 50
        default-requeue-rejected: false  # 注意,此处设置的是false
        acknowledge-mode: manual
        # retry:
        #   enabled: true 
        #   max-attempts: 10
        #   max-interval: 2000ms
        #   initial-interval: 1000ms
        #   multiplier: 1
      direct:
        concurrency: 1 
        max-concurrency: 20 
        prefetch: 50
        acknowledge-mode: manual
        default-requeue-rejected: false  # 注意,此处设置的是false
        # retry:
        #   enabled: true 
        #   max-attempts: 10
        #   max-interval: 2000ms
        #   initial-interval: 1000ms
        #   multiplier: 1

default-requeue-rejected: false # 注意,此处设置的是false 表示被拒绝的消息不重新入队。

消费者:

image-20220714094706332

发送消息入队:http://localhost:9208/alg/send/20,一共发了20条消息,均产生异常

image-20220714100208591

效果:控制台打印一次信息,队列中有20条未确认的消息。

image-20220714100139974

分析:消息消息后,如果一直没有消费,那么消费者会一直消费此消息,直到断开与消费者的链接或确认消息。

image-20220714100444395

直接终止后台服务器:(模拟消费者断开链接)

image-20220714100735133

消息状态由Unacked变为Ready,说明此时消息重新入队。

启动后台服务器:(模拟有消费者)

服务器启动成功后,20条消息会重新入队列进行消费,但是因为有异常产生,因此还是Unacked状态。

image-20220714100930552

总结:

对于手动确认来说,发出的消息必须要给出明确的回复,要么确认,要么拒绝。具体拒绝后的业务可以自定义。但是如果即没有确认,也没有拒绝,那么消息会一直等待消费,如果此时断开链接,不管怎么样消息都会重新入队,等有消费者的时候继续消费。

注意: default-requeue-rejected: false 我设置的是fasle,这里不管设置false还是true都是一样,原因在于我没有主动的拒绝消息。拒绝消息的情况在之后展示。

情况二:确认前有异常抛出,捕获后,不拒绝消息,抛出异常

与情况一相比,就在消费者扔出了异常

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-crBng0sF-1657963658180)(C:\Users\DELL\AppData\Roaming\Typora\typora-user-images\image-20220714110055648.png)]

结果:不扔出异常效果一样。

情况三:确认前有异常抛出,捕获后,拒绝消息,不重回队列

其余配置和之前一样。

image-20220714111227619

这样,消息就直接被拒绝,被拒绝的消息拒绝重新入队。此时消息就被移除了。

分析:如果同意消息重新入队,那么消息会直接放入消息头,下次还是消费这条消息,死循环。

image-20220714111602845

image-20220714111847112

效果:

问题:此处的requeue和default-requeue-rejected: false的关系

  1. 情况一:default-requeue-rejected: false,requeue:true 消息被拒绝后重新入队
  2. 情况二:default-requeue-rejected: true,requeue:false 消息拒绝后不会重新入队
  3. 总结:requeue的优先级要大于default-requeue-rejected的优先级,更准确的说,requeue和default-requeue-rejected没有太大的关系。
情况四:添加重试机制
# spring配置
spring: 
  rabbitmq:
    host: xx.xx.91.15
    port: 32116
    username: coder
    password: train@coder
    virtual-host: compile-code
    publisher-confirm-type: correlated
    publisher-returns: true    
    template:
      mandatory: true
    listener:
      simple:
        acknowledge-mode: manual
        retry:
          enabled: true # 开启消费者进行重试
          max-attempts: 5 # 最大重试次数
          initial-interval: 10000000ms # 重试时间间隔
      direct:
        acknowledge-mode: manual
        retry:
          enabled: true # 开启消费者进行重试
          max-attempts: 5 # 最大重试次数
          initial-interval: 10000000ms # 重试时间间隔

重试机制表示消息被拒绝后,重新消费消息的策略。(分析:手动ack时,表示该消息的业务逻辑出错,理应进行其他处理,而不应重新返回队列,这里只是做一次展示)

如果设置重试后,过了重试次数之后,扔未成功处理消息,可以拒绝消息放入死信队列。

image-20220715103626031

消费者

image-20220714113741347

其他配置完全一样。

发现,在手动ack模式下,重试机制没有效果,并没有间隔和次数的限制,拒绝的消息一直重发。

image-20220714113602155

总结
  1. 综上,手动ack时,yaml中default-requeue-rejected的配置以及retry机制不起作用。
  2. 手动ack时,拒绝消息,说明这条消息的业务逻辑有问题,需要进行其他处理,而非重新入队。
  3. 在消费者端一定要进行ack,或者是nack,可以放在try方法块的finally中执行
  4. 可以对消费者的异常状态进行捕捉,根据异常类型选择ack,或者nack抛弃消息,nack再次尝试
  5. 对于nack的再次尝试,是进入到队列头的,如果一直是失败的状态,将会造成阻塞。所以最好是专门投递到“死信队列”,具体分析。
自动确认
情况一:确认前有异常抛出,捕获后,不抛出异常

yaml配置

# spring配置
spring: 
  rabbitmq:
    host: xx.xx.91.15
    port: 32116
    username: coder
    password: train@coder
    virtual-host: compile-code
    publisher-confirm-type: correlated
    publisher-returns: true    
    template:
      mandatory: true
    listener:
      simple:
        concurrency: 1 
        max-concurrency: 20 
        prefetch: 50
        acknowledge-mode: manual
        # default-requeue-rejected: true

        # retry:
        #   enabled: true 
        #   max-attempts: 10
        #   max-interval: 2000000ms
        #   initial-interval: 1000000ms
        #   multiplier: 1
      direct:
        concurrency: 1 
        max-concurrency: 20 
        prefetch: 50
        acknowledge-mode: auto
        # default-requeue-rejected: true
        # retry:
        #   enabled: true 
        #   max-attempts: 10
        #   max-interval: 2000000ms
        #   initial-interval: 1000000ms
        #   multiplier: 1

消费者

image-20220714141522541

结果:

队列中无消息,消息被确认。

image-20220714142428322

image-20220714142447216

情况二:确认前有异常抛出,捕获后,抛出异常

注意:这里的异常有三种。

  1. 当抛出AmqpRejectAndDontRequeueException异常的时候,则消息会被拒绝,且requeue=false(不重新入队列)

  2. 当抛出ImmediateAcknowledgeAmqpException异常,则消息会被确认(消息未被接收到)

    1. immediate 参数告诉服务器,如果该消息关联的队列上有消费者,则立刻投递;

      如果所有匹配的队列上都没有消费者,则直接将消息返还给生产者,不用将消息存入队列而等待消费者了。

    2. Immediate表示该队列无消费者,消息不会入队列,返回个消费者。

  3. 其他的异常,则消息会被拒绝,且requeue=true(如果此时只有一个消费者监听该队列,则有发生死循环的风险,多消费端也会造成资源的极大浪费,这个在开发过程中一定要避免的)。可以通过setDefaultRequeueRejected(默认是true)来设置拒绝消息重新入队。

    1. 如果有死信队列,消息会进入死信队列
    2. 如果有重试次数,重试后仍无法消费,进入死信队列

image-20220714142826362

此时会抛出异常,并且消息会返回队列队头,队头继续消费错误消息,导致程序死循环。

image-20220714142810695

默认情况下default-requeue-rejected=true,可能是这个原因,导致消息一直重回队列,设置成false查看状态。

测试后,发现并不是这样,false之后,消息被消费一次之后,不再重新入队。消息丢失,控制台不再打印内容。

image-20220714151717293

情况三:确认前有异常抛出,捕获后,开启重试机制
  1. default-requeue-rejected: false 开启重试
  2. 相当于发生异常后,拒绝消息重新入队,但是开启失败消息重试机制
# spring配置
spring: 
  rabbitmq:
    host: xx.xx.91.15
    port: 32116
    username: coder
    password: train@coder
    virtual-host: compile-code
    publisher-confirm-type: correlated
    publisher-returns: true    
    template:
      mandatory: true
    listener:
      simple:
        concurrency: 5
        max-concurrency: 20 
        prefetch: 50
        acknowledge-mode: auto
        default-requeue-rejected: false
        retry:
          enabled: true 
          max-attempts: 10
          max-interval: 10000ms
          initial-interval: 2000ms
          multiplier: 1

失败后,每隔两秒,尝试重复消费,重试了十次。消息还未成功消息,消息会被确认掉,消息不再重新入队。

image-20220714161115502

image-20220714161316329

尝试在重试过程中,模拟于消费者断开链接,查看数据是否重回队列。
image-20220714162207869

结果消息重回了队列

设置:default-requeue-rejected: true

效果还和false一样,重试达到次数后消息不会再入队。

源码分析:

当消费者中有异常抛出时,rabbitmq会尝试判断该消费能否再尝试消费,如果可以,就继续尝试消费,如果不成功,则丢失该消息。

image-20220714161907765

大坑:yaml与配置类配置

在rabbitmq的配置类中,我配置了SimpleRabbitListenerContainerFactory,没有设置他的default-requeue-rejected,默认为true。但是我在yaml中设置了default-requeue-rejected为false,发现配置类中的设置覆盖掉了yaml中的配置,导致我在yaml中配置default-requeue-rejected=false无效,消息一直重发。

image-20220714152122078

配置文件配置

# spring配置
spring: 
  rabbitmq:
    host: xx.xx.91.15
    port: 32116
    username: coder
    password: train@coder
    virtual-host: compile-code
    publisher-confirm-type: correlated
    publisher-returns: true    
    template:
      mandatory: true
    listener:
      simple:
        concurrency: 1 
        max-concurrency: 20 
        prefetch: 50
        acknowledge-mode: auto
        default-requeue-rejected: false  # 注意,此处是false

消费者处debug配置类详情

image-20220714154225180

这样配置后,我在yaml中的配置都不会生效了。

image-20220714154202222

5. 失败的消息入死信队列

过期的消息,拒绝后不再入队的消息,超过重试次数的消息,都会进入死信队列。

步骤:

  1. 创建死信队列,死信交换机,并让普通队列绑定到死信交换机
  2. 创建死信消费者
    //普通队列,普通队列绑定死信交换机
    @Bean
    public Queue CompileCodeQueue() {
        Map<String, Object> args = new HashMap<>(2);
        //声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", CompileCodeConfig.COMPILE_CODE_DEAD_EXCHANGE);
        //声明当前队列的死信路由 key
        args.put("x-dead-letter-routing-key", CompileCodeConfig.COMPILE_CODE_DEAD_KEY);
        //druable 持久化  后面输入队列的名称
        return QueueBuilder
                .durable(CompileCodeConfig.COMPILE_CODE_QUEUE)
                .withArguments(args).build();
    }

	//死信交换机
    @Bean("compileCodeDeadDirect")
    public DirectExchange compileCodeDeadDirect() {
        System.out.println("创建交换机");
        return (DirectExchange) ExchangeBuilder
                .directExchange(CompileCodeConfig.COMPILE_CODE_DEAD_EXCHANGE)
                .durable(true)
                .build();
    }


    //死信队列
    @Bean("compileCodeDeadQueue")
    public Queue compileCodeDeadQueue() {
        return new Queue(CompileCodeConfig.COMPILE_CODE_DEAD_QUEUE);
    }


    //死信队列与死信交换机进行绑定
    @Bean
    public Binding bindingExamProcessQueueToExchange(@Qualifier("compileCodeDeadQueue") Queue queue, @Qualifier("compileCodeDeadDirect") DirectExchange customExchange) {
        return BindingBuilder
                .bind(queue)
                .to(customExchange)
                .with(CompileCodeConfig.COMPILE_CODE_DEAD_KEY);
    }

配置消费者

    //死信队列,存储过期或者消费失败的消息
    @RabbitListener(queues = CompileCodeConfig.COMPILE_CODE_DEAD_QUEUE)
    public void handleExamProcessComputed(Message message, Channel channel) {

        log.info("进入死信队列, recordId:{}");

    }

image-20220714204318829

知识点

消息发送

基本配置
# spring配置
spring: 
  rabbitmq:
    host: xx.xx.91.15
    port: 32116
    username: coder
    password: train@coder
    virtual-host: compile-code
    publisher-confirm-type: correlated
    publisher-returns: true    
    template:
      mandatory: true
    listener:
      simple:
        acknowledge-mode: manual
        retry:
          enabled: true # 开启消费者进行重试
          max-attempts: 5 # 最大重试次数
          initial-interval: 3000 # 重试时间间隔
      direct:
        acknowledge-mode: manual
        retry:
          enabled: true # 开启消费者进行重试
          max-attempts: 5 # 最大重试次数
          initial-interval: 3000 # 重试时间间隔

消息消费

消息分发:prefetchCount

当 RabbitMQ 队列拥有多个消费者时,队列收到的消息将以轮询(round-robin)的分发方式发送给消费者。每条消息只会发送给订阅列表里的一个消费者。

默认情况下,使用轮询的方式发送,有n个消费者,会将第m条消息发送给第m%n个消费者。此时会遇到一个问题就是,如果某些消息处理特别慢,某些消息处理特别慢,那么会造成某些消费者压力较大,等待消费的消息较多,而部分消费者压力较小,进程空闲,造成整体应用吞吐量下降。

使用prefetchCount设置消费者所能保持最大未确认消费的数量,如果消费者未确认的消息达到该数量,那么消息将不再给这个消费者发送,转发给其他消费者,等压力大的消费者确认消费了某条消息后,对应的prefetchCount-1,继续接收消息。

消息分发使用的是chanel,当设置prefetchCount为15时,会生成15个channel。

image-20220713152627355

问题

1 . direct模式下,向不存在的交换机发送消息,消息去了哪里?

2. 在一个消费者的情况下,消费者是一次消费一条还是一次消费多条?

默认情况下,消费者一次消费一条。

生产者:生产10000条数据到队列

image-20220713095019977

消费者:假设处理消息的时间为10s

image-20220713095057921

日志:

消息:

image-20220713095152036

消费:每条消费相差10s消费

image-20220713095228620

3. Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=

设置为手动确认,在yaml配置后,初始化bean时默认的自动确认会覆盖掉yaml中的配置,收到消息时,会先自动确认一次,再手动确认。手动确认时,此时chanel已经关闭,会出现错误日志。

/**
 * 个人理解为SimpleMessageListenerContainer就是一个消费者的容器配置类
 * @return
 */
@Bean(name="simpleMessageListenerContainer")
public SimpleMessageListenerContainer productMessageListenerContainerFactory(){
    SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
    listenerContainer.setConnectionFactory(connectionFactory);
    listenerContainerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);//设置为手动确认,在yaml配置后,初始化bean时默认的自动确认会覆盖掉yaml中的配置,收到消息时,会先自动确认一次,再手动确认。手动确认时,此时chanel已经关闭,会出现错误日志。
    listenerContainer.setConcurrentConsumers(10);
    listenerContainer.setMaxConcurrentConsumers(20);
    listenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    listenerContainer.setQueues(productQueue());
    listenerContainer.setMessageListener(productConsumerListener);//这里设置了消息的消费者
    listenerContainer.setPrefetchCount(5);
    return listenerContainer;
}

4.rabbitmq的重试机制,并没有按照配置走(参照实战中的发布确认机制)

rabbitmq配置:

# spring配置
spring: 
  rabbitmq:
    host: xx.xx.91.15
    port: 32116
    username: coder
    password: train@coder
    virtual-host: compile-code
    publisher-confirm-type: correlated
    publisher-returns: true    
    template:
      mandatory: true
    listener:
      simple:
        acknowledge-mode: auto
        retry:
          enabled: true 
          max-attempts: 10
          max-interval: 2000ms
          initial-interval: 1000ms
          multiplier: 1
        default-requeue-rejected: false 
      direct:
        acknowledge-mode: auto   # 发布确认机制   自动
        retry:
          enabled: true 
          max-attempts: 10 # 最大的重试次数
          max-interval: 2000ms # 最大的重试间隔
          initial-interval: 1000ms # 重试间隔
          multiplier: 1 # 重试时间比
        default-requeue-rejected: false # 消息被拒绝后,重新放入队列

5. 乱码问题

消费者接收消息乱码,即使使用utf-8转化后。

解决,发送消息时,使用JSON将对象序列化。

发送消息

    public void sendMessage(TaskRecord taskRecord) {
        rabbitTemplate.convertAndSend(CompileCodeConfig.COMPILE_CODE_EXCHANGE, CompileCodeConfig.COMPILE_CODE_ROUTEKEY, JSON.toJSONString(taskRecord), new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                MessageProperties messageProperties = message.getMessageProperties();
                //设置持久化
                messageProperties.setDeliveryMode(MessageProperties.DEFAULT_DELIVERY_MODE);
                //优先级
                messageProperties.setPriority(2);
                //过期时间,默认设置为一天
                final int  expireTime =3600000;
                messageProperties.setExpiration(String.valueOf(expireTime));
                return message;
            }
        },new CorrelationData(UUID.randomUUID().toString()));
    }

接收消息:

  String str = new String(body,"UTF-8");
 log.info("收到消息:{}"+str);
参考文章:

(115条消息) RabbitMQ消息确定机制(自动ACK和手动ACK)_小胖学编程的博客-CSDN博客_rabbitmq手动ack

(112条消息) RabbitMQ 消费者确认auto 和 manual 模式对异常的处理区别(含重试、requeue的影响)_DatDreamer的博客-CSDN博客_requeue

【RabbitMQ-6】MQ中间件-rabbitmq-消费者消息获取及异常处理的实现(SpringBoot2.0环境下) - 简书 (jianshu.com)

Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐