今天中秋节,笔者首先祝各位读者们中秋快乐,之所以在今天这个团圆节来谈分布式的话题,就是要聊聊物联网是如何通过MQTT连接各类终端,如何通过RPC整合各种数据的。下面就通过代码+动图的方式来解读一下华为LiteOS的MQTT与TD的RPC。

MQTT协议

    MQTT是一个客户机服务器发布/订阅消息传输协议。它重量轻、开放、简单、易于实现。这些特性使其非常适合在物联网的低带宽、高延时、不可靠的网络环境下使用,MQTT协议运行在TCP/IP使用发布/订阅消息模式,提供一对多的消息分发。MQTT有三种消息传递服务质量。

    1.最多一次,也就是消息发布者只会发布一次消息,不管对端是否收到也不会发布第二次。一般用于环境传感器的数据读取,因为一般环境传感器读取的密度很高,丢失几个数据并没有什么大问题。·

    2.确保到达,这个一般用在数据非常重要的情况,发送端将不断重复发送直到对端响应收到。但这样可能出现数据重复。

    3.·确保恰好一次送达,确保消息正好到达一次。这个级别用于计费系统,重复或丢失的数据可能导致一定的损失。

    由于MQTT适合在低带宽、高延时网络运行的特性所以在特联网中的应用很多。所以华为liteos做为物联网终端的操作系统会用到MQTT的相关协议来进行LOT环境下的消息传输,其具体的代码在https://github.com/Awesome-HarmonyOS/HarmonyOS/tree/master/Huawei_LiteOS/components/connectivity/mqtt,其实现的原理动图如下:

 

    具体说来基本是这四个过程:

    1.调用MQTTClientInit建立MQTT的客户端,

   2.调用MQTTConnect与发布端建立连接

    3.调用MQTTSubscribe订阅消息,其中接收消息时会调用回调函数处理,当然发布端会按照传递服务质量不同来制订不同的发送策略。

    4.调用MQTTDisconnect断开连接-

    其连接的代码节选如下:

int MQTTConnectWithResults(MQTTClient* c, MQTTPacket_connectData* options, MQTTConnackData* data)
{
    Timer connect_timer;//设置timer
    int rc = FAILURE;
    MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
    int len = 0;

#if defined(MQTT_TASK)
	  MutexLock(&c->mutex);
#endif
	  if (c->isconnected) /* don't send connect packet again if we are already connected */
		  goto exit;

    TimerInit(&connect_timer);
    TimerCountdownMS(&connect_timer, c->command_timeout_ms);

    if (options == 0)
        options = &default_options; /* set default options if none were supplied */

    c->keepAliveInterval = options->keepAliveInterval;
    c->cleansession = options->cleansession;
    TimerCountdown(&c->last_received, c->keepAliveInterval);
    if ((len = MQTTSerialize_connect(c->buf, c->buf_size, options)) <= 0)
        goto exit;
    if ((rc = sendPacket(c, len, &connect_timer)) != MQTT_SUCCESS)  // send the connect packet
        goto exit; // there was a problem

    // this will be a blocking call, wait for the connack
    if (waitfor(c, CONNACK, &connect_timer) == CONNACK)//等到timer到期继续
    {
        data->rc = 0;
        data->sessionPresent = 0;
        if (MQTTDeserialize_connack(&data->sessionPresent, &data->rc, c->readbuf, c->readbuf_size) == 1)
            rc = data->rc;
        else
            rc = FAILURE;
    }
    else
        rc = FAILURE;

exit:
    if (rc == MQTT_SUCCESS)
    {
        c->isconnected = 1;
        c->ping_outstanding = 0;
    }

#if defined(MQTT_TASK)
	  MutexUnlock(&c->mutex);
#endif

    return rc;
}


int MQTTConnect(MQTTClient* c, MQTTPacket_connectData* options)
{
    MQTTConnackData data;
    return MQTTConnectWithResults(c, options, &data);
}

 

RPC协议

 

      RPC(Remote Procedure Call)—远程过程调用,它是一种通过网络从远程计算机程序上请求服务,他是分布式计算的必备要素之一,简单来讲就是消费者直接通过RPC服务调用远程机器上的服务接口来完成计算请求。与MQTT订阅发布的传输模式不同,RPC采用B/S模式。客户端发送一个带参数的调用请求到服务端,服务端收到请求后解析命令,获得参数,计算结果,最后接收应答。

   凭心而论涛思实现的RPC模块在代码可读性上要比华为LITEOS的强不少,其RPC模块主要的代码位置在https://github.com/taosdata/TDengine/blob/master/src/rpc/src/trpc.c。其原理比较一目了然,动画如下图:

主要结构由最外层的控制器(RpcConn)和中层的Rpc_server与最内层的SRpcChann组成。

1.首先客户端先通过自币的控制器发送一个计算请求给对端,

2.服务端收到后先发送一个收到的回应给客户端,

3.服务端经过解析后将计算请求解析后执行

4.服务端将结果发回给客户端。

其主要代码如下:

