前言

上一篇提到了MQTT的通用方式,由于智能家居TV的项目网络波动频繁,通用的方式已经无法满足需求,经常会出现重复订阅导致收到多条消息,那就只能另辟蹊径了,最终找到了梦寐以求的MqttAndroidClient。

1.集成

集成方式和上一篇的MQTT简介和使用要新增配置,build.gradle新增

 

implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'

manifest文件里面需要注册服务

 

<!-- Mqtt Service -->
        <service android:name="org.eclipse.paho.android.service.MqttService" />

2.MqttAndroidClient重要源码解析

MqttAndroidClient是专门对MQTTClient的再封装拓展类,包含了订阅、连接以及多线程的处理,直接看MqttAndroidClient对于连接的封装源码,非关键代码已省略

 

    public IMqttToken connect(MqttConnectOptions options, Object userContext,
            IMqttActionListener callback) throws MqttException {
        if (mqttService == null) { 
        }
        else {
            pool.execute(new Runnable() {
                @Override
                public void run() {
                    doConnect();
                }
            });
        }
        return token;
    }

doConnect()连接操作放在子线程,有效避免网络波动连接时间过长阻塞主线程

 

private void doConnect() {
        ...
            mqttService.connect(clientHandle, connectOptions, null,
                    activityToken);
        ...
    }

阿里专门针对安卓客户端写了一个MQTTService,方便统一管理,除了连接操作,重连,断开连接都是在MQTTService中完成。

 

public void connect(String clientHandle, MqttConnectOptions connectOptions,
      String invocationContext, String activityToken)throws MqttSecurityException, MqttException {
        MqttConnection client = getConnection(clientHandle);
        client.connect(connectOptions, null, activityToken);
  }

 

public void connect(MqttConnectOptions options, String invocationContext,
            String activityToken) {
            ...
            if (myClient != null) {
                if (isConnecting ) {
                }else if(!disconnected){
                }
                else {                  
                    service.traceDebug(TAG, "myClient != null and the client is not connected");
                    service.traceDebug(TAG,"Do Real connect!");
                    setConnectingState(true);
                    myClient.connect(connectOptions, invocationContext, listener);
                }
            }
            ...
    }

 

public IMqttToken connect(MqttConnectOptions options, Object userContext, IMqttActionListener callback)throws MqttException, MqttSecurityException {
        final String methodName = "connect";
        if (comms.isConnected()) {
            throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_CONNECTED);
        }
        if (comms.isConnecting()) {
            throw new MqttException(MqttException.REASON_CODE_CONNECT_IN_PROGRESS);
        }
        if (comms.isDisconnecting()) {
            throw new MqttException(MqttException.REASON_CODE_CLIENT_DISCONNECTING);
        }
        if (comms.isClosed()) {
            throw new MqttException(MqttException.REASON_CODE_CLIENT_CLOSED);
        }
        ...
        connectActionListener.connect();
        return userToken;
    }

源码自身对于isConnected、isConnecting、isDisconnecting、isClosed做了异常处理,避免正在连接或者断开连接时连接造成重复连接。后面的源码就没贴的必要了,就是开启一个连接线程。

3.MqttAndroidClient的使用

一行代码完成MQTT的连接

 

mqttAndroidClient.connect(mqttConnectOptions, null, iMqttActionListener);

当然,这样是远远不够的,在实际应用中发现有一个问题,断网一段时间后重连网络MQTT不会自动重连,所以还得我们来做手动优化。思路很简单,在断开的节点开启重连线程,连接成功后关闭重连线程。以下贴的是完整代码,主要注意重连机制和订阅前记得取消订阅再订阅。(部分包含自己的代码,可以忽略,注释很详细)

 

public class MQTTManager {

    private Context mContext;
    private MqttAndroidClient mqttAndroidClient;
    private String clientId;//自定义

    private MqttConnectOptions mqttConnectOptions;

    private ScheduledExecutorService reconnectPool;//重连线程池

    public MQTTManager(Context mContext) {
        this.mContext = mContext;
    }

    public void buildClient() {
        closeMQTT();//先关闭上一个连接

        buildMQTTClient();
    }

