上次通信的时候用的是自带的编解码器,今天自己实现一下自定义的。
1、自定义一下协议

//协议类
@Data
public class Protocol<T> implements Serializable {

    private Long id = System.currentTimeMillis();

    private short msgType;// 假设1为请求 2为响应

    private T body;
    
}

//消息请求体
@Data
public class RequestMsg implements Serializable {

    private String msg;

    private String other;

}

//消息响应体
@Data
public class ResponseMsg implements Serializable {

    private String result;

    private String error;

}

2、定义编解码器import io.netty.buffer.ByteBuf;

import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

//编码器
public class EnCodeMsg extends MessageToByteEncoder<Protocol<Object>> {
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Protocol<Object> msg, ByteBuf byteBuf) throws Exception {
        Serialization serialization = new JdkSerialization();
        byte[] body = serialization.serialize(msg.getBody());
        int length = body.length;
        Long id = msg.getId();
        short msgType = msg.getMsgType();
        byteBuf.writeLong(id);
        byteBuf.writeShort(msgType);
        byteBuf.writeInt(length);
        byteBuf.writeBytes(body);
    }
}


import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

//解码器
public class DeCodeMsg extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> list) throws Exception {
        Serialization serialization = new JdkSerialization();
        long id = in.readLong();
        short msgType = in.readShort();
        int bodyLength = in.readInt();
        int i = in.readableBytes();
        if(bodyLength!=i){
            in.resetReaderIndex();
            return;
        }
        byte[] bytes = new byte[bodyLength];
        in.readBytes(bytes);
        if(msgType==(short)1){
            Protocol<RequestMsg> requestMsgProtocol = new Protocol<>();
            RequestMsg requestMsg = serialization.deserialize(bytes, RequestMsg.class);
            requestMsgProtocol.setBody(requestMsg);
            requestMsgProtocol.setId(id);
            requestMsgProtocol.setMsgType(msgType);
            list.add(requestMsgProtocol);
        }else if(msgType==(short)2){
            Protocol<ResponseMsg> responseMsgProtocol = new Protocol<>();
            ResponseMsg responseMsg = serialization.deserialize(bytes,ResponseMsg.class);
            responseMsgProtocol.setId(id);
            responseMsgProtocol.setMsgType(msgType);
            responseMsgProtocol.setBody(responseMsg);
            list.add(responseMsgProtocol);
        }else {
            return;
        }

    }
}

3、修改消息处理器


public class NettyClientHandler extends SimpleChannelInboundHandler<Protocol<ResponseMsg>> {

    private static final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class);

    private volatile Channel channel;

    private SocketAddress remotePeer;

    public Channel getChannel() {
        return channel;
    }

    public SocketAddress getRemotePeer() {
        return remotePeer;
    }

    /**
     * 注册
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        logger.info("channelRegistered--------------");
        super.channelRegistered(ctx);
        this.channel = ctx.channel();
    }

    /**
     * 激活
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        this.remotePeer = this.channel.remoteAddress();
        logger.info("channelActive--------------");
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext,Protocol<ResponseMsg> o) throws Exception {
        logger.info("channelRead0--------------"+Thread.currentThread().getName());
        logger.info("消费者接收到的消息为{}", JSONObject.toJSONString(o));
    }

    public void sendMsg(Protocol<RequestMsg> message){
        channel.writeAndFlush(message);
    }

    public void close(){
        channel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
    }

}
public class NettyServerHandler extends SimpleChannelInboundHandler<Protocol<RequestMsg>> {

    private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Protocol<RequestMsg> o) throws Exception {

        logger.info("服务端收到的消息为================{}", JSONObject.toJSONString(o));
        Protocol<ResponseMsg> protocol = new Protocol<>();
        ResponseMsg responseMsg = new ResponseMsg();
        responseMsg.setResult("SUCCESS");
        responseMsg.setError("NO ERROR");
        protocol.setBody(responseMsg);
        protocol.setMsgType((short) 2);
        protocol.setId(o.getId());
        channelHandlerContext.channel().writeAndFlush(protocol);
    }
}

4、测试

public class NettyTest {

    public static void main(String[] args) {

        new Thread(()->{
            NettyServer.startNettyServer();
        }).start();

        new Thread(()->{
            NettyClient instance = NettyClient.getInstance();
            try {
                while (true){
                    Thread.sleep(2000);
                    Protocol<RequestMsg> protocol = new Protocol<>();
                    protocol.setMsgType((short)1);
                    RequestMsg requestMsg = new RequestMsg();
                    requestMsg.setMsg("hello:"+System.currentTimeMillis());
                    requestMsg.setOther("你好啊");
                    protocol.setBody(requestMsg);
                    instance.sendMsg(protocol);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }

        }).start();
    }
}

5、效果截图

在这里插入图片描述

Logo

加入「COC·上海城市开发者社区」,成就更好的自己!

更多推荐