PMS7003采集

整体介绍

树莓派+PMS7003+1.44寸TFT屏幕
20秒采集一次数据
60秒钟上传一次心跳
语言:python
接口:webapi post

PMS7003传感器介绍

Dust Sensor - PMS 5003/6003/7003: 数据链接.
PMS 7003: github链接.

数据位介绍

以下是PMS7003的数据位。
在这里插入图片描述在这里插入图片描述在这里插入图片描述

采集方法

由于PMS7003的传输过来数据需要先转换成16进制 才能被程序计算,所以前置步骤,需要进行转换。

# 字符转换16 进制
def hexShowNew(argv):  
    try:
        result = ''
        hLen = len(argv)
        for i in range(hLen):
            hvol = argv[i]
            hhex = '%02x' % hvol
            result += hhex + ' '
        # print(result)
        return result
    except:
        pass

同时数据传输过来的话可能异常数据,所以一定要计算校验位。
一共32个数据,前30个数据想加的值,是否等于后2个数据的值。
我这边图方便,全部转换成10进制数据来计算。

 for index in range(0,29) :
       checksum += int(strarr[index],16)

其中这些数据是我们要的:

pm1point0 = strarr[4] + strarr[5]#pm1.0
pm2point5 = strarr[6] + strarr[7]#pm2.5
pm10 = strarr[8] + strarr[9]#pm10
pm1point0air = strarr[10] + strarr[11]#pm1.0
pm2point5air = strarr[12] + strarr[13]#pm2.5
pm10air = strarr[14] + strarr[15]#pm10
um0point3 = strarr[16] + strarr[17]#0.3um颗粒
um0point5 = strarr[18] + strarr[19]#0.5um颗粒
um1point0 = strarr[20] + strarr[21#1.0um颗粒
um2point5 = strarr[22] + strarr[23]#2.5um颗粒
um5point0 = strarr[24] + strarr[25]#5.0um颗粒
um10 = strarr[26] + strarr[27]#10um颗粒

以下是程序源码:

# -*- coding: utf-8 -*
# python -u 启动,禁用stdout缓冲功能,不然打印有延时。
import serial
import time
import sys
import socket
import json
import requests

# 
class ParticleOutputModel:
    hostname = ''
    uuid = ''
    machinetype = '' 
    flag = ''
    pm1point0 = 0
    pm2point5 = 0 
    pm10 = 0
    pm1point0air = 0 
    pm2point5air = 0 
    pm10air = 0 
    over0point3um = 0 
    over0point5um = 0 
    over1point0um = 0 
    over2point5um = 0 
    over5point0um = 0 
    over10um = 0

    def __init__(self,hostname,uuid,machinetype,flag,pm1point0,pm2point5,pm10,pm1point0air,pm2point5air,pm10air,over0point3um,over0point5um,over1point0um,over2point5um,over5point0um,over10um):
        self.hostname = hostname
        self.uuid = uuid
        self.machinetype = machinetype 
        self.flag = flag
        self.pm1point0 = pm1point0
        self.pm2point5 = pm2point5 
        self.pm10 = pm10
        self.pm1point0air = pm1point0air 
        self.pm2point5air = pm2point5air 
        self.pm10air = pm10air 
        self.over0point3um = over0point3um 
        self.over0point5um = over0point5um 
        self.over1point0um = over1point0um 
        self.over2point5um = over2point5um 
        self.over5point0um = over5point0um 
        self.over10um = over10um

# 打开串口
ser = serial.Serial("/dev/ttyAMA0", 9600)

def is_open(filename):
 
    try:
        vHandle =win32file.CreateFile(filename, GENERIC_READ, 0, None, OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, None)
        if int(vHandle)==INVALID_HANDLE_VALUE:
            print("# file is already open")
            return True  # file is already open
        win32file.CloseHandle(vHandle)
 
    except Exception as e:
        print(e)
        return True

# 16 进制
def hexShowNew(argv): 
    try:
        result = ''
        hLen = len(argv)
        for i in range(hLen):
            hvol = argv[i]
            hhex = '%02x' % hvol
            result += hhex + ' '
        # print(result)
        return result
    except:
        pass

# main方法:
def main():
    host = 'localhost'
    port = 7002
    while True:
        try:
            print("create connection success")

            while True:
                count = ser.inWaiting()
                if count > 30:
                    # 读取内容并回显
                    recv = ser.read(count)
                    strvalue = hexShowNew(recv)
                    strarr = strvalue.split()

                    if (len(strarr) == 32):
                        print("################################")
                    print("load data success")

                    checksum=0
                    for index in range(0,29) :
                        checksum += int(strarr[index],16)
                    
                    print("checksum")
                    print(checksum)

                    print("systemchecksum")
                    syschecksum = int(str(strarr[30])+str(strarr[31]),16)
                    print(syschecksum)

                    if checksum == syschecksum:
                        print("checksum is success")

                        pm1point0 = strarr[4] + strarr[5]
                        pm2point5 = strarr[6] + strarr[7]
                        pm10 = strarr[8] + strarr[9]
                        pm1point0air = strarr[10] + strarr[11]
                        pm2point5air = strarr[12] + strarr[13]
                        pm10air = strarr[14] + strarr[15]
                        um0point3 = strarr[16] + strarr[17]
                        um0point5 = strarr[18] + strarr[19]
                        um1point0 = strarr[20] + strarr[21]
                        um2point5 = strarr[22] + strarr[23]
                        um5point0 = strarr[24] + strarr[25]
                        um10 = strarr[26] + strarr[27]
                 
                        resultstr = "<ParticleDetection><Flag>[Flag]</Flag><PM1.0>[PM1.0]</PM1.0><PM2.5>[PM2.5]</PM2.5><PM10>[PM10]</PM10><PM1.0air>[PM1.0air]</PM1.0air><PM2.5air>[PM2.5air]</PM2.5air><PM10air>[PM10air]</PM10air><over0.3um>[over0.3um]</over0.3um><over0.5um>[over0.5um]</over0.5um><over1.0um>[over1.0um]</over1.0um><over2.5um>[over2.5um]</over2.5um><over5.0um>[over5.0um]</over5.0um><over10um>[over10um]</over10um></ParticleDetection>"
                        resultstr = resultstr.replace("[Flag]", "success")
                        resultstr = resultstr.replace("[PM1.0]", str(int(pm1point0, 16)))
                        resultstr = resultstr.replace("[PM2.5]", str(int(pm2point5, 16)))
                        resultstr = resultstr.replace("[PM10]", str(int(pm10, 16)))
                        resultstr = resultstr.replace("[PM1.0air]", str(int(pm1point0air, 16)))
                        resultstr = resultstr.replace("[PM2.5air]", str(int(pm2point5air, 16)))
                        resultstr = resultstr.replace("[PM10air]", str(int(pm10air, 16)))
                        resultstr = resultstr.replace("[over0.3um]", str(int(um0point3, 16)))
                        resultstr = resultstr.replace("[over0.5um]", str(int(um0point5, 16)))
                        resultstr = resultstr.replace("[over1.0um]", str(int(um1point0, 16)))
                        resultstr = resultstr.replace("[over2.5um]", str(int(um2point5, 16)))
                        resultstr = resultstr.replace("[over5.0um]", str(int(um5point0, 16)))
                        resultstr = resultstr.replace("[over10um]", str(int(um10, 16)))
                        
                        
                        x = ParticleOutputModel("raspberry","sssss","machinetype","success",int(pm1point0, 16),int(pm2point5, 16),int(pm10, 16),int(pm1point0air, 16),int(pm2point5air, 16),int(pm10air, 16),int(um0point3, 16),int(um0point5, 16),int(um1point0, 16),int(um2point5, 16),int(um5point0, 16),int(um10, 16))
                        #对象序列化
                        x= x.__dict__
                        json_str = json.dumps(x,ensure_ascii=False,indent=4)
                        
                        try:
                            with open('./data.json', 'w') as file_object:
                                file_object.write(json_str)
                        except:
                            print('error:') 
                        
                        headers = {'Content-Type': 'application/json'}
                        r = requests.post("http接口", data=json_str, headers=headers,timeout=5)
                        print("send success")
                        time.sleep(20)
                    else:
                        time.sleep(2)
                        print("checksum is error")

                else:
                    print("load data fail, no transaction")
                    ser.flushInput()
                    time.sleep(0.5)
                
        except:
            print("exception")
            time.sleep(20)


if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        if ser != None:
            ser.close()



最后会先保存一份json文件在本地,同时调用Post接口传到接口,存到数据库中。
有另一个程序会把存在本地的json文件,读取出来,并显示在1.44寸的TFT屏幕界面上。

点击阅读全文
Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