.NET 基于DotNetty开发物联网关通信协议
数据缓冲区
一.背景
实际开发过程中遇到一个物联网项目,需要开发一个物联网关来和设备端进行实时双向通信,而实时的数据传输需要用到长链接的socket通信方案,因此不仅需要制定网络握手机制和数据传输协议,还要解决网络传输过程中数据帧的粘包,半包,数据字节丢失等问题,最终项目应用了微软发布的DotNetty网络通信基础框架,使用该框架提供的Api可以很大程度提高物联网关的开发效率,在作出实际使用场景的适应性调整后,很好的满足了实际业务的通信需求。
二. 参考资料
DotNetty源码:[Azure/DotNetty: DotNetty project – a port of netty, event-driven asynchronous network application framework (github.com)]
DotNettySocket源码:Coldairarrow/DotNettySocket: An Easy Socket(TcpSocket,WebSocket,UdpSocket) Framework Based On DotNetty (github.com)
Netty源码解析-解码器(Decoder)是如何工作(图文并茂) - 掘金 (juejin.cn)
[(30条消息) 利用netty开发工业级通信协议:帧头帧尾识取数据帧,粘包、半包、无效包处理_netty 帧头_nutwangyg的博客-CSDN博客]:
三.DotNetty解码器原理简述
网络发送的字节先通过累加器放置在缓存区中,再通过解码器进行数据帧的解析读取,当缓存区的数据不够解析成一个完整的帧时,则等待后续的网络字节合并到缓存区中直至组成一个完整的帧;当缓存区的数据存在多个帧时,则解码器依次读取每一个帧进行输出,并通过设置缓存区已读字节的索引位置来避免重复读取,从而解决了粘包和半包的问题。
四.实际开发遇到问题及解决办法
-
基于长度的编码和解码器存在的字节错位问题
DotNettySocket项目默认使用基于长度的编码和解码器,实现的逻辑是编码器给数据帧的头部加上两个字节的长度信息,解码器读取头部的两个字节的长度信息从缓冲区中读取对应长度的字节作为数据帧,这种方式可以解决粘包和半包的问题,但是当网络信号不好导致数据帧丢失部分字节的时候,如果依旧每个帧按长度进行读取则会存在字节错位的问题,从而导致后续所有数据帧都读取错误。
-
基于固定帧头和固定帧尾加长度的编码和解码器解决字节错位问题。
编码器的逻辑较为简单,分别在数据帧头部和尾部加上帧头和帧尾,组成一个完整的帧进行发送。
解码器的逻辑较为复杂,在缓存区中同时存在到帧头和帧尾时,将帧头帧尾中间的所有字节作为数据帧,读取数据帧首部的两个字节作为帧的长度和数据帧整体的长度进行比较,如果长度一致则将有效帧进行输出,否则就将无效帧进行丢弃,这样就可以保证输出的每个帧都是完整的有效帧。
五.项目代码示例
-
项目集成DotNettySocket
下载DotNettySocket的项目源码,新建物联网关项目,添加DotNettySocket类库项目的引用。
不使用nuget安装包是因为需要对改项目源码进行调整。
-
DotNettySocket项目添加基于帧头帧尾+长度的自定义的编码解码器。
1.DotNettySocket项目添加帧头帧尾的编码和解码类
/// <summary> /// 基于帧头帧尾的编码类 /// </summary> public class HeaderEnderEncoder : MessageToMessageEncoder<IByteBuffer> { private byte[] header; private byte[] ender; public HeaderEnderEncoder(byte[] header, byte[] ender) { this.header = header; this.ender = ender; } protected override void Encode(IChannelHandlerContext context, IByteBuffer messageBytes, List<object> output) { //给发送的数据帧加上帧头和帧尾 var length = header.Length + messageBytes.ReadableBytes + ender.Length; IByteBuffer initialMessage = Unpooled.Buffer(length); initialMessage.WriteBytes(header); initialMessage.WriteBytes(messageBytes); initialMessage.WriteBytes(ender); output.Add(initialMessage.Copy()); } } /// <summary> /// 基于帧头帧尾的解码类 /// </summary> public class HeaderEnderDecoder : ByteToMessageDecoder { private byte[] header; private byte[] ender; private long maxCapacity; public HeaderEnderDecoder(byte[] header, byte[] ender) { this.header = header; this.ender = ender; maxCapacity = ushort.MaxValue; } protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List<object> output) { var childBuf = Decode(input); // 如果获得有效数据帧 if (childBuf != null) { // 将有效数据备份加入接收列表 output.Add(childBuf.Copy()); } } /// <summary> /// 解析数据帧 /// </summary> /// <param name="input">数据缓冲区</param> /// <returns></returns> private IByteBuffer Decode(IByteBuffer input) { var headerBuf = Unpooled.CopiedBuffer(header); var enderBuf = Unpooled.CopiedBuffer(ender); // 获取帧头位置 var sliceStart = ByteBufferUtil.IndexOf(headerBuf, input); if (sliceStart > -1 && sliceStart < input.Capacity) { // 舍弃帧头前面的数据 input.SetReaderIndex(sliceStart); } // 获取帧尾的起始位置 var sliceEnd = ByteBufferUtil.IndexOf(enderBuf, input); if (sliceStart > -1 && sliceEnd > -1 && sliceEnd > sliceStart && sliceEnd < input.Capacity) { //获取数据帧的长度 var length = sliceEnd - sliceStart - headerBuf.ReadableBytes; //读取帧头后面两个字节作为帧的长度用来判断帧是否完整 var framelength = input.Copy(sliceStart + headerBuf.ReadableBytes, 2).ReadShortLE(); if (framelength == length) { //获取数据子帧,从帧头位置 var frame = input.Slice(sliceStart + headerBuf.ReadableBytes, length); //设置已读的索引长度 input.SkipBytes(sliceEnd - sliceStart + enderBuf.ReadableBytes); //将数据帧返回 return frame; } else { //长度不一致则说明不完整,数据帧丢失字节数据,则丢弃帧头,进入下一轮帧头处理 input.SkipBytes(header.Length); } } //超过最大缓存容量,则清掉缓存数据 if (input.Capacity > maxCapacity) { input.SkipBytes(input.ReadableBytes); } return null; } }
- 添加和实现编码解码器的API接口
/// <summary> /// 编码和解码器的构建者,在原有的代码下面添加下面两个接口 /// </summary> /// <typeparam name="TBuilder">指定构建者</typeparam> public interface ICoderBuilder<TBuilder> { /// <summary> /// 设置基于帧头和帧尾的解码器,解决数据丢失问题 /// </summary> /// <param name="header">帧头字节</param> /// <param name="ender">帧尾字节</param> /// <returns></returns> TBuilder SetHeaderEnderDecoder(byte[] header, byte[] ender); /// <summary> /// 设置基于帧头和帧尾的编码器,解决数据丢失问题 /// </summary> /// <param name="header">帧头字节</param> /// <param name="ender">帧尾字节</param> /// <returns></returns> TBuilder SetHeaderEnderEncoder(byte[] header, byte[] ender); } /// <summary> /// TCP Socket服务端的建造者,实现下面两个接口 /// </summary> internal class TcpSocketServerBuilder : BaseGenericServerBuilder<ITcpSocketServerBuilder, ITcpSocketServer, ITcpSocketConn, byte[]>, ITcpSocketServerBuilder { //设置帧头和帧尾的编码方式 public ITcpSocketServerBuilder SetHeaderEnderDecoder(byte[] header, byte[] ender) { _setEncoder += x => x.AddLast(new HeaderEnderDecoder(header, ender)); return this; } //设置帧头和帧尾的解码方式 public ITcpSocketServerBuilder SetHeaderEnderEncoder(byte[] header, byte[] ender) { _setEncoder += x => x.AddLast(new HeaderEnderEncoder(header, ender)); return this; } }
-
物联网关使用代码示例
/// <summary> /// 服务端主逻辑 /// </summary> public class Main { /// <summary> /// 监听端口 /// </summary> public int _port; public Main(int port) { _port = port; } /// <summary> /// Tcp Socket /// </summary> public ITcpSocketServer _tcpServer; /// <summary> /// 帧头 /// </summary> public byte[] header = new byte[] { 80, 80, 80, 80 };// 十六进制是 [0x50,0x50,0x50,0x50] /// <summary> /// 帧尾 /// </summary> public byte[] ender = new byte[] { 88, 88 };// 十六进制是 [0x58,0x58] /// <summary> /// 初始化TCP监听服务 /// </summary> /// <param name="port"></param> /// <returns></returns> public async Task initAsync() { _tcpServer = await SocketBuilderFactory.GetTcpSocketServerBuilder(_port) //设置帧头帧尾编码类 .SetHeaderEnderEncoder(header, ender) //设置帧头帧尾解码类 .SetHeaderEnderDecoder(header, ender) .OnConnClose(async (server, connection) => await HandleConnCloseAsync(server, connection)) .OnException(async ex => await HandleExceptionAsync(ex)) .OnNewConn(async (server, connection) => await HandleNewConnAsync(server, connection)) .OnRecieve(async (server, connection, bytes) => await HandleRecieveAsync(server, connection, bytes)) .OnSend(async (server, connection, bytes) => await HandleSendAsync(server, connection, bytes)) .OnServerStarted(async server => await HandleServerStartedAsync(server)) .BuildAsync(); } }
更多推荐
所有评论(0)