工业物联网系统既要采集数据,还要处理、缓存、组合、存储。这里解析油井远程监控系统的架构,重点是两级缓存机制和存储命令引擎的实现。内容源自实际项目,适合工业数据采集、物联网设备等应用场景。


一、系统整体架构

1.1 架构概览

在这里插入图片描述

1.2 核心组件

组件 职责 关键类
通信层 GPRS 设备接入、数据收发 ICommunication
协议解析层 原始数据解析为监控项 IAnalyticalMoudbs
缓存层 响应数据缓存、线程安全 ResponseData 字典
存储命令引擎 多命令数据组合、SQL 生成 Comm.Server
内存处理层 SQL 执行、数据库存储 MemorysHandler
配置管理 12 张配置表驱动 FormConfig

二、两级缓存机制(核心)

2.1 为什么需要两级缓存

油井监控里,一个完整业务数据(比如示功图)通常需要多个监控命令组合:

MCYJ307 → 采集点数(约 600 个点)
MCYJ311 → 载荷包 1(约 200 个点)
MCYJ312 → 载荷包 2(约 200 个点)
MCYJ313 → 载荷包 3(约 200 个点)
MCYJ314 → 位移包 1(约 200 个点)
MCYJ315 → 位移包 2(约 200 个点)
MCYJ316 → 位移包 3(约 200 个点)

这些命令异步返回,时间可能差几秒。要把数据完整组合起来,用两级缓存。

2.2 第一级缓存:ResponseData

数据结构
// HaoPuServer.cs
public Dictionary<string, Dictionary<string, string>> ResponseData;
private readonly object responseDataLock = new object();

缓存键设计:DeviceId + ";" + MonitorId

"CYJ0005;MCYJ311" → { "ResponseTime": "2024-01-15 10:30:00", "Z1": "7116,7052,6860,..." }
"CYJ0005;MCYJ312" → { "ResponseTime": "2024-01-15 10:30:02", "Z1": "6980,6890,6750,..." }
"CYJ0005;MCYJ313" → { "ResponseTime": "2024-01-15 10:30:04", "Z1": "5800,5720,5640,..." }
缓存写入逻辑
private void CacheData(string deviceId, string monitorID, Dictionary<string, string> data, DateTime responsetime)
{
    string key = deviceId + ";" + monitorID;
    Dictionary<string, string> cachedData = new Dictionary<string, string>();
    cachedData.Add("ResponseTime", responsetime.ToString("yyyy-MM-dd HH:mm:ss"));
    
    foreach (KeyValuePair<string, string> item in data)
    {
        cachedData[item.Key] = data[item.Key];
    }
    
    // 线程安全写入
    lock (this.responseDataLock)
    {
        if (this.ResponseData.ContainsKey(key))
        {
            this.ResponseData[key] = cachedData;  // 更新
        }
        else
        {
            this.ResponseData.Add(key, cachedData);  // 新增
        }
    }
    
    // 传递给第二级缓存
    CacheDATA state = new CacheDATA(deviceId, monitorID, responsetime, threadData);
    ThreadPool.QueueUserWorkItem(new WaitCallback(this.servercomm.CacheData), state);
}
线程安全设计

CacheData 通过 ThreadPool.QueueUserWorkItem 被多线程调用,多个线程可能同时执行 ContainsKey/Add/修改 操作。用 lock (this.responseDataLock) 保护字典操作,避免 “Collection was modified” 异常。

2.3 第二级缓存:Comm.Server.data

数据结构
// Comm.Server.cs
private Dictionary<string, CacheDATA> data;

缓存键设计和第一级一样,DeviceId + ";" + MonitorId

缓存写入 + 组合逻辑
public void CacheData(object e)
{
    CacheDATA cacheDATA = (CacheDATA)e;
    lock (this.data)
    {
        // 1. 写入当前监控命令数据
        string cacheKey = cacheDATA.DeviceId + ";" + cacheDATA.MonitorId;
        if (this.data.ContainsKey(cacheKey))
        {
            this.data[cacheKey].ResponseTime = cacheDATA.ResponseTime;
            this.data[cacheKey].Data = cacheDATA.Data;
        }
        else
        {
            this.data.Add(cacheKey, cacheDATA);
        }
        
        // 2. 检查存储命令的所有子命令是否都就绪
        foreach (StorageMonitor storageMonitor in enabledSM)
        {
            if (storageMonitor.Monitor.Contains(cacheKey))
            {
                bool allMonitorsReady = true;
                foreach (string monitorKey in storageMonitor.Monitor)
                {
                    // 检查数据是否存在
                    if (!this.data.ContainsKey(monitorKey))
                    {
                        allMonitorsReady = false;
                        break;
                    }
                    // 检查时间差是否在 Period 范围内
                    if (this.DiffDate(this.data[monitorKey].ResponseTime, cacheDATA.ResponseTime) > storageMonitor.Period)
                    {
                        allMonitorsReady = false;
                        break;
                    }
                }
                
                // 3. 所有子命令数据就绪,触发存储
                if (allMonitorsReady)
                {
                    Dictionary<string, string> dictionary = this.UPDATA(storageMonitor);
                    string sql = this.MaskSql(storageMonitor, dictionary, now);
                    this.VirtualMonitor(storageMonitor.SCYJID, now, sql);
                }
            }
        }
    }
}

