本文在前一篇实现了STM32平台利用MQTT协议对接domoticz平台控制LED灯的基础上,完善了网络连接和MQTT的broker连接过程,实现了断网重连功能。

实现部分主要在rt-thread\components\external\paho-mqtt\MQTTClient-C\samples\domoticz\DomoticzThread.c中。
在前一篇的源码基础上稍作改动,如下所示:

DomoticzThread.c:

#include <rtthread.h>
#include "MQTTClient.h"
//#include "led.h"
#include "DomoticzMessageParser.h"
#include "LED0.h"
#include "LED1.h"
#include "HardwareControl.h"

//#define __DEBUG
#include "dprintf.h"

struct opts_struct
{
	char* clientid;
	int nodelimiter;
	char* delimiter;
	enum QoS qos;
	char* username;
	char* password;
	char* host;
	int port;
	int showtopics;
} opts =
{
	(char*)"subscriber on STM32", 0, (char*)"\n", QOS2, NULL, NULL, (char*)"192.168.1.230", 1883, 0
};

int is_over;

void quit_domoticz_thread(void)
{
	is_over = 1;
}

//================== Added 2017-Apr-27 8:06:53 start ==================
//处理连接的状态机结构
typedef struct 
{
	enum{
		UNCONNECTED = 0,
		NETWORK_CONNECTED=1,
		MQTT_CONNECTED=2,
		SUBSCRIBING_SUCCESS=3,
		WAIT_TIME_OUT=4,
		//SUBSCRIBING_FAILURE = 4
	}state;

	int timeout_s;//超时时间,单位为秒
	int times_count;//累计连续尝试连接次数,在连接成功后清零
}Connection_t;


void connect_time_out(void * data)
{
	Connection_t* con =(Connection_t*)data;
	
	if(con && con->timeout_s>0)
	{
		con->timeout_s --;
	}
}

#define NETWORK_CONNECT_TIMEOUT 5 //5s
#define MAX_NETWORK_CONNECT_TIMES 5
#define MQTT_CONNECT_TIMEOUT 1 //1s
#define MAX_MQTT_CONNECT_TIMES 5
#define SUBSCRIB_TIMEOUT 1 //1s
#define MAX_SUBSCRIB_TIMES 5
#define MAX_NO_PING_RESPONS_TIMES 10

Connection_t connection={UNCONNECTED,NETWORK_CONNECT_TIMEOUT,0};

//================== Added 2017-Apr-27 8:06:53  end ===================

void messageArrived(MessageData* md)
{
	MQTTMessage* message = md->message;

#if 0 /* Commented @ 2017-Apr-23 1:18:29 */
	if (opts.showtopics)
		rt_kprintf("%.*s\t", md->topicName->lenstring.len, md->topicName->lenstring.data);
	if (opts.nodelimiter)
		rt_kprintf("%.*s", (int)message->payloadlen, (char*)message->payload);
	else
		rt_kprintf("%.*s%s", (int)message->payloadlen, (char*)message->payload, opts.delimiter);
#endif /* Commented */
	dprintf("payloadlen=%d,%s\n",(int)message->payloadlen,(char*)message->payload);
	dprintf("strlen(payload)=%d\n",strlen((char*)message->payload));	
	//fflush(stdout);
	((char*)message->payload)[(int)message->payloadlen]=0;
	ParseDomoticzMessage((char*)message->payload);
	
	//dprintf("MSG: qos %d, retained %d, dup %d, packetid %d\n", message->qos, message->retained, message->dup, message->id);
}

void set_host(char *host)
{
	opts.host = host;
}

#define MAX_BUF_SIZE  512

