Android开发之使用Netty进行Socket编程(二) 主要介绍了Netty框架内的主要使用的类以及 在客户端基本的建立连接并接收消息 的简单示例。众所周知,在Android应用开发中,一般是发起网络请求,拿到数据后异步更新UI,那么本文就介绍在开发过程中,如何封装Netty,方便开发者请求服务端的数据并异步地更新UI。

1 基本功能

与服务器建立TCP连接,TCP是面向连接的协议,只能用于点对点的通讯,所以对服务器的定位是IP地址+端口号。

发送消息给服务器,相当于发起请求,这个消息里面应该包含 与后台协议好的校验部分 以及 提供给服务器索引数据的参数部分。

理想状态下,TCP连接一旦建立,在通信双方中的任何一方主动关闭连接前,TCP 连接都将被一直保持下去,所以需要定时不间断地给服务器发送一个后台定义的消息段,即心跳包,让对方知道自己“在线”。

接收服务器返回的数据,并且拿到这些数据异步地去更新程序页面,将获取到的JSON文本解析成POJO。

2 接口的定义

利用Java的回调机制,在子线程建立连接并发出请求,在接收服务器返回的数据后告诉主线程更新UI。还包括一些连接异常的处理都通过回调接口实现。

public interface INettyClient {

void connect(String host, int port);//1. 建立连接

void sendMessage(int mt, String msg, long delayed);//2. 发送消息

void addDataReceiveListener(OnDataReceiveListener listener);//3. 为不同的请求添加监听器

interface OnDataReceiveListener {

void onDataReceive(int mt, String json);//接收到数据时触发

}

interface OnConnectStatusListener {

void onDisconnected();//连接异常时触发

}

}

3 对服务器返回数据的处理

对数据的处理主要是在ChannelHandler中完成(详见Android开发之使用Netty进行Socket编程(二) )。在这里我们主要继承了ChannelInboundHandlerAdapter,并提供了回调接口供主线程更新UI。

public class NettyClientHandler extends ChannelInboundHandlerAdapter {

private final String TAG = "Netty";

private INettyClient.OnConnectStatusListener statusListener;

private List listeners = new ArrayList<>();

@Override

public void channelActive(ChannelHandlerContext ctx) throws Exception {

//channelActive()方法将会在连接被建立并且准备进行通信时被调用。

Log.d(TAG, "channel active");

super.channelActive(ctx);

}

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg)

throws Exception {

//channelRead()方法是在数据被接收的时候调用。

ByteBuf buf = (ByteBuf) msg;

byte[] req = new byte[buf.readableBytes()];

buf.readBytes(req);

String body = new String(req, "UTF-8");

//verify(String body)方法对服务器返回的数据进行校验,并取出数据部分。

//具体校验的方法需要与后台同事进行协议。

body = verify(body);

Log.d(TAG, "verify : " + body);

if (null != body)

parseJson(body);

}

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)

throws Exception {

//exceptionCaught()事件处理方法是当出现Throwable对象才会被调用,

//即当Netty由于IO错误或者处理器在处理事件时抛出的异常时。

//在大部分情况下,捕获的异常应该被记录下来并且把关联的channel给关闭掉。

ctx.close();

Log.e(TAG, "Unexpected exception from downstream : "

+ cause.getMessage());

if (statusListener != null)//连接异常时触发onDisconnected()

statusListener.onDisconnected();

}

@Override

public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

ctx.fireChannelReadComplete();

LogUtils.d(TAG, "channelReadComplete");

}

//对数据进行解析,拿出区分不同请求的 flag字段,再根据不同的flag字段去触发相对应的监听器

private void parseJson(String json) {

try {

JSONObject jObject = new JSONObject(json);

int msgType = jObject.getInt(Constant.FLAG_MT);

Log.d(TAG, "parseJson message type: " + msgType + " json: " + json);

callListeners(msgType, json);

} catch (Exception e) {

LogUtils.e(TAG, "parseJson exception: " + e.getMessage());

e.printStackTrace();

}

}

//遍历监听器List,触发拥有正确msgType 的OnDataReceiveListener,

//回调 void onDataReceive(int mt, String json);方法

private void callListeners(final int msgType , final String json) {

for (final INettyClient.OnDataReceiveListener listener : listeners)

if (listener != null)

new Handler(Looper.getMainLooper()).post(new Runnable() {

@Override

public void run() {//主线程中进行

listener.onDataReceive(mt, json);

}

});

}

//绑定OnDataReceiveListener

public void addDataReceiveListener(INettyClient.OnDataReceiveListener listener) {

if (!listeners.contains(listener))

listeners.add(listener);

}

//绑定OnConnectStatusListener

