基于java的多设备类型物联网架构实现

前言:19年11月开始从 【金融】行业转 【物联网】,路途坎坷,一个人摸索前进,不过也学到了很多新的东西,交了很多好朋友,在此感谢各位!
以下是一些经验分享,希望能帮到有需要的朋友。

1、架构思路

考虑了很久打算用springboot + mysql 去实现,因为熟悉这个框架,而且能减轻70%的机械性开发工作量,以后改springcloud也方便(注意逻辑实现不然工作量很大)。

物联网和互联网可以说是有共同点的,但是也有很多的不一样。

先说协议,互联网很多都是https或者http,但是物联网这块就不仅仅是这两种协议,会有UDP协议,TCP协议。

上干货:

环境:java+mysql+redis+rabbitMQ+Mqtt

图解:
在这里插入图片描述
这个是比较简单的逻辑图,里面的复杂逻辑还是不能说的。

这里面涉及到几个问题,好多物联网设备终端会有心跳,事件数据上来,怎么保证并发?入库的数据唯一?多种设备的数据上行,怎么存储?怎么管理?有多个第三方服务怎么分发数据?

咱们一点点说:

1、根据协议解析数据

终端上行的数据有433协议,有蓝牙的,有zigbee的,还有tcp、http的,还分1代、2代、3代等协议。我用了一个笨方法:根据协议的不同解析,存入不同的表

我整了个枚举类来存放不同的类型,在解析方法里面通过唯一性的一段上行数据去区分(为什么这么做?以为上行数据是16进制的)

HH0F22AEBB8200011100020001BB23AABB8637

“HH0F”就是A协议的特有字符,那这个就入A协议库。

A协议是A设备专用的,那建表就是这样的

A_tag    —————— 设备表
A_data    ——————设备数据表
A_gateway    ——————网关表

那么问题来了,要是10种设备不得30个表了?
做过物联网的都知道,不同类型的设备可能带的属性都是不一样的,有的设备可能就3个:
电量、包序、特征值
但是有的设备可能就不止了,比方说:
电量、包序、心跳、呼吸、体温、动态值。。。。
data表中数据不得爆掉?——我深思熟虑也只是想到通用字段存储,data1~data20

多设备类型的暂时解决了,后面怎么具体操作呢?

2、怎么保证数据不会重复并快速入库

上图我采用了两个服务,一个接受服务,一个处理服务,具体处理方法:
接受服务将受到的数据转base64编码后直接发到消息队列(rabbitMQ),处理服务监听rabbitMQ消息,走解析服务。就是这么简单!
但是,有一点,rabbitMQ要开启ack模式!!!而且可以用负载来做这步!

问题二:设备一多,并发问题就来了,怎么搞?
Semaphore 这可是个好东西

如果来一条数据就insert,哪个数据库都受不了,后来采用批量方式插入:

// ----------------------数据批量入库开始-------------------------
	private static List<NnData> listNnObjectSaveDO = new ArrayList<NnData>();
	private static Long startTimeNnData = 0L;
  //private static int countSend = 0;
	private static Semaphore semaphoreNnData = new Semaphore(1);
	@Override
	public void saveDO(NnData nnData) {
		try {
			semaphoreNnData.acquire();
			if (startTimeNnData == 0L) {
				startTimeNnData = System.currentTimeMillis();
			}
			listNnObjectSaveDO.add(nnData);
			if (System.currentTimeMillis() - startTimeNnData > 2000) {
				List<NnData> listSaveDOOne = new ArrayList<NnData>();
				listSaveDOOne.addAll(listNnObjectSaveDO);
				asyncInsertBatch(listSaveDOOne);
				listNnObjectSaveDO.clear();
				startTimeNnData = 0L;
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			semaphoreNnData.release();
		}
	}
	@Async
	private void asyncInsertBatch(List<NnData> list) {
		this.insertBatch(list);
      //countSend = countSend + list.size();
      //System.out.println("=================入库数据条数:" + countSend);
	}
	// ----------------------数据批量入库结束-------------------------

我做过测试,这样的方式远超2000条/秒,有眼尖的朋友看出来了:

System.currentTimeMillis() - startTimeNnData > 2000  //2秒批量入库一次

暂时就到这儿,后续想到我再补充!

更多推荐