项目源码 下载
第一个项目:qucik4j 下载地址:https://github.com/ZhangHLong/quick4j
作用: 定时监控指定服务端口是否异常,发生异常,会发送MQ消息 通知email-server服务。

第二个项目:email-server 下载地址:https://github.com/ZhangHLong/email-server
作用:监听消息,发送邮件通知。

参考资料:
1.http://blog.csdn.net/heyutao007/article/details/50131089
2.http://mp.weixin.qq.com/s?__biz=MzIxMjAzMDA1MQ==&mid=2648945635&idx=1&sn=966633eeba2567e7759b597e43568054&chksm=8f5b54efb82cddf9678821ad9708fc404c087034471f3385ccac09dae0392a146b3673e3ccbd#rd
重要环节步骤

1.rabbitMQ 的下载安装

注意,启动项目之前一定要先开启该服务。

这里写图片描述

确认开启后,访问:http://127.0.0.1:15672/ 会进入

这里写图片描述

2. qucik4j 项目准备

qucik4j 是一个快速搭建基础项目的 种子项目,新手可以快速搭建项目,这里不在多说,有兴趣的可以搜索一下。

项目结构:

这里写图片描述

2.1 rabbitmq 连接服务配置

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/rabbit
    http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

    <description>rabbitmq 连接服务配置</description>

    <!-- 连接配置 -->
    <rabbit:connection-factory id="connectionFactory" host="127.0.0.1" username="guest" password="guest" port="5672"
                               virtual-host="/"/>

    <rabbit:admin connection-factory="connectionFactory"/>
    <!--消息对象json转换类  -->
    <bean id="jsonMessageConverter" class="com.eliteams.quick4j.web.utils.FastJsonMessageConverter"/>
    <!-- spring template声明-->
    <rabbit:template exchange="test-mq-exchange" id="amqpTemplate" connection-factory="connectionFactory"
                     reply-timeout="30000" message-converter="jsonMessageConverter"/>

    <!-- 声明一个Que -->
    <rabbit:queue id="test_queue" name="test_queue" durable="true" auto-delete="false" exclusive="false"/>
    <!--声明2个发布订阅的队列-->
    <rabbit:queue id="test_fanout_queue" name="test_fanout_queue" durable="true" auto-delete="false" exclusive="false"/>
    <rabbit:queue id="test_fanout_queue_other" name="test_fanout_queue_other" durable="true" auto-delete="false" exclusive="false"/>
    <!--
        durable:是否持久化
        exclusive: 仅创建者可以使用的私有队列,断开后自动删除
        auto_delete: 当所有消费客户端连接断开后,是否自动删除队列
     -->
    <!-- 声明一个Exchange direct 点对点方式 -->
    <rabbit:direct-exchange name="test-mq-exchange" durable="true" auto-delete="false" id="test-mq-exchange">
        <rabbit:bindings>
            <rabbit:binding queue="test_queue" key="test_queue"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>
    <!-- 声明一个Exchange 广播发布订阅 方式 ,与该交换机绑定的队列均会收到消息,绑定队列 test_fanout_queue 和 test_fanout_queue_other -->
    <rabbit:fanout-exchange name="test-fanoutmq-exchange" durable="true" auto-delete="false" id="test-fanoutmq-exchange">
       <rabbit:bindings>
           <rabbit:binding queue="test_fanout_queue" ></rabbit:binding>
           <rabbit:binding queue="test_fanout_queue_other"></rabbit:binding>
       </rabbit:bindings>
    </rabbit:fanout-exchange>

</beans>

2.2 核心监控代码

package com.eliteams.quick4j.web.listenner;

import com.eliteams.quick4j.web.notifyauthor.EmailUtil;
import com.eliteams.quick4j.web.notifyauthor.MailEntity;
import com.eliteams.quick4j.web.service.RabbitMQSendService;
import com.eliteams.quick4j.web.service.impl.RabbitMQSendServiceImpl;
import com.eliteams.quick4j.web.utils.BaseinfoMessage;
import com.eliteams.quick4j.web.utils.JsonUtils;
import com.eliteams.quick4j.web.utils.SpringContext;
import org.apache.commons.net.telnet.TelnetClient;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.integration.Message;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import org.springframework.web.context.ContextLoader;
import org.springframework.web.context.WebApplicationContext;
import org.springframework.web.context.support.WebApplicationContextUtils;

