背景:
在性能功能自测的时候,需要模拟一个mq消息到MQ管理界面中。想着python更简单点,于是使用python给Mq管理界面中发送消息。
在这里插入图片描述

1.python发送消息到MQ管理界面

直接上代码,废话不多说。

#!/usr/bin/env python
# -*- coding: UTF-8 -*-

import pika
import json

credentials = pika.PlainCredentials('admin', 'Mq@132,')  # mq用户名和密码
# 虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。
connection = pika.BlockingConnection(pika.ConnectionParameters(host='172.16.1.231', port=5672, virtual_host='/', credentials=credentials))
channel=connection.channel()
# 声明消息队列,消息将在这个队列传递,如不存在,则创建
#result = channel.queue_declare(queue='msgqueue_otn_collect_request_queue', durable=True, auto_delete=True)

message = json.dumps({
    "jobname": "yanglin",
    "collect_time": "12345678",
    "ne": [
        {
            "neid": "100009",
            "ip": "172.16.65.111",
            "port": "161",
            "device_type": "snmp",
            "protoParam": {
                "v3securityLevel": "noAuthNoPriv",
                "v3authKey": "",
                "version": 2,
                "timeout": 5,
                "communityRead": "public",
                "v3privProtocol": "DES",
                "retries": 2,
                "v3privKey": "",
                "v3securityName": "",
                "v3authProtocol": "MD5"
            },
            "rsurl": [
                {
                    "rsurl": "/ne=100009/shelf=1/slot=1/card=1.1/port=1#portType=262",
                    "urlhash": "12345678",
                    "index": 1,
                    "metricGroup": [
                        {
                            "metricGroupId": 11104,
                            "oid": [
                                {
                                    "name": "opticalPowerOut",
                                    "oid": "0500111202"
                                },
                                {
                                    "name": "opticalPowerIn",
                                    "oid": "0500111203"
                                }
                            ]
                        },
                        {
                            "metricGroupId": 30207,
                            "oid": [
                                {
                                    "name": "neTemperature",
                                    "oid": "1111111111"
                                }
                            ]
                        }
                    ]
                }
            ]
        }
    ]
})

# 向队列插入数值 routing_key是队列名
msg_body = message + ""
channel.basic_publish(exchange='tiap_ems_basic_mgt_direct_exchange', routing_key='msgqueue_otn_collect_request_queue', body=msg_body)
print(msg_body)
print(type(msg_body))
connection.close()

发给java后台是一个字符类型的json串。

2. Java后台接收消息

import com.raisecom.tiap.ems.basic.mgt.constant.RabbitConstant;
import com.raisecom.tiap.ems.basic.mgt.service.performance.PerfCollection8600CommonService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

/**
 * 类功能描述:<br>
 * <ul>
 * <li>类功能描述1<br>
 * <li>类功能描述2<br>
 * <li>类功能描述3<br>
 * </ul>
 * 修改记录:<br>
 * <ul>
 * <li>修改记录描述1<br>
 * <li>修改记录描述2<br>
 * <li>修改记录描述3<br>
 * </ul>
 * 
 * @author yanglin
 * @version 1.0
 */
@Component
@Slf4j
public class PerformanceCollectListener {
	@Autowired
	PerfCollection8600CommonService perfCollection8600CommonService;
	//public static final String MSGQUEUE_OTN_COLLECT_REQUEST_QUEUE = "msgqueue_otn_collect_request_queue";

	@RabbitListener(queues = { RabbitConstant.MSGQUEUE_OTN_COLLECT_REQUEST_QUEUE})
	@RabbitHandler
	public void process(@Payload Message msgBody) {
		try {
			log.info("PerformanceCollectListener get msgBody++++++++++++++++:" + msgBody);
			String messageBody = new String(msgBody.getBody());
			perfCollection8600CommonService.process(messageBody);
		} catch (Throwable e) {
			log.error("process error {}", e);
		}
	}
}

在代码中process方法后边的参数使用的Message 类型可以解决字节码问题。
下边是尝试过程中遇到的问题。

2.1 参数使用String 类型
public void process(@Payload String msgBody) {
	perfCollection8600CommonService.process(msgBody);
}

会发现msgBody是字节码。自己试试 就知道了。

  • 首先字节码中文无法正常显示。
  • 非中文情况下需要对String msgBody 转为str 类型,烦人。
2.1.1 字节转str
@RabbitListener(queues = { RabbitConstant.MSGQUEUE_OTN_COLLECT_REQUEST_QUEUE})
public void process(String message) {
    System.out.println(arrayToStr(ascToArray(message)));
}

private String arrayToStr(int[] arr) {
    String res = "";
    for (int i = 0; i < arr.length; i++) {
        res += Character.toString((char)arr[i]);
    }
    return res;
}

private int[] ascToArray(String str) {
    String[] arr = str.split(",");
    int[] resArr = new int[arr.length];
    for (int i = 0; i < arr.length; i++) {
        resArr[i] = Integer.parseInt(arr[i]);
    }
    return resArr;
}

2.2 参数使用Object 类型
@RabbitListener(queues = { RabbitConstant.MSGQUEUE_OTN_COLLECT_REQUEST_QUEUE})
public void process(Object message) {
    System.out.println(message);
}

发现message对象,Python发送端发来依旧是字节码形式(查看message对象),java发送端则正常。

关于java发送json消息到mq管理界面这里不做介绍了,有机会出个文章。

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