    private IMqttActionListener iMqttActionListener = new IMqttActionListener() {
        @Override
        public void onSuccess(IMqttToken asyncActionToken) {
            TVLog.i("connect-"+"onSuccess");
            closeReconnectTask();
            subscribeToTopic();
        }

        @Override
        public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
            //connect-onFailure-MqttException (0) - java.net.UnknownHostException
            TVLog.i("connect-"+ "onFailure-"+exception);
            startReconnectTask();
        }
    };

    private MqttCallback mqttCallback = new MqttCallback() {
        @Override
        public void connectionLost(Throwable cause) {
            //close-connectionLost-等待来自服务器的响应时超时 (32000)
            //close-connectionLost-已断开连接 (32109)
            TVLog.i("close-"+"connectionLost-"+cause);
            if (cause != null) {//null表示被关闭
                startReconnectTask();
            }
        }

        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            String body = new String(message.getPayload());
            TVLog.i("messageArrived-"+message.getId()+"-"+body);
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
            try {
                TVLog.i("deliveryComplete-"+token.getMessage().toString());
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }
    };

    private void buildMQTTClient(){
        mqttAndroidClient = new MqttAndroidClient(mContext, MQTTCons.Broker, clientId);
        mqttAndroidClient.setCallback(mqttCallback);

        mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setConnectionTimeout(10);
        mqttConnectOptions.setKeepAliveInterval(20);
        mqttConnectOptions.setCleanSession(true);
        try {
            mqttConnectOptions.setUserName("Signature|" + MQTTCons.AcessKey + "|" + MQTTCons.instanceId);
            mqttConnectOptions.setPassword(MacSignature.macAndSignature(clientId, MQTTCons.SecretKey).toCharArray());
        } catch (Exception e) {
        }
        doClientConnection();
    }

    private synchronized void startReconnectTask(){
        if (reconnectPool != null)return;
        reconnectPool = Executors.newScheduledThreadPool(1);
        reconnectPool.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                doClientConnection();
            }
        } , 0 , 5*1000 , TimeUnit.MILLISECONDS);
    }

    private synchronized void closeReconnectTask(){
        if (reconnectPool != null) {
            reconnectPool.shutdownNow();
            reconnectPool = null;
        }
    }

    /**
     * 连接MQTT服务器
     */
    private synchronized void doClientConnection() {
        if (!mqttAndroidClient.isConnected()) {
            try {
                mqttAndroidClient.connect(mqttConnectOptions, null, iMqttActionListener);
                TVLog.d("mqttAndroidClient-connecting-"+mqttAndroidClient.getClientId());
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }
    }

    private void subscribeToTopic() {//订阅之前会取消订阅,避免重连导致重复订阅
        try {
            String registerTopic = "";//自定义
            String controlTopic = "";//自定义
            String[] topicFilter=new String[]{registerTopic , controlTopic };
            int[] qos={0,0};
            mqttAndroidClient.unsubscribe(topicFilter, null, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    TVLog.i("unsubscribe-"+"success");
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    TVLog.i("unsubscribe-"+"failed-"+exception);
                }
            });
            mqttAndroidClient.subscribe(topicFilter, qos, null, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {//订阅成功
                    TVLog.i("subscribe-"+"success");
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
//                    startReconnectTask();
                    TVLog.i("subscribe-"+"failed-"+exception);
                }
            });

        } catch (MqttException ex) {
        }
    }

    public void sendMQTT(String topicSep, String msg) {
        try {
            if (mqttAndroidClient == null)return;
            MqttMessage message = new MqttMessage();
            message.setPayload(msg.getBytes());
            String topic = "";//自定义
            mqttAndroidClient.publish(topic, message, null, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
//                    TVLog.i("sendMQTT-"+"success:" + msg);
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
//                    startReconnectTask();
                    TVLog.i("sendMQTT-"+"failed:" + msg);
                }
            });
        } catch (MqttException e) {
        }
    }

    public void closeMQTT(){
        closeReconnectTask();
        if (mqttAndroidClient != null){
            try {
                mqttAndroidClient.unregisterResources();
                mqttAndroidClient.disconnect();
                TVLog.i("closeMQTT-"+mqttAndroidClient.getClientId());
                mqttAndroidClient = null;
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }
    }

}

调用方式如下,想要做线程安全的单例的可以自己封装

 

if (mqttManager == null)
            mqttManager = new MQTTManager(getApplicationContext());
mqttManager.buildClientId();

结语

基于安卓部分也算是完结了,里面也夹杂着一些源码解释,后续会写更多关于源码的解析。



转载:https://www.jianshu.com/p/2857419d14b0

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