public void setConnectStatusListener(INettyClient.OnConnectStatusListener listener) {

statusListener = listener;

}

}```

以上,就是一个供Android客户端使用的``ChannelHandler``,可以通过实现具体的``OnDataReceiveListener ``来异步地获得服务器返回的 数据。

## 4 NettyClient的实现

以上仅仅是展示了如何处理服务器返回的数据。建立连接、发送消息以及心跳包的功能还没进行封装。在2.接口的定义 里面已经定义好了NettyClient应该具备哪些行为,现在进行具体的实现。

主要的实现思路是:

1. 构建Bootstrap,其中包括设置好ChannelHandler来处理将来接收到的数据(详见[Android开发之使用Netty进行Socket编程(二)](http://www.jianshu.com/p/db74e673e43c) )。由Boostrap建立连接。通过`` channel.writeAndFlush(constructMessage(sendMsg)).sync()``发送消息。这些工作都在子线程完成。

2. 在子线程 建立连接并向服务器发送请求,这里采用了`HanlderThread`+`Handler`的方案。通过`Looper`依次从`Handler`的队列中获取信息,逐个进行处理,保证安全,不会出现混乱。

3. 心跳包的发送通过``handleMessage(Message msg)``中的死循环进行不间断地发送。

4. `NettyClientHandler `的实现中我们已经知道,当Netty异常时会触发`statusListener.onDisconnected();`,NettyClient中,onDisconnected()方法会进行重连操作。

接收到服务器返回的消息时,会在主线程中触发``onDataReceiveListener .onDataReceive(mt, json);``。

5. 外部通过单例模式进行调用。

public class NettyClient implements INettyClient {

private final String TAG = NettyClient.class.getSimpleName();

private static NettyClient mInstance;

private Bootstrap bootstrap;

private Channel channel;

private String host;

private int port;

private HandlerThread workThread = null;

private Handler mWorkHandler = null;

private NettyClientHandler nettyClientHandler;

private final String ACTION_SEND_TYPE = "action_send_type";

private final String ACTION_SEND_MSG = "action_send_msg";

private final int MESSAGE_INIT = 0x1;

private final int MESSAGE_CONNECT = 0x2;

private final int MESSAGE_SEND = 0x3;

private Handler.Callback mWorkHandlerCallback = new Handler.Callback() {

@Override

public boolean handleMessage(Message msg) {

switch (msg.what) {

case MESSAGE_INIT: {

NioEventLoopGroup group = new NioEventLoopGroup();

bootstrap = new Bootstrap();

bootstrap.channel(NioSocketChannel.class);

bootstrap.group(group);

bootstrap.handler(new ChannelInitializer() {

@Override

protected void initChannel(SocketChannel ch) throws Exception {

ChannelPipeline pipeline = ch.pipeline();

pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));

pipeline.addLast(new LineBasedFrameDecoder(Integer.MAX_VALUE));

pipeline.addLast(nettyClientHandler);

}

});

break;

}

case MESSAGE_CONNECT: {

try {

if (TextUtils.isEmpty(host) || port == 0) {

Exception exception = new Exception("Netty host | port is invalid");

throw exception;

}

channel = bootstrap.connect(new InetSocketAddress(host,

port)).sync().channel();

} catch (Exception e) {

LogUtils.e(TAG, "connect failed " + e.getMessage() + " reconnect delay: " + Constant.DELAY_CONNECT);

ToastUtils.showOnOtherThread("服务器连接失败");

e.printStackTrace();

sendReconnectMessage();

}

break;

}

case MESSAGE_SEND: {

String sendMsg = msg.getData().getString(ACTION_SEND_MSG);

int mt = msg.getData().getInt(ACTION_SEND_TYPE);

try {

if (channel != null && channel.isOpen()) {

channel.writeAndFlush(constructMessage(sendMsg)).sync();

Log.d(TAG, "send succeed " + constructMessage(sendMsg));

} else {

throw new Exception("channel is null | closed");

}

} catch (Exception e) {

LogUtils.e(TAG, "send failed " + e.getMessage());

sendReconnectMessage();

e.printStackTrace();

} finally {

if (Constant.MT.HAND_SHAKE.getType() == mt)

sendMessage(mt, sendMsg, Constant.DELAY_HAND_SHAKE);

}

break;

}

}

return true;

}

};

private NettyClient() {

init();

}

public synchronized static NettyClient getInstance() {

if (mInstance == null)

mInstance = new NettyClient();

return mInstance;

}

private void init() {

workThread = new HandlerThread(NettyClient.class.getName());

workThread.start();

mWorkHandler = new Handler(workThread.getLooper(), mWorkHandlerCallback);

nettyClientHandler = new NettyClientHandler();

nettyClientHandler.setConnectStatusListener(new OnConnectStatusListener() {

@Override

public void onDisconnected() {

sendReconnectMessage();

}

});

mWorkHandler.sendEmptyMessage(MESSAGE_INIT);

}

@Override

public void connect(String host, int port) {

this.host = host;

this.port = port;

mWorkHandler.sendEmptyMessage(MESSAGE_CONNECT);

}

@Override

public void addDataReceiveListener(OnDataReceiveListener listener) {

if (nettyClientHandler != null)

nettyClientHandler.addDataReceiveListener(listener);

}

private void sendReconnectMessage() {

mWorkHandler.sendEmptyMessageDelayed(MESSAGE_CONNECT, Constant.DELAY_CONNECT);

}

@Override

public void sendMessage(int mt, String msg, long delayed) {

if (TextUtils.isEmpty(msg))

return;

Message message = new Message();

Bundle bundle = new Bundle();

message.what = MESSAGE_SEND;

bundle.putString(ACTION_SEND_MSG, msg);

bundle.putInt(ACTION_SEND_TYPE, mt);

message.setData(bundle);

mWorkHandler.sendMessageDelayed(message, delayed);

}

private String constructMessage(String json) {

String message;

//与后台协议好,如何设置校验部分,然后和json一起发给服务器

return message;

}

}

## 5 NettyClient的使用

NettyClient采用了全局单例的模式。

1. 在Activity或者Fragment中为NettyClient绑定相应的数据接收监听器:

NettyClient.getInstance().addDataReceiveListener(new INettyClient.OnDataReceiveListener() {

@Override

public void onDataReceive(int mt, String json) {

//这些逻辑运行在主线程

if (mt == Constant.MSG_TYPE) {

UserModel user=new Gson().fromJson(json, UserModel .class);

textView.setText(user.getName);

}

}

2. 在Activity或者Fragment中发起请求:

```NettyClient.getInstance().sendMessage(Constant.MSG_TYPE, "发向服务器的消息", 0);```

可以看到使用起来十分简单方便。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