import javax.annotation.Resource;
import javax.servlet.ServletContext;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimerTask;
public class ListennerTask extends TimerTask{
//    @Resource
//    private RabbitMQSendService rabbitMQSendService;
    //引入spring bean
    RabbitMQSendService rabbitMQSendService = (RabbitMQSendService) SpringContext.getContext().getBean("rabbitMQSendService");

    private static  final int SERVER_PORT = 8092;
    private static  final String SERVER_IP = "222.185.124.100";
    private int count = -1;
    TelnetClient telnet ;
    @Override
    public void run() {
        count++;
        RequestHospServerListenner requestHospServerListenner = null;
        if (count > 0 ){
            telnet = new TelnetClient();
            try {
                telnet.connect(SERVER_IP,SERVER_PORT);
                System.out.println("时间:"+ timeConvent(System.currentTimeMillis()) +"请求开始执行,第"+ (count)+"次,服务端口正常开启");
            } catch (IOException e) {
                Date time = new Date();
                String msgtext = time + "\n" + SERVER_IP + ":" + SERVER_PORT + " is not reachable!";
                System.out.println("时间:"+ timeConvent(System.currentTimeMillis()) +"请求开始执行,第"+ (count)+"次,服务端口开启异常,"+msgtext);

                //发送mq消息
                //包装消息
                BaseinfoMessage baseinfoMessage = new BaseinfoMessage();
                baseinfoMessage.setEmailAddress("18358110561@163.com");

                // 指定 routingkey
                rabbitMQSendService.sendData("test_queue",baseinfoMessage);
                //fanout 广播发布订阅 指定发布消息到 该交换机上,与该交换机绑定的队列均会收到消息
                rabbitMQSendService.sendData("test-fanoutmq-exchange",null,baseinfoMessage);
            }


            requestHospServerListenner = new RequestHospServerListenner();
        }
    }

    public String timeConvent(long str){
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        long ltime = new Long(str);
        Date date = new Date(ltime);
        String s = simpleDateFormat.format(date);

        return s;
    }
}

2.3 发送完消息可以到 web端查看
2.3.1 queues

这里写图片描述

2.3.2 队列内部详细信息
这里写图片描述
这里写图片描述
2.3.3 查看发送的消息内容
这里写图片描述

3.消息接收服务应用 email-server

项目结构目录:

这里写图片描述

3.1 消息接收端的配置 springrabbit.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
     http://www.springframework.org/schema/rabbit
     http://www.springframework.org/schema/rabbit/spring-rabbit-1.2.xsd">
    <!--配置connection-factory,指定连接rabbit server参数 -->
    <rabbit:connection-factory id="connectionFactory" username="guest" password="guest" host="127.0.0.1" port="5672" />
    <!--定义rabbit template用于数据的接收和发送 -->
    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="exchangeTest"/>

    <!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 -->
    <rabbit:admin id="connectAdmin" connection-factory="connectionFactory"/>
    <!-- 消息对象json转换类 -->
    <bean id="jsonMessageConverter" class="com.huilong.emailserver.util.FastJsonMessageConverter" />

    <!--定义queue -->
    <rabbit:queue name="test_queue" durable="true" auto-delete="false" exclusive="false" declared-by="connectAdmin"/>

    <!-- 定义direct exchange,绑定test_queue 其实这里没有必要做绑定,消费者 只关心监听的队列即可,消息由上游服务决定发送与否 ,不过这里也添加了绑定,为的是清晰点-->
    <rabbit:direct-exchange name="exchangeTest" durable="true" auto-delete="false" declared-by="connectAdmin">
        <rabbit:bindings>
            <rabbit:binding queue="test_queue" key="test_queue"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:direct-exchange>
    <!-- 消息接收者 -->
    <bean id="messageReceiver" class="com.huilong.emailserver.consumer.MessageDirectConsumer" ></bean>

    <!-- queue litener  观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象;注意这里监听的队列 应该保持和 消息发送者端定义的一样-->
    <rabbit:listener-container connection-factory="connectionFactory" message-converter="jsonMessageConverter" >
        <rabbit:listener queues="test_queue" ref="messageReceiver"/>
    </rabbit:listener-container>



    <!--定义 发布订阅 queue -->
    <rabbit:queue name="test_fanout_queue" durable="true" auto-delete="false" exclusive="false" declared-by="connectAdmin"/>

    <!-- 消息接收者 -->
    <bean id="receiverFanout" class="com.huilong.emailserver.consumer.MessageFanoutConsumer"></bean>

    <!-- queue litener  观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象-->
    <rabbit:listener-container connection-factory="connectionFactory">
        <rabbit:listener queues="test_fanout_queue" ref="receiverFanout"/>
    </rabbit:listener-container>



    <!-- ###############################################################分割线关于模糊匹配 暂时没有写代码 后续 再搞,最近比较忙 ########################################################## -->


    <!--配置connection-factory,指定连接rabbit server参数 -->
    <rabbit:connection-factory id="connectionFactory2"
                               username="guest" password="guest" host="127.0.0.1" port="5672"/>

    <!--定义rabbit template用于数据的接收和发送 -->
    <rabbit:template id="amqpTemplate2" connection-factory="connectionFactory2"
                     exchange="exchangeTest2"/>

    <!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 -->
    <rabbit:admin id="connectAdmin2" connection-factory="connectionFactory2"/>

    <!--定义queue -->
    <rabbit:queue name="direct_test_queue" durable="true" auto-delete="false" exclusive="false" declared-by="connectAdmin2"/>

    <!-- 定义direct exchange,绑定queueTest -->
    <rabbit:topic-exchange name="exchangeTest2" durable="true" auto-delete="false" declared-by="connectAdmin2">
        <rabbit:bindings>
            <rabbit:binding queue="direct_test_queue" pattern="direct.*"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>

    <!-- 消息接收者 -->
    <bean id="recieverDirectQueue" class="com.huilong.emailserver.consumer.MessageTopicConsumer"></bean>

    <!-- queue litener  观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象-->
    <rabbit:listener-container connection-factory="connectionFactory2" >
        <rabbit:listener queues="direct_test_queue" ref="recieverDirectQueue"/>
    </rabbit:listener-container>
