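1. Overview

RocketMQ has four main components: NameServer, Broker, Producer, and Consumer.
Role of the NameServer:
The NameServer acts as the registry for Brokers. When a Broker starts, it registers with every NameServer listed in its configuration, and each NameServer keeps a long-lived connection to the Brokers that register with it. A Broker re-registers (heartbeats) every 30 s; the NameServer periodically scans for Brokers whose heartbeat has stopped (in the 4.x line, every 10 s with a 120 s timeout) and removes any dead Broker from its list. When a producer needs to send a message, it first fetches the Broker address list from the NameServer, then load-balances across that list and picks one broker to send to.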

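2. Ports Java clients use to connect to the broker
A broker uses three ports by default: 10911, 10912, and 10909. 10911 is the listening port of the remotingServer, which mainly handles three kinds of requests:
- messages sent by producers
- messages sent back by consumers after a consumption failure or timeout
- message pulls from consumers
10912 is the port on which the master broker listens for requests from slave brokers. 10909 (listenPort - 2) is the fastRemotingServer port, also known as the VIP channel.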
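
As a rough illustration of the three default ports, a Service that exposes all of them might look like the sketch below. The Service name and the port names are hypothetical, and the selector assumes broker pods labeled app: mqbroker. Whether 10909 and 10912 need to be exposed at all depends on whether clients use the VIP channel and whether a slave broker connects through the Service.

```yaml
apiVersion: v1
kind: Service
metadata:
  name: mqbroker-all-ports      # hypothetical name, for illustration only
spec:
  selector:
    app: mqbroker               # assumes broker pods carry this label
  ports:
    - name: fast-remoting       # fastRemotingServer / VIP channel (listenPort - 2)
      port: 10909
      targetPort: 10909
    - name: remoting            # remotingServer: producer sends, consumer pulls
      port: 10911
      targetPort: 10911
    - name: ha                  # master listens here for slave requests (listenPort + 1)
      port: 10912
      targetPort: 10912
```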

1) Deploying rocketmq-4.8.0 from a self-built image

Dockerfile contents:

FROM centos:centos8.3.2011
 
RUN rm -f /etc/localtime \
&& ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime \
&& echo "Asia/Shanghai" > /etc/timezone
 
ENV LANG en_US.UTF-8
 
ADD jdk-8u301-linux-x64.tar.gz /usr/java/
ADD rocketmq-all-4.8.0-bin-release.tar.gz /usr/local/
RUN mv /usr/local/rocketmq-all-4.8.0-bin-release /usr/local/rocketmq-4.8.0 \
&& mkdir -p /data/rocketmq/store

# JDK environment
ENV JAVA_HOME=/usr/java/jdk1.8.0_301
ENV JRE_HOME=/usr/java/jdk1.8.0_301/jre
ENV CLASSPATH=$JAVA_HOME/lib:$JAVA_HOME/jre/lib
ENV PATH=/usr/local/rocketmq-4.8.0/bin:$JAVA_HOME/bin:$PATH
 
CMD ["/bin/bash"]

Deploying with the image:

apiVersion: v1
kind: Service
metadata:
  labels:
    app: rocketmq
  name: rocketmq  
spec:
  ports:
  - port: 9876
    protocol: TCP
    targetPort: 9876
  selector:
    app: mqnamesrv
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: mqnamesrv
spec:
  serviceName: mqnamesrv
  replicas: 1
  selector:
    matchLabels:
      app: mqnamesrv
  template:
    metadata:
      labels:
        app: mqnamesrv
    spec:
      containers:
      - name: mqnamesrv
        image: liuyi71sinacom/rocketmq-4.8.0
        command: ["sh","/usr/local/rocketmq-4.8.0/bin/mqnamesrv"]
        imagePullPolicy: IfNotPresent
        ports:
        - containerPort: 9876
          protocol: TCP

---
apiVersion: v1
kind: ConfigMap
metadata:
  name: rocketmqbroker
  labels:
    app: rocketmqbroker
data:
  BROKER_MEM: ' -Xms2g -Xmx2g -Xmn1g '
  broker.conf: |-
    brokerClusterName = DefaultCluster
    brokerName = broker-0
    brokerId = 0
    deleteWhen = 04
    fileReservedTime = 48
    brokerRole = ASYNC_MASTER
    flushDiskType = ASYNC_FLUSH 
---
apiVersion: v1
kind: Service
metadata:
  labels:
    app: mqbroker
  name: mqbroker
spec:
  type: NodePort
  ports:
    - name: mqbroker
      port: 10911
      nodePort: 30911
      protocol: TCP
      targetPort: 10911
  selector:
    app: mqbroker
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: mqbroker
spec:
  serviceName: mqbroker
  replicas: 1
  selector:
    matchLabels:
      app: mqbroker
  template:
    metadata:
      labels:
        app: mqbroker
    spec:
      containers:
      - name: mqbroker
        image: liuyi71sinacom/rocketmq-4.8.0
        command: ["sh","/usr/local/rocketmq-4.8.0/bin/mqbroker", "-n","rocketmq:9876"]
        imagePullPolicy: IfNotPresent
        env:
        - name: JAVA_OPT
          value: "-server -XX:ParallelGCThreads=1 -Xms1g -Xmx1g -Xmn512m"
          #value: "-XX:MaxRAMPercentage=80.0"
        ports:
        - containerPort: 10909
        - containerPort: 10911
        resources:
          limits:
            cpu: "2"
            memory: 2Gi
          requests:
            cpu: "1"
            memory: 2Gi
      nodeSelector:
        alibabacloud.com/is-edge-worker: 'false'
        beta.kubernetes.io/arch: amd64
        beta.kubernetes.io/os: linux
      tolerations:
        - effect: NoSchedule
          key: node-role.alibabacloud.com/addon
          operator: Exists

---
apiVersion: v1
kind: Service
metadata:
  labels:
    app: console
  name: console
spec:
  type: NodePort
  ports:
    - name: console
      port: 8080
      nodePort: 30386
      protocol: TCP
      targetPort: 8080
  selector:
    app: console
---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: console
  name: console
spec:
  replicas: 1
  selector:
    matchLabels:
      app: console
  strategy: {}
  template:
    metadata:
      creationTimestamp: null
      labels:
        app: console
    spec:
      containers:
      - image: styletang/rocketmq-console-ng
        name: rocketmq-console-ng
        env: 
        - name: JAVA_OPTS
          value: "-Drocketmq.namesrv.addr=rocketmq:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"
        resources: {}
      nodeSelector:
        alibabacloud.com/is-edge-worker: 'false'
        beta.kubernetes.io/arch: amd64
        beta.kubernetes.io/os: linux
      tolerations:
        - effect: NoSchedule
          key: node-role.alibabacloud.com/addon
          operator: Exists

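With the config file mounted, the broker kept failing and would not start; I later edited the manifest by hand to mount the data disk, the config file, and the logs: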

        volumeMounts:
          - name: rocketmqbroker-config
            mountPath: /usr/local/rocketmq-4.8.0/conf/broker.conf
            subPath: broker.conf
          - name: rocketmqbroker-data
            mountPath: /root/store
          - name: rocketmqbroker-logs
            mountPath: /root/logs
          - name: timezone
            mountPath: /etc/localtime
      volumes:
        - name: rocketmqbroker-config
          configMap:
            name: rocketmqbroker
            defaultMode: 0755
        - name: rocketmqbroker-data
          hostPath:
            path: /data/rocketmq/store
            type: DirectoryOrCreate
        - name: rocketmqbroker-logs
          hostPath:
            path: /data/rocketmq/logs
            type: DirectoryOrCreate
        - name: timezone
          hostPath:
            path: /usr/share/zoneinfo/Asia/Shanghai
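
Mounting broker.conf does not by itself make the broker read it: mqbroker loads a properties file only when it is started with -c. A minimal sketch of the container command, assuming the file is mounted at the mountPath shown in the volumeMounts above:

```yaml
      containers:
      - name: mqbroker
        image: liuyi71sinacom/rocketmq-4.8.0
        # -n sets the NameServer address; -c points the broker at the mounted
        # config file (path assumed to match the broker.conf mountPath)
        command: ["sh", "/usr/local/rocketmq-4.8.0/bin/mqbroker",
                  "-n", "rocketmq:9876",
                  "-c", "/usr/local/rocketmq-4.8.0/conf/broker.conf"]
```

Without -c the broker starts with its built-in defaults, so settings such as brokerName or fileReservedTime in the ConfigMap would have no effect.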

namesrvAddr: rocketmq.yx-test.svc.cluster.local:9876   # address Java clients connect to: the NameServer Service, host:port with no http:// prefix

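2) Deploying rocketmq 4.9.4 from a self-built image with a mounted data disk

Configuration still to be worked out.

3) Deploying from the official rocketmq:4.9.4 image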

apiVersion: v1
kind: Service
metadata:
  labels:
    app: rocketmqnamesrv
  name: rocketmqnamesrv
spec:
  type: ClusterIP
  ports:
  - port: 9876
    targetPort: 9876
    name: namesrvport
  selector:
    app: rocketmqnamesrv
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: rocketmqnamesrv
spec:
  serviceName: rocketmqnamesrv
  replicas: 1
  selector:
    matchLabels:
      app: rocketmqnamesrv
  template:
    metadata:
      labels:
        app: rocketmqnamesrv
    spec:
      containers:
      - name: rocketmqnamesrv
        image: apache/rocketmq:4.9.4
        imagePullPolicy: IfNotPresent
        env:
          - name: TZ
            value: Asia/Shanghai
          - name: JAVA_OPT_EXT
            value: "-Duser.home=/home/rocketmq -Xms512M -Xmx512M -Xmn128m"
        command: ["sh","/home/rocketmq/rocketmq-4.9.4/bin/mqnamesrv"]
        readinessProbe:
          tcpSocket:
            port: 9876
          initialDelaySeconds: 15
          timeoutSeconds: 5
          periodSeconds: 20
      nodeName: gem-yxyw-t-c02
---
apiVersion: v1
kind: Service
metadata:
  labels:
    app: rocketmqbroker
  name: rocketmqbroker
spec:
  # type: NodePort
  type: ClusterIP
  ports:
  - port: 10911
    targetPort: 10911
    name: broker-port
  selector:
    app: rocketmqbroker
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: rocketmqbroker
spec:
  serviceName: rocketmqbroker
  replicas: 1
  selector:
    matchLabels:
      app: rocketmqbroker
  template:
    metadata:
      labels:
        app: rocketmqbroker
    spec:
      containers:
      - name: rocketmqbroker
        image: apache/rocketmq:4.9.4
        env:
          - name: TZ
            value: Asia/Shanghai
          - name: JAVA_OPT_EXT
            value: "-Xms512M -Xmx512M -Xmn128m"
          - name: NAMESRV_ADDR
            value: rocketmqnamesrv:9876
        command: ["sh","/home/rocketmq/rocketmq-4.9.4/bin/mqbroker","-n","rocketmqnamesrv:9876"]
        imagePullPolicy: IfNotPresent
        ports:
        - containerPort: 10911
      imagePullSecrets:
      - name: registry-pull-secret
      nodeName: gem-yxyw-t-c02
---
# Note: the next two Service documents are both named "console". If both are
# applied, the later one (port 8080) overwrites this one. The targetPort here
# (30013) also does not match the console's listening port, 8080.
apiVersion: v1
kind: Service
metadata:
  labels:
    app: console
  name: console
spec:
  type: NodePort
  ports:
  - port: 30015
    targetPort: 30013
    name: port
    nodePort: 30014
  selector:
    app: console
---
apiVersion: v1
kind: Service
metadata:
  labels:
    app: console
  name: console
spec:
  type: NodePort
  ports:
    - name: console
      port: 8080
      nodePort: 30386
      protocol: TCP
      targetPort: 8080
  selector:
    app: console
---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: console
  name: console
spec:
  replicas: 1
  selector:
    matchLabels:
      app: console
  strategy: {}
  template:
    metadata:
      creationTimestamp: null
      labels:
        app: console
    spec:
      containers:
      - image: styletang/rocketmq-console-ng
        name: rocketmq-console-ng
        env: 
        - name: JAVA_OPTS
          value: "-Drocketmq.namesrv.addr=rocketmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"
        resources: {}

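3. Quick test
Before sending or receiving messages, clients need to be told where the name server is. RocketMQ offers several ways to do this; for simplicity we use the NAMESRV_ADDR environment variable. The bin/tools.sh helper script is then used to send test messages. The command-line steps are as follows.
Enter the mqbroker pod:  kubectl exec -it mqbroker-0 -- sh
Set the NameServer address: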

export NAMESRV_ADDR=rocketmq:9876
echo $NAMESRV_ADDR
cd /usr/local/rocketmq-4.8.0/bin/
Run the Producer to send test messages:

sh ./tools.sh org.apache.rocketmq.example.quickstart.Producer

If sending succeeds, you will see a large number of send-success log lines.
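
The same producer test can also be run without an interactive shell. The sketch below wraps the command in a one-off Job. The Job name is hypothetical, and it assumes the self-built 4.8.0 image and the rocketmq NameServer Service from section 1).

```yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: rocketmq-quickstart-producer   # hypothetical name
spec:
  backoffLimit: 0                      # do not retry if the producer fails
  template:
    spec:
      restartPolicy: Never
      containers:
      - name: producer
        image: liuyi71sinacom/rocketmq-4.8.0
        env:
        - name: NAMESRV_ADDR           # same variable the manual steps export
          value: rocketmq:9876
        command: ["sh", "/usr/local/rocketmq-4.8.0/bin/tools.sh",
                  "org.apache.rocketmq.example.quickstart.Producer"]
```

Once the Job has run, kubectl logs job/rocketmq-quickstart-producer should show the same send results as the manual run.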

Console UI
