使用netty udp进行数据接收解析,记录下相关过程。项目使用springboot+springcloud 搭建

服务搭建

step1:新建一个随微服务启动的服务类。

// An highlighted block
package com.ggnykj.smartems.cloud.server.udpserver.server;

import com.ggnykj.smartems.cloud.server.udpserver.handler.ChineseProverbServerHandler;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioDatagramChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;

import java.util.concurrent.*;

/**
 * @author xk
 * @date 2019/7/24 15:18
 * udp数据接收处理服务,用于将udp数据发送到handler处理,解析数据,并且存储到rabbitmq
 */
@Service
@Component
public class UdpEventServer implements ApplicationRunner {
    private Logger log = LoggerFactory.getLogger(UdpEventServer.class);
    //此处为读取springboot yml 配置文件获取 udp服务端口号
    @Value("${udpServer.ports}")
    private int port = 9701;

    @Override
    public void run(ApplicationArguments args) {
//开启线程,执行接收处理方法
        ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("wbAdjust-%d").build();
        ExecutorService singleThreadPool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1024), factory, new ThreadPoolExecutor.AbortPolicy());
        singleThreadPool.execute(this::doWork);
    }

    /**
     * @return void
     * @Author fxk
     * @Param []
     * @Description : 数据接收线程  方法实现udp上传数据的接收,并将实现数据处理handler
     * @Date 2019/8/2 17:00
     **/
    private void doWork() {
        try {
            EventLoopGroup group = new NioEventLoopGroup();
            Bootstrap b = new Bootstrap();
            //由于我们用的是UDP协议,所以要用NioDatagramChannel来创建
            b.group(group)
                    .channel(NioDatagramChannel.class)
                    .option(ChannelOption.SO_BROADCAST, true)//支持广播
                    .handler(new ChannelInitializer<Channel>() {
                        @Override
                        protected void initChannel(Channel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            //设置处理handler.执行具体处理方法
                            pipeline.addLast(new ChineseProverbServerHandler());
                        }
                    });

            b.bind(port).sync().channel().closeFuture().await();
        } catch (InterruptedException e) {
            e.printStackTrace();
            log.error("执行udp接收服务出错" + e.getMessage());

        }
    }
}

step2:在ChineseProverbServerHandler 类中执行具体的处理方法

package com.ggnykj.smartems.cloud.server.udpserver.handler;

import com.ggnykj.smartems.cloud.model.message.UdpBaseMessage;
import com.ggnykj.smartems.cloud.server.udpserver.server.CmdUdpQueueClientServer;
import common.ProtocolUtils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * @author xk
 * @date 2019/7/24 15:23
 * udp数据处理handler
 */
public class ChineseProverbServerHandler extends SimpleChannelInboundHandler<DatagramPacket> {
    private Logger log = LoggerFactory.getLogger(ChineseProverbServerHandler.class);
    /**
     * 在这个方法中,形参packet客户端发过来的DatagramPacket对象
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
        try {
          //获取消息的ByteBuf 
            ByteBuf buf = packet.copy().content();
            //获取接收到的字节流转为string
            String hexValue = ByteBufUtil.hexDump(buf);
            //每两个字符为一组,空格间隔
            String spaceValue = ProtocolUtils.getSpaceSplitHexString(hexValue);
            log.info("客户端消息" + "==>" + spaceValue);
       //todo 写具体的解析处理方法
        } catch (Exception e) {
            log.error("类 ChineseProverbServerHandler 出错"+e.getMessage());
        }
    }

}

小结:具体的数据处理方法未给出,可根据自己需求进行扩展。

代码链接:https://download.csdn.net/download/weixin_39811685/11547292

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