void domoticz_thread_entry(void* parameter)
{
	int rc = 0;
	unsigned char buf[MAX_BUF_SIZE]={0};//buf[100];
	unsigned char readbuf[MAX_BUF_SIZE]={0};//readbuf[100];
	
	char* topic = "domoticz/out";

	Network n;
	MQTTClient c;
	
	MQTTPacket_connectData data = MQTTPacket_connectData_initializer;  
	
	rt_timer_t timer = rt_timer_create("connect_timer", connect_time_out, &connection, RT_TICK_PER_SECOND, RT_TIMER_FLAG_PERIODIC);
	is_over = 0;
	rt_kprintf("domoticz_thread_entry\n");

//================== Added 2017-Apr-26 2:36:53 start ==================
	RegisterHardware(Create_LED0(),1);
	RegisterHardware(Create_LED1(),6);
	OpenHardwares();
	SetupDomoitczMessageParser();	
	SetEnableParseItem(KEY_IDX);
	SetEnableParseItem(KEY_NVALUE);
	SetEnableParseItem(KEY_SWITCH_TYPE);
	initHardwareSettings();
//================== Added 2017-Apr-26 2:36:53  end ===================
	 
	data.willFlag = 0;
	data.MQTTVersion = 3;
	data.clientID.cstring = opts.clientid;
	data.username.cstring = opts.username;
	data.password.cstring = opts.password;

	data.keepAliveInterval = 10;
	data.cleansession = 0;
	rt_timer_start(timer);
    while(!is_over)
    {   
        switch(connection.state)
        {
        case UNCONNECTED:
						//dprintf("\n");
            if(connection.timeout_s>0)
                continue;
						dprintf("state = UNCONNECTED\n");
						rt_kprintf("Connecting to %s:%d\n", opts.host, opts.port);
						NetworkInit(&n);
            rc = NetworkConnect(&n, opts.host, opts.port);
						dprintf("rc=%d\n",rc);
            if(rc==SUCCESS)
            {//socket ????
                connection.state = NETWORK_CONNECTED;
                connection.times_count = 0;
								rt_kprintf("NetworkConnect ok!\n");
                //MQTTClientInit(&c, &n, 1000, buf, sizeof(buf), readbuf, sizeof(readbuf));
                MQTTClientInit(&c, &n, 1000, buf,MAX_BUF_SIZE, readbuf, MAX_BUF_SIZE);
            }
            else if(connection.times_count <MAX_NETWORK_CONNECT_TIMES)
            {dprintf("\n");
                connection.times_count++;
                connection.timeout_s = NETWORK_CONNECT_TIMEOUT;
            }
            else
            {//reboot system,restart
							dprintf("\n");
              connection.times_count=0;
							connection.timeout_s = NETWORK_CONNECT_TIMEOUT*6;
							rt_kprintf("we will reconnect after %d senconds\n",connection.timeout_s);
                //usleep(30*1000000);//30????
                //goto exit;
            }
            break;
        case NETWORK_CONNECTED: 
            if(connection.timeout_s>0)
                continue;
						dprintf("\n");
						rt_kprintf("state = NETWORK_CONNECTED\n");
            rc = MQTTConnect(&c, &data);
            dprintf("rc=%d\n",rc);
            if(rc == SUCCESS)
            {
                connection.state = MQTT_CONNECTED;
                connection.times_count = 0;
								connection.timeout_s = 0;
                //printf("MQTTConnected!\n");
								rt_kprintf("MQTTConnected! Subscribing to %s\n", topic); 
								
            }
            else if(connection.times_count <MAX_MQTT_CONNECT_TIMES)
            {
              //printf("MQTTConnect times=%d, err:%d! \n",connection.times_count,rc);
							rt_kprintf("MQTTConnect times=%d, err:%d!\n",connection.times_count,rc);
              connection.times_count++;
              connection.timeout_s = MQTT_CONNECT_TIMEOUT;
            }
            else
            {//????network??
                dprintf("\n");
                NetworkDisconnect(&n);
                connection.state = UNCONNECTED;
                connection.times_count=0;
								connection.timeout_s = NETWORK_CONNECT_TIMEOUT*6;
								rt_kprintf("we will reconnect after %d senconds\n",connection.timeout_s);
            }
            break;
        case MQTT_CONNECTED:		
					if(connection.timeout_s>0)
						continue;
					dprintf("state = MQTT_CONNECTED\n");
          rc = MQTTSubscribe(&c, topic, opts.qos, messageArrived);
					dprintf("rc=%d\n",rc);
          if(rc == SUCCESS)
					{
							rt_kprintf("Subscribed %s\n", topic);
							connection.state = SUBSCRIBING_SUCCESS;
							connection.times_count = 0;
					}
					else if(connection.times_count <MAX_SUBSCRIB_TIMES && MQTTIsConnected(&c)==1)
					{
						
							rt_kprintf("MQTTSubscribe times=%d, err:%d! \n",connection.times_count,rc);
							connection.times_count++;
							connection.timeout_s = MQTT_CONNECT_TIMEOUT;
					}
					else
					{
							if(MQTTIsConnected(&c)==1)
								MQTTDisconnect(&c);
							else
								MQTTCleanSession(&c);
							NetworkDisconnect(&n);
							connection.state = UNCONNECTED;
							connection.times_count=0;
							connection.timeout_s = NETWORK_CONNECT_TIMEOUT*6;
							rt_kprintf("we will reconnect after %d senconds\n",connection.timeout_s);
					}
           break;
        case SUBSCRIBING_SUCCESS:
            MQTTYield(&c, 1000);
            if(c.ping_timeout_times>=MAX_NO_PING_RESPONS_TIMES+1 || MQTTIsConnected(&c)==0)			
            {              
                if(MQTTIsConnected(&c)==1)
               		MQTTDisconnect(&c);
								else
									MQTTCleanSession(&c);
                NetworkDisconnect(&n);
                connection.state = UNCONNECTED;
                connection.times_count = 0;
								connection.timeout_s = NETWORK_CONNECT_TIMEOUT*6;
								rt_kprintf("we will reconnect after %d senconds\n",connection.timeout_s);
                
            }
            else
            {
                connection.times_count = 0;
            }
            break;
        default:
						rt_kprintf("satte = default,err!\n");
            break;
        }

    }
	
exit:
	rt_kprintf("Stopping\n");	
	MQTTDisconnect(&c);
	NetworkDisconnect(&n);	
	rt_timer_delete(timer);

}