2.4 两级缓存对比

维度 第一级(ResponseData) 第二级(Comm.Server.data)
位置 HaoPuServer Comm.Server
职责 协议解析后的原始缓存 存储命令组合缓存
数据结构 Dictionary<string, Dictionary<string, string>> Dictionary<string, CacheDATA>
线程安全 lock (responseDataLock) lock (this.data)
触发时机 协议解析完成后 第一级缓存写入后
生命周期 持续缓存,供查询 组合完成后更新

三、存储命令引擎

3.1 核心概念

存储命令(StorageMonitor):把多个监控命令的数据组合后,生成 SQL 存到数据库。

3.2 数据就绪检查

在这里插入图片描述

3.3 时间窗口机制(Period)

// 检查时间差是否在 Period 范围内
if (this.DiffDate(this.data[monitorKey].ResponseTime, cacheDATA.ResponseTime) > storageMonitor.Period)
{
    allMonitorsReady = false;
    break;
}

Period 确保子命令数据在合理时间内返回(单位:分钟),避免脏数据组合(比如上次采集的旧数据 + 本次新数据),超时就等下一轮采集。

private int DiffDate(DateTime d1, DateTime d2)
{
    // 注意:单位是分钟(Minute),不是秒
    return Convert.ToInt32(DateAndTime.DateDiff(DateInterval.Minute, d1, d2, FirstDayOfWeek.Sunday, FirstWeekOfYear.Jan1));
}

3.4 SMPValue 映射机制

SMPValue 定义存储字段的数据来源。

在这里插入图片描述

private Dictionary<string, string> UPDATA(StorageMonitor sm)
{
    Dictionary<string, string> dictionary = new Dictionary<string, string>();
    foreach (SMonitorResponse smResponse in sm.Response)
    {
        string smpValues = "";
        if (smResponse.Smp.Count > 0)
        {
            foreach (SMPValue smpValue in smResponse.Smp)
            {
                // 从缓存中获取对应监控命令的数据
                if (this.data.ContainsKey(smpValue.DeviceId + ";" + smpValue.Monitorid))
                {
                    if (this.data[smpValue.DeviceId + ";" + smpValue.Monitorid].Data.ContainsKey(smpValue.MonitorResponseID))
                    {
                        smpValues = smpValues + this.data[smpValue.DeviceId + ";" + smpValue.Monitorid].Data[smpValue.MonitorResponseID] + ",";
                    }
                    else
                    {
                        // 数据不存在时使用默认值
                        smpValues = smResponse.DefaultValue;
                    }
                }
                else
                {
                    // 缓存不存在时使用默认值
                    smpValues = smResponse.DefaultValue;
                }
            }
            if (smpValues.Length > 0)
            {
                smpValues = smpValues.Substring(0, smpValues.Length - 1);
            }
        }
        else
        {
            // 没有 SMPValue 配置时使用默认值
            smpValues = smResponse.DefaultValue;
        }
        
        dictionary.Add(smResponse.Id, this.OutputValue(smpValues, smResponse.ModifiedOutput));
    }
    return dictionary;
}

四、数据库设计

4.1 12 张核心配置表

表名 作用 关键字段
Device 设备管理 ID、Name、IP、Port
Monitor 监控命令定义 ID、Code、Timeout、RetryTimes
MonitorResponse 响应字段定义 ID、DataLen、Datatype、Repeat
StorageMonitor 存储命令定义 SCYJID、DeviceID、MonitorID、Period
SMonitorResponse 存储字段定义 Id、StorageField、ModifiedOutput
SMPValue 数据来源映射 DeviceId、Monitorid、MonitorResponseID

4.2 配置驱动的优势

  1. 新增监控命令只需在数据库添加配置,不用改代码
  2. 调整拆分策略修改 Monitor 表就行
  3. 不同设备可以配置不同的采集策略
  4. 配置修改后即时生效

五、高并发设计

5.1 ThreadPool 使用

// 协议解析完成后,异步传递给存储命令引擎
CacheDATA state = new CacheDATA(deviceId, monitorID, responsetime, threadData);
ThreadPool.QueueUserWorkItem(new WaitCallback(this.servercomm.CacheData), state);

优势:不阻塞协议解析线程、支持多设备并发处理、自动管理线程池大小。

5.2 任务队列机制

在这里插入图片描述

设计要点:协议解析层多线程并发提高吞吐量,存储命令引擎单线程 + lock 保证数据一致性。


六、完整数据流

在这里插入图片描述


七、方案优势

优势 说明
两级缓存 协议解析缓存 + 存储命令缓存,职责清晰
线程安全 lock 机制保护字典操作,避免并发异常
时间窗口 Period 机制确保数据完整性
配置驱动 12 张表驱动,不用修改代码
高并发 ThreadPool + 任务队列支持多设备
异步处理 不阻塞,提高吞吐量

八、适用场景

本架构用在:工业物联网数据采集、多协议解析平台、配置驱动的数据处理、高并发数据采集。


关键词:系统架构,缓存设计,工业物联网,油井监控,高并发,配置驱动,存储命令

本文基于实际项目经验编写,代码已脱敏处理。如需完整源码或技术咨询,请联系我们。

更多推荐