void *taosProcessDataFromPeer(char *data, int dataLen, uint32_t ip, short port, void *shandle, void *thandle,
                              void *chandle) {
  STaosHeader *pHeader;
  uint8_t      code;
  SRpcConn *   pConn = (SRpcConn *)thandle;
  STaosRpc *   pServer = (STaosRpc *)shandle;
  int          msgLen;
  char         pReply[128];
  SSchedMsg    schedMsg;
  int          chann, sid;

  tDump(data, dataLen);

  if (ip == 0 && taosCloseConn[pServer->type]) {
    // 检查链接状态
    if (pConn) {
      tTrace("%s cid:%d sid:%d id:%s, underlying link is gone pConn:%p", pServer->label, pConn->chann, pConn->sid,
             pConn->meterId, pConn);
      pConn->rspReceived = 1;
      pConn->chandle = NULL;
      schedMsg.fp = taosProcessSchedMsg;
      schedMsg.msg = NULL;
      schedMsg.ahandle = pConn->ahandle;
      schedMsg.thandle = pConn;
      taosScheduleTask(pServer->qhandle, &schedMsg);
    }
    tfree(data);
    return NULL;
  }

  pHeader = (STaosHeader *)data;
  tTrace("%s msg received, len:%d source:0x%08x dest:0x%08x tranId:%d", pServer->label, dataLen, pHeader->sourceId,
         pHeader->destId, pHeader->tranId);
  msgLen = (int32_t)htonl((uint32_t)pHeader->msgLen);

  code = (uint8_t)taosProcessMsgHeader(pHeader, &pConn, pServer, dataLen, ip, port, chandle);

  pHeader->destId = htonl(pHeader->destId);
  chann = pHeader->destId >> pServer->bits;
  sid = pHeader->destId & pServer->mask;

  if (pConn && pServer->idleTime) {
    SRpcChann *pChann = pServer->channList + pConn->chann;
    taosTmrReset(taosProcessIdleTimer, pServer->idleTime, pConn, pChann->tmrCtrl, &pConn->pIdleTimer);//如果设置了空闲timer则把这个timer重置
  }

  if (code == TSDB_CODE_ALREADY_PROCESSED) {
    tTrace("%s cid:%d sid:%d id:%s, %s wont be processed tranId:%d pConn:%p", pServer->label, chann, sid,
           pHeader->meterId, taosMsg[pHeader->msgType], pHeader->tranId, pConn);
    free(data);
    return pConn;
  }

  if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (taosDebugFlag & 16)) {
    tTrace("%s cid:%d sid:%d id:%s, %s received from 0x%x:%hu, parse code:%u, first:%u len:%d tranId:%d pConn:%p",
           pServer->label, chann, sid, pHeader->meterId, taosMsg[pHeader->msgType], ip, port, code, pHeader->content[0],
           dataLen, pHeader->tranId, pConn);
  }

  if (code != 0) {
    // 解析错误码

    if (pHeader->msgType & 1) {
      msgLen = taosBuildErrorMsgToPeer(data, code, pReply);
      (*taosSendData[pServer->type])(ip, port, pReply, msgLen, chandle);
      tTrace("%s cid:%d sid:%d id:%s, %s is sent with error code:%u pConn:%p", pServer->label, chann, sid,
             pHeader->meterId, taosMsg[pHeader->msgType + 1], code, pConn);
    } else {
      tTrace("%s cid:%d sid:%d id:%s, %s is received, parsing error:%u pConn:%p", pServer->label, chann, sid,
             pHeader->meterId, taosMsg[pHeader->msgType], code, pConn);
    }

    free(data);
  } else {
    //解析正确结果

    // internal communication is based on TAOS protocol, a trick here to make it efficient
    pHeader->msgLen = msgLen - (int)sizeof(STaosHeader) + (int)sizeof(SIntMsg);

    if ((pHeader->msgType & 1) == 0 && (pHeader->content[0] == TSDB_CODE_SESSION_ALREADY_EXIST)) {
      schedMsg.msg = NULL;  // connection shall be closed
    } else {
      schedMsg.msg = (char *)(&(pHeader->destId));
      // memcpy(schedMsg.msg, (char *)(&(pHeader->destId)), pHeader->msgLen);
    }

    if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (taosDebugFlag & 16)) {
      tTrace("%s cid:%d sid:%d id:%s, %s is put into queue, msgLen:%d pConn:%p pTimer:%p",
             pServer->label, chann, sid, pHeader->meterId, taosMsg[pHeader->msgType], pHeader->msgLen, pConn,
             pConn->pTimer);
    }

    schedMsg.fp = taosProcessSchedMsg;
    schedMsg.ahandle = pConn->ahandle;
    schedMsg.thandle = pConn;
    taosScheduleTask(pServer->qhandle, &schedMsg);
  }

  return pConn;
}

写在最后:

         物联网时代终端通信一般使用MQTT,它可以通过具体场景来定义消息订阅的质量,比如水表、电表等终端消息不能丢失也不能重复,那么就要选择对于质量要求最高的“保证一次送达”;再比如气象传感器类的终端会不断发布最新数据,那么其对于数据是否重复与是否送达都不敏感,那么就可以选择质量要求最低的只发一次,通过这样的方式来节约资源满足各类终端的通讯需求。而RPC的重点则在于分布式计算,PEER端的服务器在收到指令后可以按照Server端的要求执行计算任务,并返回结果。

 

 

更多推荐