java基于netty的udp数据接收服务
java基于netty的udp数据接收服务服务搭建step1:新建一个随微服务启动的服务类。step2:在ChineseProverbServerHandler 类中执行具体的处理方法小结:具体的数据处理方法未给出,可根据自己需求进行扩展。使用netty udp进行数据接收解析,记录下相关过程。项目使用springboot+springcloud 搭建服务搭建step1:新建一个随微服务启动...
·
java基于netty的udp数据接收服务
使用netty udp进行数据接收解析,记录下相关过程。项目使用springboot+springcloud 搭建
服务搭建
step1:新建一个随微服务启动的服务类。
// An highlighted block
package com.ggnykj.smartems.cloud.server.udpserver.server;
import com.ggnykj.smartems.cloud.server.udpserver.handler.ChineseProverbServerHandler;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioDatagramChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import java.util.concurrent.*;
/**
* @author xk
* @date 2019/7/24 15:18
* udp数据接收处理服务,用于将udp数据发送到handler处理,解析数据,并且存储到rabbitmq
*/
@Service
@Component
public class UdpEventServer implements ApplicationRunner {
private Logger log = LoggerFactory.getLogger(UdpEventServer.class);
//此处为读取springboot yml 配置文件获取 udp服务端口号
@Value("${udpServer.ports}")
private int port = 9701;
@Override
public void run(ApplicationArguments args) {
//开启线程,执行接收处理方法
ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("wbAdjust-%d").build();
ExecutorService singleThreadPool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1024), factory, new ThreadPoolExecutor.AbortPolicy());
singleThreadPool.execute(this::doWork);
}
/**
* @return void
* @Author fxk
* @Param []
* @Description : 数据接收线程 方法实现udp上传数据的接收,并将实现数据处理handler
* @Date 2019/8/2 17:00
**/
private void doWork() {
try {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
//由于我们用的是UDP协议,所以要用NioDatagramChannel来创建
b.group(group)
.channel(NioDatagramChannel.class)
.option(ChannelOption.SO_BROADCAST, true)//支持广播
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//设置处理handler.执行具体处理方法
pipeline.addLast(new ChineseProverbServerHandler());
}
});
b.bind(port).sync().channel().closeFuture().await();
} catch (InterruptedException e) {
e.printStackTrace();
log.error("执行udp接收服务出错" + e.getMessage());
}
}
}
step2:在ChineseProverbServerHandler 类中执行具体的处理方法
package com.ggnykj.smartems.cloud.server.udpserver.handler;
import com.ggnykj.smartems.cloud.model.message.UdpBaseMessage;
import com.ggnykj.smartems.cloud.server.udpserver.server.CmdUdpQueueClientServer;
import common.ProtocolUtils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author xk
* @date 2019/7/24 15:23
* udp数据处理handler
*/
public class ChineseProverbServerHandler extends SimpleChannelInboundHandler<DatagramPacket> {
private Logger log = LoggerFactory.getLogger(ChineseProverbServerHandler.class);
/**
* 在这个方法中,形参packet客户端发过来的DatagramPacket对象
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
try {
//获取消息的ByteBuf
ByteBuf buf = packet.copy().content();
//获取接收到的字节流转为string
String hexValue = ByteBufUtil.hexDump(buf);
//每两个字符为一组,空格间隔
String spaceValue = ProtocolUtils.getSpaceSplitHexString(hexValue);
log.info("客户端消息" + "==>" + spaceValue);
//todo 写具体的解析处理方法
} catch (Exception e) {
log.error("类 ChineseProverbServerHandler 出错"+e.getMessage());
}
}
}
小结:具体的数据处理方法未给出,可根据自己需求进行扩展。
代码链接:https://download.csdn.net/download/weixin_39811685/11547292
更多推荐
已为社区贡献1条内容
所有评论(0)