void domoticz_thread_init(void)
{
	rt_thread_t domoticz_thread;
	domoticz_thread = rt_thread_create("DomoticzThread", domoticz_thread_entry, RT_NULL,
	0xf00, 28, 10);
	if (domoticz_thread != RT_NULL)
		rt_thread_startup(domoticz_thread);

}

#ifdef RT_USING_FINSH
#include <finsh.h>
FINSH_FUNCTION_EXPORT(set_host, set domoticz host ip addr);
FINSH_FUNCTION_EXPORT(domoticz_thread_init,to run domoticz thread );
FINSH_FUNCTION_EXPORT(quit_domoticz_thread,quit domoticz thread );
#endif


运行效果:
这里写图片描述

这里写图片描述

domoticz这边建立三个LED灯,idx分别为1,2,3

STM32开发板上的LED0的idx为2,LED1的idx为3。
也就是domoticz平台上的“LED灯2”对应开发板LED0;
domoticz平台上的“LED灯3”对应开发板LED1。

这里写图片描述

这里写图片描述

好了,现在在STM32开发板上看效果:
这里写图片描述

1、LED0亮:

这里写图片描述
这里写图片描述

2、LED1亮:

这里写图片描述
这里写图片描述

3、LED0、LED1都亮:

这里写图片描述
这里写图片描述

4、在mini2440板上运行前面做的mqtt客户端程序。

mini2440上也控制两个灯LED0,对应的idx为1;LED1,对应idx为2。
也就是说mini2440板上的LED1和STM32板上的LED0的idx是相同的,看看能不能同时点亮。

这里写图片描述

点亮mini2440的LED0:
这里写图片描述
这里写图片描述

下面将看到在domoticz上点亮LED2灯可以同时点亮mini2440上的LED1和STM32上的LED0:
这里写图片描述

这里写图片描述

都点亮:
这里写图片描述
这里写图片描述

5、Android手机实际控制效果:
这里写图片描述

这里写图片描述

这里写图片描述

这里写图片描述

这里写图片描述

这里写图片描述

这里写图片描述

这里写图片描述

6、当MQTT的broker被关闭并重新打开后,STM32板的反应:

这里写图片描述

在程序中,其实只要ping 10次没有结果,就会断开MQTT连接和socket连接,并重新连接。
如果有网络错误,连接不上会,延时一段时间,然后重新连接socket,然后连MQTT的broker,然后订阅。
具体如上述源码中处理那样。

附:STM32开发板的整个源码工程下载地址:
https://download.csdn.net/download/sqshining/11076319

Logo

更多推荐