</beans>

3.2核心监听处理代码

package com.huilong.emailserver.consumer;

import com.huilong.emailserver.server.email.EmailListennerService;
import com.huilong.emailserver.util.BaseinfoMessage;
import com.huilong.emailserver.util.FastJsonMessageConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.beans.factory.annotation.Autowired;

/**
 * Desc:监听 发布订阅方式的q
 * Author: zhanghl2
 * Date: 2016/11/28
 * Time: 15:48
 * Version:
 */
public class MessageFanoutConsumer implements MessageListener{
    private Logger logger = LoggerFactory.getLogger(MessageFanoutConsumer.class);
    // 邮件服务
    @Autowired
    private  EmailListennerService emailListennerService;
    @Override
    public void onMessage(Message message) {
        logger.info("consumer receive message------->:{}", message);
        BaseinfoMessage messageObject = new BaseinfoMessage();
        // 转换为java对象
        FastJsonMessageConverter jsonMessageConverter = new FastJsonMessageConverter();
        messageObject= jsonMessageConverter.fromMessage(message,messageObject);
        // 发送 邮件 ,发送方式 fanout 
        emailListennerService.send(messageObject.getEmailAddress(),"fanout");
    }
}

1.问题点:
消息的接收 处理过程,之前在网上看到很多使用的是

 <!--消息对象json转换类  org.springframework.amqp.support.converter.Jackson2JsonMessageConverter-->
<bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />

**具体实现代码:**
/**
 * direct
 * 监听mq消息
 */
public class MessageDirectConsumer implements MessageListener {
    private Logger logger = LoggerFactory.getLogger(MessageDirectConsumer.class);
    @Autowired
    private EmailListennerService emailListennerService;
    @Autowired
    private Jackson2JsonMessageConverter jsonMessageConverter;
    @Override
    public void onMessage(Message message) {
        logger.info("consumer receive message------->:{}", message);

        BaseinfoMessage messageObject = new BaseinfoMessage();
        // 这一步发生了异常
        messageObject = (BaseinfoMessage) jsonMessageConverter.fromMessage(message);

        emailListennerService.send(messageObject.getEmailAddress());
    }
}

这种配置 适合在同一个项目中接收消息。不同项目引用的对象不能共用(有其他方法可以解决,这里不多说了),导致了我这个异常。我在使用这个方法的时候遇到的问题是 消息发送端发送的消息解析的时候,会发生Class not Found 异常。源码我看了下:
这里写图片描述

如有 错误或不恰当之处,还请广大 T 才指出,帮助你我共同成长!!

Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