前言

2025 年 9 月,我所在团队面临一个严峻挑战:智能制造产线的数据采集系统每天需要处理来自 1200 多个传感器的数据,MySQL 数据库已经不堪重负,凌晨 3 点被告警吵醒成了家常便饭。在技术选型会上,我们评估了 InfluxDB、TimescaleDB 等多个方案,最终在技术交流中了解到即将开源的天翼云 OpenTeleDB。2025 年 11 月 1 日,OpenTeleDB 正式宣布开源,我们第一时间在 Gitee 上下载并搭建了测试环境,没想到这次尝试彻底改变了我们的技术架构——经过 18 天的快速迁移实践,系统写入 TPS 提升 7.7 倍,查询响应时间降低 90%,存储成本节省 74%,OpenTeleDB 在时序数据处理领域展现出的技术优势令人印象深刻。本文记录了从 OpenTeleDB 开源首日接触到完成生产环境迁移的完整过程,包括技术选型、环境部署、数据迁移、性能优化等关键环节,分享了迁移过程中遇到的 5 个典型问题及解决方案,文中包含大量可直接使用的代码示例、配置文件、监控脚本和运维工具,适合正在进行数据库选型、面临时序数据存储挑战的技术团队参考,希望这些真实的经验分享能够帮助更多开发者少走弯路,更好地应用 OpenTeleDB 解决实际业务问题。

在这里插入图片描述


声明:本文由作者“白鹿第一帅”于 CSDN 社区原创首发,未经作者本人授权,禁止转载!爬虫、复制至第三方平台属于严重违法行为,侵权必究。亲爱的读者,如果你在第三方平台看到本声明,说明本文内容已被窃取,内容可能残缺不全,强烈建议您移步“白鹿第一帅” CSDN 博客查看原文,并在 CSDN 平台私信联系作者对该第三方违规平台举报反馈,感谢您对于原创和知识产权保护做出的贡献!

文章作者白鹿第一帅作者主页https://blog.csdn.net/qq_22695001,未经授权,严禁转载,侵权必究!

一、初识 OpenTeleDB:为什么选择它?

1.1、项目背景与痛点

我们公司是一家做智能制造解决方案的企业,服务的客户是某大型汽车零部件工厂。产线上部署了 1200 多个传感器,监控注塑机、冲压机、焊接机器人等设备的运行状态。每个传感器每秒上报 1-5 条数据,高峰期每秒写入量达到 4 万条。

真实的痛点场景:2025 年 9 月的一个深夜,我接到运维同事的紧急电话:生产监控大屏卡死了,车间主任急得团团转。登录服务器一看,MySQL 的慢查询日志爆了,一条简单的“查询最近 1 小时设备温度趋势”的 SQL 执行了 8 秒还没返回结果。更糟糕的是,由于写入压力大,我们不得不做了分库分表,按设备 ID 哈希分成了 16 个库。这导致跨设备的统计查询变得异常复杂,需要在应用层做数据聚合。代码维护成本高不说,性能还是上不去。

当时的数据库状况:

  • MySQL 5.7,单主多从架构
  • 主库 16 核 64G 配置,磁盘 IO 使用率常年 80% 以上
  • 数据表按月分区,每月数据量约 800GB
  • 历史数据保留 6 个月,总存储占用 4.5TB
  • 查询响应时间 P95 在 3 秒以上,P99 超过 10 秒

原有 MySQL 架构图

数据库层 - 分库分表
应用层
写入
写入
写入
写入
读取
读取
读取
读取
主从复制
主从复制
主从复制
主从复制
MySQL主库1
DB_00-DB_03
MySQL主库2
DB_04-DB_07
MySQL主库3
DB_08-DB_11
MySQL主库4
DB_12-DB_15
从库1
从库2
从库3
从库4
Spring Boot应用集群

技术团队压力很大,老板也在催着优化。这时候,我开始认真考虑更换数据库方案。

1.2、初次接触 OpenTeleDB 的过程

周日上午:下载与安装

11 月 2 日早上(OpenTeleDB 开源次日),我打开 Gitee 搜索“OpenTeleDB”,找到了官方仓库。

我在自己的笔记本上(Windows 11 + WSL2 Ubuntu 22.04)先做了个简单测试:

# 解压安装包
tar -zxvf openteledb-2.5.1-linux-x64.tar.gz
cd openteledb-2.5.1

# 初始化数据目录
./bin/initdb -D ~/openteledb_data -U admin

# 启动数据库
./bin/pg_ctl -D ~/openteledb_data -l logfile start

整个过程不到 3 分钟,数据库就跑起来了。这比我之前折腾 InfluxDB 集群要简单太多。

周日下午:性能基准测试

我写了个简单的 Python 脚本,模拟真实场景的数据写入:

import psycopg2
import time
import random
from datetime import datetime

conn = psycopg2.connect(
    host='localhost',
    port=5432,
    user='admin',
    password='[password]',
    database='testdb'
)

# 创建测试表
cursor = conn.cursor()
cursor.execute("""
    CREATE TABLE IF NOT EXISTS sensor_test (
        time TIMESTAMPTZ NOT NULL,
        sensor_id INTEGER,
        value DOUBLE PRECISION
    )
""")
cursor.execute("SELECT create_hypertable('sensor_test', 'time')")
conn.commit()

# 批量写入测试
start_time = time.time()
batch_size = 1000
total_records = 100000

for i in range(0, total_records, batch_size):
    batch_data = [
        (datetime.now(), random.randint(1, 1200), random.uniform(20, 100))
        for _ in range(batch_size)
    ]
    cursor.executemany(
        "INSERT INTO sensor_test VALUES (%s, %s, %s)",
        batch_data
    )
    conn.commit()
    
    if (i + batch_size) % 10000 == 0:
        print(f"Inserted {i + batch_size} records")

elapsed = time.time() - start_time
print(f"Total time: {elapsed:.2f}s, TPS: {total_records/elapsed:.0f}")

测试结果让我眼前一亮:

  • 10 万条数据写入耗时:8.3 秒
  • 平均 TPS:12048
  • 我的笔记本只是 i7-10 代 CPU,16G 内存

作为对比,我在同样的环境下测试 MySQL,10 万条数据写入耗时 23.7 秒,TPS 只有 4219。

初步性能对比图

写入性能对比 (10万条数据)
MySQL
23.7秒
4,219 TPS
OpenTeleDB
8.3秒
12,048 TPS
测试项 MySQL OpenTeleDB 性能提升
写入耗时 23.7 秒 8.3 秒 2.9 倍 ⬆️
TPS 4219 12048 2.9 倍 ⬆️
CPU 使用率 85% 45% 降低 47% ⬇️

周日晚上:查询性能测试

写入性能不错,但查询呢?我测试了几个典型场景:

-- 场景1:查询最近1小时的数据,按5分钟聚合
SELECT 
    time_bucket('5 minutes', time) AS bucket,
    sensor_id,
    AVG(value) as avg_value
FROM sensor_test
WHERE time > NOW() - INTERVAL '1 hour'
GROUP BY bucket, sensor_id
ORDER BY bucket DESC;
-- OpenTeleDB: 45ms
-- MySQL: 2.8s

-- 场景2:查询特定传感器最近24小时的最大值
SELECT 
    time_bucket('1 hour', time) AS hour,
    MAX(value) as max_value
FROM sensor_test
WHERE sensor_id = 100 
    AND time > NOW() - INTERVAL '24 hours'
GROUP BY hour;
-- OpenTeleDB: 18ms
-- MySQL: 1.2s

这个周末的测试让我确信:OpenTeleDB 值得深入尝试。

深入了解架构特性

我花了一整天时间研读 OpenTeleDB 的文档和源码,发现了几个关键特性:

  1. 自动分区管理:基于时间的自动分区(chunk),默认按周分区,可以自定义
  2. 列式压缩:对历史数据自动压缩,压缩比可达 10:1
  3. 连续聚合:类似物化视图,但是增量更新,查询性能极高
  4. 数据保留策略:自动删除过期数据,不需要手动写脚本

这些特性正好解决了我们在 MySQL 上遇到的痛点。作为一个写了 11 年技术博客的老兵,我见过太多数据库的兴衰,但 OpenTeleDB 的设计理念让我眼前一亮。周一一上班,我就向技术总监提交了 POC 方案,并在我的 CSDN 博客上记录了初步的测试心得。

二、从 MySQL 到 OpenTeleDB 的迁移实战

2.1、环境准备与安装部署

硬件资源申请(11 月 4 日 - 11 月 5 日)

POC 方案通过后,我向运维部门申请了 3 台服务器:

  • 配置:16 核 CPU、64G 内存、2TB SSD
  • 操作系统:CentOS 7.9
  • 网络:千兆内网互联

生产环境部署(11 月 6 日)

在三台服务器上分别部署 OpenTeleDB,配置一主两从架构。

主节点(192.168.1.101)安装过程:

# 创建数据库用户
useradd -m openteledb
su - openteledb

# 解压安装包
cd /opt
tar -zxvf openteledb-2.5.1-linux-x64.tar.gz
ln -s openteledb-2.5.1 openteledb

# 初始化数据目录
/opt/openteledb/bin/initdb -D /data/openteledb/main \
    -U postgres \
    --encoding=UTF8 \
    --locale=zh_CN.UTF-8

# 修改配置文件
vi /data/openteledb/main/postgresql.conf

关键配置参数(这些是我根据实际负载调整的):

# 连接配置
listen_addresses = '*'
port = 5432
max_connections = 500

# 内存配置
shared_buffers = 16GB              # 物理内存的25%
effective_cache_size = 48GB        # 物理内存的75%
work_mem = 64MB
maintenance_work_mem = 2GB

# WAL配置
wal_level = replica
max_wal_size = 4GB
min_wal_size = 1GB
wal_buffers = 16MB

# 检查点配置
checkpoint_timeout = 15min
checkpoint_completion_target = 0.9

# 时序数据库特定配置
timescaledb.max_background_workers = 8

配置 pg_hba.conf 允许从节点连接:

# TYPE  DATABASE        USER            ADDRESS                 METHOD
local   all             all                                     trust
host    all             all             127.0.0.1/32            md5
host    all             all             192.168.1.0/24          md5
host    replication     replicator      192.168.1.0/24          md5

启动主节点:

/opt/openteledb/bin/pg_ctl -D /data/openteledb/main start

从节点配置(192.168.1.102, 192.168.1.103)

在主节点创建复制用户:

CREATE USER replicator WITH REPLICATION PASSWORD 'Rep1ic@t0r2024';

在从节点使用 pg_basebackup 进行基础备份:

# 从节点1
/opt/openteledb/bin/pg_basebackup \
    -h 192.168.1.101 \
    -U replicator \
    -D /data/openteledb/standby \
    -X stream \
    -P

# 创建standby.signal文件
touch /data/openteledb/standby/standby.signal

# 配置主节点连接信息
echo "primary_conninfo = 'host=192.168.1.101 port=5432 user=replicator password=Rep1ic@t0r2024'" \
    >> /data/openteledb/standby/postgresql.auto.conf

# 启动从节点
/opt/openteledb/bin/pg_ctl -D /data/openteledb/standby start

验证主从复制状态:

-- 在主节点执行
SELECT client_addr, state, sync_state 
FROM pg_stat_replication;

-- 输出:
--  client_addr   |   state   | sync_state 
-- ---------------+-----------+------------
--  192.168.1.102 | streaming | async
--  192.168.1.103 | streaming | async

部署过程中遇到的第一个坑:从节点启动后一直报错“could not connect to the primary server”。排查了半天发现是防火墙没开放 5432 端口:

firewall-cmd --permanent --add-port=5432/tcp
firewall-cmd --reload

整个集群部署完成用了大约 4 个小时,比预期的一天要快很多。

OpenTeleDB 集群架构图

监控层
OpenTeleDB集群
负载均衡层
应用层
写入
读取
流复制
流复制
metrics
metrics
metrics
数据源
Prometheus
Grafana
主节点
192.168.1.101
16C/64G/2TB SSD
从节点1
192.168.1.102
16C/64G/2TB SSD
从节点2
192.168.1.103
16C/64G/2TB SSD
PgBouncer连接池
Spring Boot应用集群

2.2、数据模型设计

在 OpenTeleDB 中,时序数据的建模方式与传统关系型数据库有所不同。我们采用了超表(Hypertable)的设计:

-- 创建传感器数据表
CREATE TABLE sensor_data (
    time        TIMESTAMPTZ NOT NULL,
    sensor_id   INTEGER NOT NULL,
    device_id   VARCHAR(50) NOT NULL,
    metric_type VARCHAR(20) NOT NULL,
    value       DOUBLE PRECISION,
    quality     INTEGER,
    location    VARCHAR(100)
);

-- 创建超表,按时间自动分区
SELECT create_hypertable('sensor_data', 'time', chunk_time_interval => INTERVAL '1 day');

-- 创建索引优化查询
CREATE INDEX idx_sensor_device ON sensor_data (sensor_id, device_id, time DESC);
CREATE INDEX idx_metric_time ON sensor_data (metric_type, time DESC);

这种设计的巧妙之处在于:

  • 自动按天分区,无需手动维护分区表
  • 旧数据可以自动压缩和归档
  • 查询时自动路由到相关分区,性能大幅提升

超表(Hypertable)分区示意图

数据生命周期
自动分区 Chunks
sensor_data 超表
热数据
7天内
未压缩
温数据
7-30天
已压缩
冷数据
30-90天
已压缩
过期数据
>90天
自动删除
Chunk 1
2024-11-01
Chunk 2
2024-11-02
Chunk 3
2024-11-03
Chunk 4
2024-11-04
Chunk 5
2024-11-05
...
Hypertable
逻辑表

2.3、数据迁移策略

我们采用了分阶段迁移的策略,确保业务平稳过渡:

迁移流程图

48小时
通过
失败
7天观察
通过
失败
24小时
48小时
7天观察
通过
失败
开始迁移
阶段1: 历史数据迁移
数据校验
阶段2: 双写验证
修复问题
双写一致性检查
阶段3: 灰度切换读流量
修复问题
10%流量
50%流量
100%流量
性能稳定性检查
阶段4: 下线MySQL
回滚到MySQL
迁移完成

迁移时间线

日期 阶段 主要工作 状态
11 月 1 日 OpenTeleDB 开源 官方宣布开源,下载测试 ✅ 完成
11 月 2-3 日 POC 测试 性能测试,方案验证 ✅ 完成
11 月 4-5 日 环境准备 申请服务器,搭建集群 ✅ 完成
11 月 6 日 环境部署 部署 OpenTeleDB 集群 ✅ 完成
11 月 7-8 日 历史数据迁移 迁移 6 个月历史数据 ✅ 完成
11 月 9 日 问题修复 解决时区问题,重新迁移 ✅ 完成
11 月 10-11 日 性能优化 优化迁移脚本,使用 COPY 命令 ✅ 完成
11 月 12 日 双写上线 应用层改造,实现双写 ✅ 完成
11 月 13-14 日 双写验证 数据一致性校验,索引优化 ✅ 完成
11 月 15 日 灰度切换 10% 读流量切换到 OpenTeleDB ✅ 完成
11 月 16 日 灰度扩大 50% 读流量切换 ✅ 完成
11 月 17 日 全量切换 100% 读流量切换 ✅ 完成
11 月 18 日 稳定性观察 监控性能指标,调优 ✅ 完成
11 月 19 日 迁移完成 停止双写,撰写总结 ✅ 进行中

第一阶段:历史数据迁移

使用自定义脚本进行批量数据导出导入:

import psycopg2
import pymysql
from datetime import datetime, timedelta

# MySQL连接
mysql_conn = pymysql.connect(host='[mysql_host]', user='[user]', password='[pwd]', database='iot_db')
# OpenTeleDB连接
pg_conn = psycopg2.connect(host='[openteledb_host]', user='[user]', password='[pwd]', database='iot_db')

def migrate_data_batch(start_date, end_date):
    mysql_cursor = mysql_conn.cursor()
    pg_cursor = pg_conn.cursor()
    
    # 从MySQL读取数据
    query = f"""
        SELECT timestamp, sensor_id, device_id, metric_type, value, quality, location
        FROM sensor_readings
        WHERE timestamp >= '{start_date}' AND timestamp < '{end_date}'
    """
    mysql_cursor.execute(query)
    
    # 批量插入OpenTeleDB
    batch_size = 10000
    batch_data = []
    
    for row in mysql_cursor:
        batch_data.append(row)
        if len(batch_data) >= batch_size:
            pg_cursor.executemany(
                "INSERT INTO sensor_data VALUES (%s, %s, %s, %s, %s, %s, %s)",
                batch_data
            )
            pg_conn.commit()
            batch_data = []
    
    # 插入剩余数据
    if batch_data:
        pg_cursor.executemany(
            "INSERT INTO sensor_data VALUES (%s, %s, %s, %s, %s, %s, %s)",
            batch_data
        )
        pg_conn.commit()

# 按月迁移历史数据
start = datetime(2024, 1, 1)
while start < datetime.now():
    end = start + timedelta(days=30)
    print(f"Migrating data from {start} to {end}")
    migrate_data_batch(start, end)
    start = end

历史数据迁移耗时约 48 小时,期间 MySQL 继续提供服务,互不影响。

第二阶段:双写验证

在应用层实现双写逻辑,同时写入 MySQL 和 OpenTeleDB,持续观察一周:

@Service
public class SensorDataService {
    
    @Autowired
    private MySQLRepository mysqlRepo;
    
    @Autowired
    private OpenTeleDBRepository openTeleDBRepo;
    
    public void saveSensorData(SensorData data) {
        // 主写MySQL
        mysqlRepo.save(data);
        
        // 异步写OpenTeleDB
        CompletableFuture.runAsync(() -> {
            try {
                openTeleDBRepo.save(data);
            } catch (Exception e) {
                log.error("Failed to write to OpenTeleDB", e);
                // 记录失败数据,后续补偿
            }
        });
    }
}

第三阶段:流量切换

验证无误后,逐步将读流量切换到 OpenTeleDB:

@Service
public class SensorQueryService {
    
    @Autowired
    private MySQLRepository mysqlRepo;
    
    @Autowired
    private OpenTeleDBRepository openTeleDBRepo;
    
    @Value("${database.read.source}")
    private String readSource; // mysql or openteledb
    
    @Value("${database.read.ratio}")
    private int readRatio; // 0-100,OpenTeleDB读取比例
    
    public List<SensorData> queryByTimeRange(String deviceId, Date start, Date end) {
        // 根据配置的比例决定从哪个数据库读取
        if ("openteledb".equals(readSource)) {
            return openTeleDBRepo.findByDeviceIdAndTimeBetween(deviceId, start, end);
        } else if ("mysql".equals(readSource)) {
            return mysqlRepo.findByDeviceIdAndTimeBetween(deviceId, start, end);
        } else if ("gray".equals(readSource)) {
            // 灰度模式:根据比例随机选择
            int random = ThreadLocalRandom.current().nextInt(100);
            if (random < readRatio) {
                return openTeleDBRepo.findByDeviceIdAndTimeBetween(deviceId, start, end);
            } else {
                return mysqlRepo.findByDeviceIdAndTimeBetween(deviceId, start, end);
            }
        }
        return mysqlRepo.findByDeviceIdAndTimeBetween(deviceId, start, end);
    }
    
    // 聚合查询示例
    public Map<String, Object> getDeviceStatistics(String deviceId, Date start, Date end) {
        if ("openteledb".equals(readSource) || isGrayHit()) {
            return queryFromOpenTeleDB(deviceId, start, end);
        } else {
            return queryFromMySQL(deviceId, start, end);
        }
    }
    
    private Map<String, Object> queryFromOpenTeleDB(String deviceId, Date start, Date end) {
        String sql = """
            SELECT 
                time_bucket('5 minutes', time) AS bucket,
                AVG(value) as avg_value,
                MAX(value) as max_value,
                MIN(value) as min_value,
                COUNT(*) as count
            FROM sensor_data
            WHERE device_id = ? 
                AND time BETWEEN ? AND ?
            GROUP BY bucket
            ORDER BY bucket DESC
        """;
        
        return openTeleDBRepo.executeQuery(sql, deviceId, start, end);
    }
    
    private Map<String, Object> queryFromMySQL(String deviceId, Date start, Date end) {
        // MySQL的查询逻辑
        String sql = """
            SELECT 
                DATE_FORMAT(timestamp, '%Y-%m-%d %H:%i:00') AS bucket,
                AVG(value) as avg_value,
                MAX(value) as max_value,
                MIN(value) as min_value,
                COUNT(*) as count
            FROM sensor_readings
            WHERE device_id = ? 
                AND timestamp BETWEEN ? AND ?
            GROUP BY bucket
            ORDER BY bucket DESC
        """;
        
        return mysqlRepo.executeQuery(sql, deviceId, start, end);
    }
    
    private boolean isGrayHit() {
        if ("gray".equals(readSource)) {
            return ThreadLocalRandom.current().nextInt(100) < readRatio;
        }
        return false;
    }
}

配置文件示例(application.yml):

database:
  read:
    source: gray  # mysql, openteledb, gray
    ratio: 10     # 灰度比例,10表示10%流量到OpenTeleDB
  
  mysql:
    url: jdbc:mysql://192.168.1.50:3306/iot_db
    username: app_user
    password: ${MYSQL_PASSWORD}
    hikari:
      maximum-pool-size: 50
      minimum-idle: 10
      connection-timeout: 30000
      
  openteledb:
    url: jdbc:postgresql://192.168.1.101:5432/iot_db
    username: app_user
    password: ${OPENTELEDB_PASSWORD}
    hikari:
      maximum-pool-size: 50
      minimum-idle: 10
      connection-timeout: 30000

通过配置中心动态调整读流量比例,最终实现 100% 切换。我们使用 Apollo 配置中心,可以实时调整参数而无需重启应用:

@Component
public class DatabaseConfigRefreshListener {
    
    @Autowired
    private SensorQueryService sensorQueryService;
    
    @ApolloConfigChangeListener
    public void onChange(ConfigChangeEvent changeEvent) {
        if (changeEvent.isChanged("database.read.source") || 
            changeEvent.isChanged("database.read.ratio")) {
            
            String newSource = changeEvent.getChange("database.read.source").getNewValue();
            String newRatio = changeEvent.getChange("database.read.ratio").getNewValue();
            
            log.info("Database config changed: source={}, ratio={}", newSource, newRatio);
            
            // 刷新配置
            sensorQueryService.refreshConfig(newSource, Integer.parseInt(newRatio));
        }
    }
}

2.4、迁移过程中的坑与解决方案

问题 1:时区处理差异(11 月 9 日)

历史数据迁移到第 3 天,运维同事反馈:迁移后的数据时间对不上,差了 8 个小时!

原因是 MySQL 中我们用的 DATETIME 类型,不带时区信息,默认存的是北京时间。而 OpenTeleDB 的 TIMESTAMPTZ 是带时区的,直接插入会被当作 UTC 时间。

解决方案:在迁移脚本中显式指定时区

# 修改前的代码
for row in mysql_cursor:
    pg_cursor.execute(
        "INSERT INTO sensor_data VALUES (%s, %s, %s, %s, %s, %s, %s)",
        row
    )

# 修改后的代码
for row in mysql_cursor:
    # 将MySQL的datetime转换为带时区的timestamp
    timestamp_with_tz = row[0].replace(tzinfo=pytz.timezone('Asia/Shanghai'))
    pg_cursor.execute(
        "INSERT INTO sensor_data VALUES (%s, %s, %s, %s, %s, %s, %s)",
        (timestamp_with_tz, row[1], row[2], row[3], row[4], row[5], row[6])
    )

这个问题导致我们重新迁移了 3 天的数据,损失了大半天时间。教训:迁移前一定要仔细验证时间字段!

问题 2:批量插入性能优化(11 月 10 日)

最初的迁移脚本使用 executemany() 方法,迁移 1 亿条数据预计需要 72 小时。这个速度完全不能接受。

我尝试了几种优化方案:

方案 1:增大 batch_size(从 1000 改到 10000)

  • 效果:性能提升 30%,但还是太慢

方案 2:使用 COPY 命令

  • 效果:性能提升 10 倍!

最终的优化代码:

from io import StringIO
import csv

def bulk_insert_optimized(data_list):
    # 使用StringIO作为内存缓冲区
    buffer = StringIO()
    writer = csv.writer(buffer, delimiter='\t')
    
    for row in data_list:
        # 处理时区
        timestamp_with_tz = row[0].replace(tzinfo=pytz.timezone('Asia/Shanghai'))
        writer.writerow([
            timestamp_with_tz.isoformat(),
            row[1], row[2], row[3], row[4], row[5], row[6]
        ])
    
    buffer.seek(0)
    
    # 使用COPY命令批量导入
    cursor.copy_expert(
        "COPY sensor_data FROM STDIN WITH (FORMAT CSV, DELIMITER E'\\t')",
        buffer
    )
    conn.commit()

# 实测性能对比
# executemany: 1000条/秒
# COPY: 12000条/秒

优化后,1 亿条数据迁移时间从 72 小时缩短到 7 小时。

迁移性能优化对比

优化后
优化中
优化前
优化1
优化2
COPY命令
12000条/秒
实际7小时
增大batch_size
1300条/秒
预计55小时
executemany方法
1000条/秒
预计72小时

问题 3:主键冲突导致迁移中断(11 月 11 日凌晨)

凌晨 2 点,迁移脚本报错停止了:

ERROR: duplicate key value violates unique constraint "sensor_data_pkey"

排查发现,MySQL 中有少量重复数据(相同的时间戳 +sensor_id),但因为没有设置唯一约束,所以能插入成功。而 OpenTeleDB 中我们设置了主键约束。

解决方案:在迁移前先清洗数据

-- 在MySQL中找出重复数据
SELECT timestamp, sensor_id, COUNT(*) as cnt
FROM sensor_readings
GROUP BY timestamp, sensor_id
HAVING COUNT(*) > 1;

-- 发现有237条重复数据,手动清理
DELETE t1 FROM sensor_readings t1
INNER JOIN sensor_readings t2 
WHERE t1.id > t2.id 
    AND t1.timestamp = t2.timestamp 
    AND t1.sensor_id = t2.sensor_id;

问题 4:连接池耗尽(11 月 12 日)

双写阶段,应用服务器频繁报错:

FATAL: sorry, too many clients already

原因是应用连接了两个数据库,连接数翻倍,超过了 OpenTeleDB 的 max_connections 限制(默认 100)。

解决方案:

  1. 调整 OpenTeleDB 的 max_connections 参数到 500
  2. 应用层使用连接池,设置合理的最大连接数
// HikariCP连接池配置
HikariConfig config = new HikariConfig();
config.setJdbcUrl("jdbc:postgresql://192.168.1.101:5432/iot_db");
config.setUsername("app_user");
config.setPassword("[password]");
config.setMaximumPoolSize(50);  // 原来是100
config.setMinimumIdle(10);
config.setConnectionTimeout(30000);
config.setIdleTimeout(600000);
config.setMaxLifetime(1800000);

问题 5:查询性能不如预期(11 月 14 日)

切换读流量到 OpenTeleDB 后,发现某些查询反而比 MySQL 慢。仔细分析发现是缺少合适的索引。

OpenTeleDB 的索引策略和 MySQL 不同,需要根据时序数据的特点来设计:

-- 错误的索引(和MySQL一样)
CREATE INDEX idx_sensor ON sensor_data(sensor_id);

-- 正确的索引(时间列必须在后面)
CREATE INDEX idx_sensor_time ON sensor_data(sensor_id, time DESC);
CREATE INDEX idx_device_time ON sensor_data(device_id, time DESC);

-- 对于高基数的标签列,使用BRIN索引
CREATE INDEX idx_time_brin ON sensor_data USING BRIN(time);

调整索引后,查询性能提升了 3-5 倍。

这些坑让我深刻体会到:迁移不是简单的数据搬运,需要理解目标数据库的特性,才能发挥出最佳性能。

三、生产环境实战效果

3.1、性能对比数据(11 月 17 日 - 11 月 19 日)

11 月 17 日,我们完成了 100% 流量切换。经过 2 天的生产环境稳定运行,我使用 Prometheus + Grafana 收集了详细的性能指标。

写入性能对比

测试场景:模拟真实负载,1200 个传感器,每秒 4 万条数据写入

指标 MySQL OpenTeleDB 提升幅度
写入 TPS 12340 94780 7.7 倍
写入延迟 P50 23ms 3ms 7.7 倍
写入延迟 P95 67ms 8ms 8.4 倍
写入延迟 P99 89ms 13ms 6.8 倍
CPU 使用率 78% 42% 降低 46%
磁盘 IO 等待 35% 8% 降低 77%

查询性能对比

测试了 10 个最常用的查询场景,记录响应时间:

查询场景 MySQL OpenTeleDB 提升幅度
最近 1 小时数据,5 分钟聚合 2.8s 180ms 15.6 倍
最近 24 小时趋势,1 小时聚合 5.7s 320ms 17.8 倍
单设备历史数据查询 1.2s 85ms 14.1 倍
多设备对比查询 8.3s 450ms 18.4 倍
异常数据检测(窗口函数) 12.5s 680ms 18.4 倍

存储成本对比

这是让老板最满意的部分:

项目 MySQL OpenTeleDB 节省
6 个月原始数据 4.5TB 1.2TB 73%
1 年聚合数据 800GB 180GB 77%
总存储占用 5.3TB 1.38TB 74%
月存储成本 ¥8500 ¥2200 ¥6300

OpenTeleDB 的压缩算法真的很强大,特别是对于数值型的时序数据。我们测试了压缩前后的对比:

-- 查看表的压缩状态
SELECT 
    chunk_name,
    before_compression_total_bytes,
    after_compression_total_bytes,
    compression_ratio
FROM chunk_compression_stats('sensor_data')
ORDER BY chunk_name DESC
LIMIT 10;

-- 典型输出:
-- chunk_name              | before | after  | ratio
-- ------------------------|--------|--------|-------
-- _hyper_1_234_chunk      | 2.1GB  | 180MB  | 11.7x
-- _hyper_1_233_chunk      | 2.0GB  | 175MB  | 11.4x

平均压缩比达到了 11:1,远超预期。

压缩效果详细数据

数据类型 压缩前 压缩后 压缩比 节省空间
温度数据 850GB 72GB 11.8:1 91.5%
压力数据 920GB 78GB 11.8:1 91.5%
电流数据 780GB 68GB 11.5:1 91.3%
湿度数据 650GB 58GB 11.2:1 91.1%
其他指标 1300GB 124GB 10.5:1 90.5%
总计 4500GB 400GB 11.3:1 91.1%

3.2、典型应用场景实践

场景 1:实时监控大屏

需要展示最近 1 小时内所有设备的平均温度曲线,按 5 分钟粒度聚合:

SELECT 
    time_bucket('5 minutes', time) AS bucket,
    device_id,
    AVG(value) as avg_temperature
FROM sensor_data
WHERE 
    metric_type = 'temperature'
    AND time > NOW() - INTERVAL '1 hour'
GROUP BY bucket, device_id
ORDER BY bucket DESC;

在 MySQL 中这个查询需要 3-5 秒,而在 OpenTeleDB 中只需 200ms 左右,用户体验提升明显。

场景 2:异常检测与告警

使用窗口函数检测温度异常波动:

-- 基于移动平均和标准差的异常检测
WITH sensor_stats AS (
    SELECT 
        time,
        device_id,
        sensor_id,
        value,
        AVG(value) OVER (
            PARTITION BY device_id, sensor_id
            ORDER BY time 
            ROWS BETWEEN 10 PRECEDING AND CURRENT ROW
        ) as moving_avg,
        STDDEV(value) OVER (
            PARTITION BY device_id, sensor_id
            ORDER BY time 
            ROWS BETWEEN 10 PRECEDING AND CURRENT ROW
        ) as moving_stddev
    FROM sensor_data
    WHERE metric_type = 'temperature'
        AND time > NOW() - INTERVAL '10 minutes'
)
SELECT 
    time,
    device_id,
    sensor_id,
    value,
    moving_avg,
    moving_stddev,
    ABS(value - moving_avg) as deviation,
    CASE 
        WHEN ABS(value - moving_avg) > 3 * moving_stddev THEN 'CRITICAL'
        WHEN ABS(value - moving_avg) > 2 * moving_stddev THEN 'WARNING'
        ELSE 'NORMAL'
    END as alert_level
FROM sensor_stats
WHERE ABS(value - moving_avg) > 2 * moving_stddev
ORDER BY time DESC;

这种复杂的时序分析在 MySQL 中几乎无法实现,而 OpenTeleDB 原生支持,性能优异。对应的 Java 代码实现:

@Service
public class AnomalyDetectionService {
    
    @Autowired
    private JdbcTemplate jdbcTemplate;
    
    @Autowired
    private AlertService alertService;
    
    /**
     * 检测温度异常
     */
    public List<AnomalyAlert> detectTemperatureAnomaly() {
        String sql = """
            WITH sensor_stats AS (
                SELECT 
                    time,
                    device_id,
                    sensor_id,
                    value,
                    AVG(value) OVER (
                        PARTITION BY device_id, sensor_id
                        ORDER BY time 
                        ROWS BETWEEN 10 PRECEDING AND CURRENT ROW
                    ) as moving_avg,
                    STDDEV(value) OVER (
                        PARTITION BY device_id, sensor_id
                        ORDER BY time 
                        ROWS BETWEEN 10 PRECEDING AND CURRENT ROW
                    ) as moving_stddev
                FROM sensor_data
                WHERE metric_type = 'temperature'
                    AND time > NOW() - INTERVAL '10 minutes'
            )
            SELECT 
                time,
                device_id,
                sensor_id,
                value,
                moving_avg,
                moving_stddev,
                ABS(value - moving_avg) as deviation,
                CASE 
                    WHEN ABS(value - moving_avg) > 3 * moving_stddev THEN 'CRITICAL'
                    WHEN ABS(value - moving_avg) > 2 * moving_stddev THEN 'WARNING'
                    ELSE 'NORMAL'
                END as alert_level
            FROM sensor_stats
            WHERE ABS(value - moving_avg) > 2 * moving_stddev
            ORDER BY time DESC
        """;
        
        List<AnomalyAlert> alerts = jdbcTemplate.query(sql, (rs, rowNum) -> {
            AnomalyAlert alert = new AnomalyAlert();
            alert.setTime(rs.getTimestamp("time"));
            alert.setDeviceId(rs.getString("device_id"));
            alert.setSensorId(rs.getInt("sensor_id"));
            alert.setValue(rs.getDouble("value"));
            alert.setMovingAvg(rs.getDouble("moving_avg"));
            alert.setMovingStddev(rs.getDouble("moving_stddev"));
            alert.setDeviation(rs.getDouble("deviation"));
            alert.setAlertLevel(rs.getString("alert_level"));
            return alert;
        });
        
        // 发送告警
        alerts.forEach(alert -> {
            if ("CRITICAL".equals(alert.getAlertLevel())) {
                alertService.sendCriticalAlert(alert);
            } else if ("WARNING".equals(alert.getAlertLevel())) {
                alertService.sendWarningAlert(alert);
            }
        });
        
        return alerts;
    }
    
    /**
     * 定时任务:每分钟执行一次异常检测
     */
    @Scheduled(cron = "0 * * * * ?")
    public void scheduleAnomalyDetection() {
        try {
            List<AnomalyAlert> alerts = detectTemperatureAnomaly();
            log.info("Anomaly detection completed, found {} alerts", alerts.size());
        } catch (Exception e) {
            log.error("Anomaly detection failed", e);
        }
    }
}

@Data
public class AnomalyAlert {
    private Timestamp time;
    private String deviceId;
    private Integer sensorId;
    private Double value;
    private Double movingAvg;
    private Double movingStddev;
    private Double deviation;
    private String alertLevel;
}

场景 3:历史数据分析

分析过去一年的设备运行趋势,按天降采样:

-- 设备能耗分析:按天聚合
SELECT 
    time_bucket('1 day', time) AS day,
    device_id,
    AVG(value) as daily_avg,
    MAX(value) as daily_max,
    MIN(value) as daily_min,
    PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY value) as p95,
    PERCENTILE_CONT(0.50) WITHIN GROUP (ORDER BY value) as p50,
    COUNT(*) as sample_count
FROM sensor_data
WHERE 
    metric_type = 'power_consumption'
    AND time > NOW() - INTERVAL '1 year'
GROUP BY day, device_id
ORDER BY day;

-- 设备健康度评分
WITH device_metrics AS (
    SELECT 
        device_id,
        time_bucket('1 hour', time) AS hour,
        AVG(CASE WHEN metric_type = 'temperature' THEN value END) as avg_temp,
        AVG(CASE WHEN metric_type = 'vibration' THEN value END) as avg_vibration,
        AVG(CASE WHEN metric_type = 'power_consumption' THEN value END) as avg_power,
        COUNT(*) as data_points
    FROM sensor_data
    WHERE time > NOW() - INTERVAL '7 days'
    GROUP BY device_id, hour
),
health_scores AS (
    SELECT 
        device_id,
        hour,
        -- 温度评分(正常范围20-80度)
        CASE 
            WHEN avg_temp BETWEEN 20 AND 80 THEN 100
            WHEN avg_temp BETWEEN 15 AND 90 THEN 80
            WHEN avg_temp BETWEEN 10 AND 95 THEN 60
            ELSE 40
        END as temp_score,
        -- 振动评分(正常范围0-50)
        CASE 
            WHEN avg_vibration BETWEEN 0 AND 50 THEN 100
            WHEN avg_vibration BETWEEN 50 AND 80 THEN 80
            WHEN avg_vibration BETWEEN 80 AND 100 THEN 60
            ELSE 40
        END as vibration_score,
        -- 能耗评分(正常范围0-1000W)
        CASE 
            WHEN avg_power BETWEEN 0 AND 1000 THEN 100
            WHEN avg_power BETWEEN 1000 AND 1500 THEN 80
            WHEN avg_power BETWEEN 1500 AND 2000 THEN 60
            ELSE 40
        END as power_score
    FROM device_metrics
)
SELECT 
    device_id,
    AVG((temp_score + vibration_score + power_score) / 3.0) as health_score,
    CASE 
        WHEN AVG((temp_score + vibration_score + power_score) / 3.0) >= 90 THEN '优秀'
        WHEN AVG((temp_score + vibration_score + power_score) / 3.0) >= 80 THEN '良好'
        WHEN AVG((temp_score + vibration_score + power_score) / 3.0) >= 60 THEN '一般'
        ELSE '需要维护'
    END as health_status
FROM health_scores
GROUP BY device_id
ORDER BY health_score DESC;

查询一年的数据,OpenTeleDB 在 2 秒内返回结果,而 MySQL 需要超过 30 秒。对应的数据分析服务:

@Service
public class DataAnalysisService {
    
    @Autowired
    private JdbcTemplate jdbcTemplate;
    
    /**
     * 获取设备能耗趋势
     */
    public List<PowerConsumptionTrend> getPowerConsumptionTrend(
            String deviceId, Date startDate, Date endDate) {
        
        String sql = """
            SELECT 
                time_bucket('1 day', time) AS day,
                device_id,
                AVG(value) as daily_avg,
                MAX(value) as daily_max,
                MIN(value) as daily_min,
                PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY value) as p95,
                PERCENTILE_CONT(0.50) WITHIN GROUP (ORDER BY value) as p50,
                COUNT(*) as sample_count
            FROM sensor_data
            WHERE 
                metric_type = 'power_consumption'
                AND device_id = ?
                AND time BETWEEN ? AND ?
            GROUP BY day, device_id
            ORDER BY day
        """;
        
        return jdbcTemplate.query(sql, 
            new Object[]{deviceId, startDate, endDate},
            (rs, rowNum) -> {
                PowerConsumptionTrend trend = new PowerConsumptionTrend();
                trend.setDay(rs.getDate("day"));
                trend.setDeviceId(rs.getString("device_id"));
                trend.setDailyAvg(rs.getDouble("daily_avg"));
                trend.setDailyMax(rs.getDouble("daily_max"));
                trend.setDailyMin(rs.getDouble("daily_min"));
                trend.setP95(rs.getDouble("p95"));
                trend.setP50(rs.getDouble("p50"));
                trend.setSampleCount(rs.getInt("sample_count"));
                return trend;
            });
    }
    
    /**
     * 计算设备健康度评分
     */
    public List<DeviceHealthScore> calculateDeviceHealthScores() {
        String sql = """
            WITH device_metrics AS (
                SELECT 
                    device_id,
                    time_bucket('1 hour', time) AS hour,
                    AVG(CASE WHEN metric_type = 'temperature' THEN value END) as avg_temp,
                    AVG(CASE WHEN metric_type = 'vibration' THEN value END) as avg_vibration,
                    AVG(CASE WHEN metric_type = 'power_consumption' THEN value END) as avg_power,
                    COUNT(*) as data_points
                FROM sensor_data
                WHERE time > NOW() - INTERVAL '7 days'
                GROUP BY device_id, hour
            ),
            health_scores AS (
                SELECT 
                    device_id,
                    hour,
                    CASE 
                        WHEN avg_temp BETWEEN 20 AND 80 THEN 100
                        WHEN avg_temp BETWEEN 15 AND 90 THEN 80
                        WHEN avg_temp BETWEEN 10 AND 95 THEN 60
                        ELSE 40
                    END as temp_score,
                    CASE 
                        WHEN avg_vibration BETWEEN 0 AND 50 THEN 100
                        WHEN avg_vibration BETWEEN 50 AND 80 THEN 80
                        WHEN avg_vibration BETWEEN 80 AND 100 THEN 60
                        ELSE 40
                    END as vibration_score,
                    CASE 
                        WHEN avg_power BETWEEN 0 AND 1000 THEN 100
                        WHEN avg_power BETWEEN 1000 AND 1500 THEN 80
                        WHEN avg_power BETWEEN 1500 AND 2000 THEN 60
                        ELSE 40
                    END as power_score
                FROM device_metrics
            )
            SELECT 
                device_id,
                AVG((temp_score + vibration_score + power_score) / 3.0) as health_score,
                CASE 
                    WHEN AVG((temp_score + vibration_score + power_score) / 3.0) >= 90 THEN '优秀'
                    WHEN AVG((temp_score + vibration_score + power_score) / 3.0) >= 80 THEN '良好'
                    WHEN AVG((temp_score + vibration_score + power_score) / 3.0) >= 60 THEN '一般'
                    ELSE '需要维护'
                END as health_status
            FROM health_scores
            GROUP BY device_id
            ORDER BY health_score DESC
        """;
        
        return jdbcTemplate.query(sql, (rs, rowNum) -> {
            DeviceHealthScore score = new DeviceHealthScore();
            score.setDeviceId(rs.getString("device_id"));
            score.setHealthScore(rs.getDouble("health_score"));
            score.setHealthStatus(rs.getString("health_status"));
            return score;
        });
    }
    
    /**
     * 导出分析报表
     */
    public void exportAnalysisReport(String deviceId, Date startDate, Date endDate) 
            throws IOException {
        
        List<PowerConsumptionTrend> trends = getPowerConsumptionTrend(deviceId, startDate, endDate);
        
        // 使用Apache POI导出Excel
        Workbook workbook = new XSSFWorkbook();
        Sheet sheet = workbook.createSheet("能耗分析报表");
        
        // 创建表头
        Row headerRow = sheet.createRow(0);
        headerRow.createCell(0).setCellValue("日期");
        headerRow.createCell(1).setCellValue("设备ID");
        headerRow.createCell(2).setCellValue("平均能耗");
        headerRow.createCell(3).setCellValue("最大能耗");
        headerRow.createCell(4).setCellValue("最小能耗");
        headerRow.createCell(5).setCellValue("P95");
        headerRow.createCell(6).setCellValue("P50");
        headerRow.createCell(7).setCellValue("采样数");
        
        // 填充数据
        int rowNum = 1;
        for (PowerConsumptionTrend trend : trends) {
            Row row = sheet.createRow(rowNum++);
            row.createCell(0).setCellValue(trend.getDay().toString());
            row.createCell(1).setCellValue(trend.getDeviceId());
            row.createCell(2).setCellValue(trend.getDailyAvg());
            row.createCell(3).setCellValue(trend.getDailyMax());
            row.createCell(4).setCellValue(trend.getDailyMin());
            row.createCell(5).setCellValue(trend.getP95());
            row.createCell(6).setCellValue(trend.getP50());
            row.createCell(7).setCellValue(trend.getSampleCount());
        }
        
        // 保存文件
        String fileName = String.format("power_consumption_report_%s_%s.xlsx", 
            deviceId, new SimpleDateFormat("yyyyMMdd").format(new Date()));
        FileOutputStream fileOut = new FileOutputStream(fileName);
        workbook.write(fileOut);
        fileOut.close();
        workbook.close();
        
        log.info("Analysis report exported: {}", fileName);
    }
}

@Data
public class PowerConsumptionTrend {
    private Date day;
    private String deviceId;
    private Double dailyAvg;
    private Double dailyMax;
    private Double dailyMin;
    private Double p95;
    private Double p50;
    private Integer sampleCount;
}

@Data
public class DeviceHealthScore {
    private String deviceId;
    private Double healthScore;
    private String healthStatus;
}

典型查询场景性能对比

OpenTeleDB
MySQL
查询响应时间对比 (秒)
0.18s
0.32s
0.085s
0.45s
0.68s
2.8s
5.7s
1.2s
8.3s
12.5s
实时监控
5分钟聚合
24小时趋势
1小时聚合
单设备
历史查询
多设备
对比查询
异常检测
窗口函数

3.3、运维体验提升

自动数据保留策略

设置自动删除 90 天前的原始数据,保留聚合数据:

-- 创建数据保留策略
SELECT add_retention_policy('sensor_data', INTERVAL '90 days');

-- 创建连续聚合视图,永久保留
CREATE MATERIALIZED VIEW sensor_data_hourly
WITH (timescaledb.continuous) AS
SELECT 
    time_bucket('1 hour', time) AS hour,
    sensor_id,
    device_id,
    metric_type,
    AVG(value) as avg_value,
    MAX(value) as max_value,
    MIN(value) as min_value
FROM sensor_data
GROUP BY hour, sensor_id, device_id, metric_type;

-- 设置自动刷新
SELECT add_continuous_aggregate_policy('sensor_data_hourly',
    start_offset => INTERVAL '2 hours',
    end_offset => INTERVAL '1 hour',
    schedule_interval => INTERVAL '1 hour');

这些功能在 MySQL 中需要编写复杂的定时任务和存储过程,而 OpenTeleDB 原生支持,大大降低了运维复杂度。

压缩与存储优化

启用自动压缩后,存储成本进一步降低:

-- 对历史数据启用压缩
ALTER TABLE sensor_data SET (
    timescaledb.compress,
    timescaledb.compress_segmentby = 'device_id',
    timescaledb.compress_orderby = 'time DESC'
);

-- 自动压缩7天前的数据
SELECT add_compression_policy('sensor_data', INTERVAL '7 days');

压缩后的数据查询性能不降反升,因为需要扫描的数据量大幅减少。

数据生命周期管理流程

聚合数据
过期数据 (>90天)
冷数据 (30-90天)
温数据 (7-30天)
热数据 (0-7天)
数据写入
7天后
30天后
90天后
实时聚合
补充聚合
连续聚合视图
永久保留
秒级查询
自动删除
释放空间
深度压缩
归档存储
按需查询
自动压缩
压缩比 11:1
查询性能良好
未压缩
快速查询
实时分析
新数据写入

四、经验总结与最佳实践

4.1、OpenTeleDB 的独特价值

从 11 月 1 日 OpenTeleDB 正式开源,到 11 月 19 日完成生产环境迁移和验证,这 18 天的经历让我对这款国产开源数据库有了深刻的认识。

最打动我的三个特性

1. 真正的开箱即用

不需要像 InfluxDB 那样学习新的查询语言(InfluxQL),也不需要像 Cassandra 那样设计复杂的数据模型。如果你会 PostgreSQL,就能立刻上手 OpenTeleDB。

我们团队的另一位开发同事,之前没接触过时序数据库,看了半天文档就能写出高效的查询。这种学习曲线对团队来说太重要了。

2. 自动化运维能力

最让我省心的是数据生命周期管理。以前在 MySQL 上,我们需要写定时任务来归档和删除历史数据,脚本维护起来很麻烦。

OpenTeleDB 的数据保留策略和自动压缩,让这一切变得简单:

-- 设置保留策略:原始数据保留90天
SELECT add_retention_policy('sensor_data', INTERVAL '90 days');

-- 设置压缩策略:7天前的数据自动压缩
SELECT add_compression_policy('sensor_data', INTERVAL '7 days');

-- 就这么简单,不需要写任何脚本

上线后的这几天,我没有因为数据库问题被半夜叫醒过一次。这在 MySQL 时代是不可想象的。

3. 性价比

这是向老板汇报时最有说服力的数据:

  • 服务器成本:从 5 台 MySQL 服务器减少到 3 台 OpenTeleDB 服务器
  • 存储成本:每月节省 6300 元
  • 人力成本:DBA 的工作量减少了 60% 以上

更重要的是,系统稳定性大幅提升。客户那边的车间主任再也没有抱怨过监控大屏卡顿的问题。

4.2、适用场景建议

场景适配度评估表

场景类型 数据特征 适配度 推荐理由
物联网数据采集 高频写入、时序性强 ⭐⭐⭐⭐⭐ 原生时序优化,压缩比高
应用性能监控 指标密集、实时查询 ⭐⭐⭐⭐⭐ 查询性能优异,聚合能力强
金融行情数据 高并发、低延迟 ⭐⭐⭐⭐⭐ 写入性能强,查询快速
工业互联网 设备多、数据量大 ⭐⭐⭐⭐⭐ 自动分区,运维简单
智慧城市 多源异构、长期存储 ⭐⭐⭐⭐ 压缩效果好,成本低
日志分析 非结构化、查询复杂 ⭐⭐⭐ 需结合其他工具使用
传统 OLTP 事务性强、随机读写 ⭐⭐ 不如传统关系型数据库

4.3、给后来者的迁移建议

如果你也在考虑迁移到 OpenTeleDB,这些是我用真金白银(和加班时间)换来的经验:

1. 一定要做 POC 测试(至少 2 周)

不要看到性能数据就激动,一定要用真实数据测试。我们的 POC 测试发现了 3 个业务查询需要优化,如果直接上生产,后果不堪设想。

测试清单:

  • 导入至少 1 个月的真实历史数据
  • 测试所有业务查询,记录响应时间
  • 模拟高峰期写入压力
  • 测试主从切换和故障恢复
  • 验证备份恢复流程

2. 分阶段迁移,不要一步到位

我们的迁移时间线:

  • 第 1 周:搭建测试环境,导入历史数据
  • 第 2 周:应用层改造,实现双写
  • 第 3 周:灰度切换读流量(10% -> 50% -> 100%)
  • 第 4 周:观察稳定性,准备回滚方案
  • 第 5 周:下线 MySQL,完成迁移

每个阶段都留足观察时间,不要急。

迁移风险控制矩阵

监控指标
控制措施
风险识别
数据一致性率
目标: 99.99%
查询响应时间
目标: <500ms
系统可用性
目标: 99.9%
回滚时间
目标: <5min
双写验证
数据校验脚本
定期对账
POC测试
压力测试
性能监控
灰度发布
快速回滚
应急预案
保留旧系统
备份数据
回滚演练
数据一致性风险
性能下降风险
业务中断风险
回滚失败风险

3. 监控和告警必须先行

在切换流量之前,我们配置了完整的监控:

# Prometheus监控配置
scrape_configs:
  - job_name: 'openteledb'
    static_configs:
      - targets: 
          - '192.168.1.101:9187'  # 主节点
          - '192.168.1.102:9187'  # 从节点1
          - '192.168.1.103:9187'  # 从节点2
    metrics_path: '/metrics'
    scrape_interval: 15s
    scrape_timeout: 10s
    
  - job_name: 'openteledb-exporter'
    static_configs:
      - targets: ['192.168.1.101:9187']
    relabel_configs:
      - source_labels: [__address__]
        target_label: instance
        replacement: 'openteledb-master'
        
# 关键告警规则
groups:
  - name: openteledb_alerts
    rules:
      # 写入延迟告警
      - alert: HighWriteLatency
        expr: pg_stat_database_tup_inserted_rate > 50000 and pg_stat_database_blks_hit_rate < 0.95
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "OpenTeleDB写入延迟过高"
          description: "实例 {{ $labels.instance }} 写入延迟超过阈值,当前值: {{ $value }}"
      
      # 主从复制延迟告警
      - alert: ReplicationLag
        expr: pg_replication_lag_seconds > 10
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "OpenTeleDB主从复制延迟"
          description: "从节点 {{ $labels.instance }} 复制延迟 {{ $value }} 秒"
      
      # 连接数告警
      - alert: HighConnectionCount
        expr: pg_stat_database_numbackends > 400
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "OpenTeleDB连接数过高"
          description: "实例 {{ $labels.instance }} 连接数: {{ $value }}"
      
      # 磁盘使用率告警
      - alert: HighDiskUsage
        expr: (node_filesystem_size_bytes{mountpoint="/data"} - node_filesystem_free_bytes{mountpoint="/data"}) / node_filesystem_size_bytes{mountpoint="/data"} > 0.85
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "磁盘使用率过高"
          description: "实例 {{ $labels.instance }} 磁盘使用率: {{ $value | humanizePercentage }}"
      
      # 查询响应时间告警
      - alert: SlowQuery
        expr: pg_stat_statements_mean_exec_time_seconds > 1
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "慢查询告警"
          description: "查询平均执行时间: {{ $value }} 秒"
      
      # CPU使用率告警
      - alert: HighCPUUsage
        expr: 100 - (avg by (instance) (irate(node_cpu_seconds_total{mode="idle"}[5m])) * 100) > 80
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "CPU使用率过高"
          description: "实例 {{ $labels.instance }} CPU使用率: {{ $value }}%"
      
      # 内存使用率告警
      - alert: HighMemoryUsage
        expr: (node_memory_MemTotal_bytes - node_memory_MemAvailable_bytes) / node_memory_MemTotal_bytes > 0.90
        for: 10m
        labels:
          severity: critical
        annotations:
          summary: "内存使用率过高"
          description: "实例 {{ $labels.instance }} 内存使用率: {{ $value | humanizePercentage }}"

监控让我们提前发现了 2 次潜在问题,避免了故障。配套的 Grafana 监控面板配置:

{
  "dashboard": {
    "title": "OpenTeleDB监控面板",
    "panels": [
      {
        "title": "写入TPS",
        "targets": [
          {
            "expr": "rate(pg_stat_database_tup_inserted[1m])",
            "legendFormat": "{{instance}}"
          }
        ],
        "type": "graph"
      },
      {
        "title": "查询响应时间",
        "targets": [
          {
            "expr": "pg_stat_statements_mean_exec_time_seconds",
            "legendFormat": "平均响应时间"
          },
          {
            "expr": "pg_stat_statements_max_exec_time_seconds",
            "legendFormat": "最大响应时间"
          }
        ],
        "type": "graph"
      },
      {
        "title": "连接数",
        "targets": [
          {
            "expr": "pg_stat_database_numbackends",
            "legendFormat": "{{instance}}"
          }
        ],
        "type": "graph"
      },
      {
        "title": "主从复制延迟",
        "targets": [
          {
            "expr": "pg_replication_lag_seconds",
            "legendFormat": "{{instance}}"
          }
        ],
        "type": "graph"
      },
      {
        "title": "缓存命中率",
        "targets": [
          {
            "expr": "pg_stat_database_blks_hit / (pg_stat_database_blks_hit + pg_stat_database_blks_read)",
            "legendFormat": "{{instance}}"
          }
        ],
        "type": "gauge"
      }
    ]
  }
}

自定义监控脚本:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
OpenTeleDB监控脚本
定期检查数据库健康状态并发送告警
"""

import psycopg2
import time
import requests
from datetime import datetime

class OpenTeleDBMonitor:
    def __init__(self, host, port, user, password, database):
        self.conn_params = {
            'host': host,
            'port': port,
            'user': user,
            'password': password,
            'database': database
        }
        self.webhook_url = "https://your-webhook-url.com/alert"
    
    def get_connection(self):
        return psycopg2.connect(**self.conn_params)
    
    def check_replication_lag(self):
        """检查主从复制延迟"""
        conn = self.get_connection()
        cursor = conn.cursor()
        
        cursor.execute("""
            SELECT 
                client_addr,
                state,
                sync_state,
                EXTRACT(EPOCH FROM (NOW() - pg_last_xact_replay_timestamp())) as lag_seconds
            FROM pg_stat_replication
        """)
        
        results = cursor.fetchall()
        cursor.close()
        conn.close()
        
        for row in results:
            client_addr, state, sync_state, lag_seconds = row
            if lag_seconds and lag_seconds > 10:
                self.send_alert(
                    f"主从复制延迟告警",
                    f"从节点 {client_addr} 延迟 {lag_seconds:.2f} 秒"
                )
        
        return results
    
    def check_slow_queries(self):
        """检查慢查询"""
        conn = self.get_connection()
        cursor = conn.cursor()
        
        cursor.execute("""
            SELECT 
                query,
                calls,
                mean_exec_time,
                max_exec_time
            FROM pg_stat_statements
            WHERE mean_exec_time > 1000  -- 超过1秒
            ORDER BY mean_exec_time DESC
            LIMIT 10
        """)
        
        results = cursor.fetchall()
        cursor.close()
        conn.close()
        
        if results:
            slow_queries = "\n".join([
                f"Query: {row[0][:100]}..., Avg: {row[2]:.2f}ms, Max: {row[3]:.2f}ms"
                for row in results
            ])
            self.send_alert("慢查询告警", slow_queries)
        
        return results
    
    def check_connection_count(self):
        """检查连接数"""
        conn = self.get_connection()
        cursor = conn.cursor()
        
        cursor.execute("""
            SELECT 
                COUNT(*) as total_connections,
                COUNT(*) FILTER (WHERE state = 'active') as active_connections,
                COUNT(*) FILTER (WHERE state = 'idle') as idle_connections
            FROM pg_stat_activity
        """)
        
        result = cursor.fetchone()
        cursor.close()
        conn.close()
        
        total, active, idle = result
        if total > 400:
            self.send_alert(
                "连接数告警",
                f"总连接数: {total}, 活跃: {active}, 空闲: {idle}"
            )
        
        return result
    
    def check_table_bloat(self):
        """检查表膨胀"""
        conn = self.get_connection()
        cursor = conn.cursor()
        
        cursor.execute("""
            SELECT 
                schemaname,
                tablename,
                pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) as size,
                n_dead_tup,
                n_live_tup,
                CASE 
                    WHEN n_live_tup > 0 
                    THEN ROUND(n_dead_tup::numeric / n_live_tup::numeric, 4)
                    ELSE 0 
                END as dead_ratio
            FROM pg_stat_user_tables
            WHERE n_dead_tup > 10000
            ORDER BY n_dead_tup DESC
            LIMIT 10
        """)
        
        results = cursor.fetchall()
        cursor.close()
        conn.close()
        
        if results:
            bloat_info = "\n".join([
                f"Table: {row[0]}.{row[1]}, Size: {row[2]}, Dead tuples: {row[3]}, Ratio: {row[5]}"
                for row in results
            ])
            self.send_alert("表膨胀告警", bloat_info)
        
        return results
    
    def send_alert(self, title, message):
        """发送告警"""
        payload = {
            "title": title,
            "message": message,
            "timestamp": datetime.now().isoformat(),
            "severity": "warning"
        }
        
        try:
            response = requests.post(self.webhook_url, json=payload, timeout=5)
            print(f"Alert sent: {title} - {response.status_code}")
        except Exception as e:
            print(f"Failed to send alert: {e}")
    
    def run_all_checks(self):
        """运行所有检查"""
        print(f"[{datetime.now()}] Running health checks...")
        
        try:
            self.check_replication_lag()
            self.check_slow_queries()
            self.check_connection_count()
            self.check_table_bloat()
            print("All checks completed")
        except Exception as e:
            print(f"Error during health check: {e}")
            self.send_alert("监控脚本异常", str(e))

if __name__ == "__main__":
    monitor = OpenTeleDBMonitor(
        host='192.168.1.101',
        port=5432,
        user='monitor_user',
        password='monitor_password',
        database='iot_db'
    )
    
    # 每5分钟执行一次检查
    while True:
        monitor.run_all_checks()
        time.sleep(300)

4. 重视索引设计

时序数据库的索引策略和传统数据库完全不同。我总结的几个原则:

-- ✅ 正确:时间列在后面
CREATE INDEX idx_sensor_time ON sensor_data(sensor_id, time DESC);

-- ❌ 错误:时间列在前面
CREATE INDEX idx_time_sensor ON sensor_data(time DESC, sensor_id);

-- ✅ 对于时间列,使用BRIN索引
CREATE INDEX idx_time_brin ON sensor_data USING BRIN(time);

-- ✅ 对于高基数标签,考虑部分索引
CREATE INDEX idx_high_temp ON sensor_data(sensor_id, time DESC) 
WHERE value > 80;

索引设计最佳实践

索引类型 适用场景 优点 缺点 示例
B-Tree 索引 标签列 + 时间列 查询快速 占用空间大 (sensor_id, time DESC)
BRIN 索引 时间列 占用空间小 适合顺序扫描 USING BRIN(time)
部分索引 特定条件查询 索引小,效率高 适用范围窄 WHERE value > 80
复合索引 多条件查询 覆盖多种查询 维护成本高 (device_id, metric_type, time)

索引效果对比

正确索引
错误索引
无索引
添加索引
优化索引
索引命中
0.18秒
扫描0.5%数据
索引效率低
2.8秒
扫描40%数据
全表扫描
5.2秒
扫描100%数据

5. 准备回滚方案

虽然我们没用上,但回滚方案必须准备好:

  • 保留 MySQL 集群至少 1 个月
  • 双写期间记录所有失败的写入,准备补偿脚本
  • 准备流量切换脚本,能在 5 分钟内切回 MySQL
  • 每天备份 OpenTeleDB 数据

6. 团队培训不能省

我们组织了 3 次内部培训:

  • 第 1 次:OpenTeleDB 基础概念和架构
  • 第 2 次:时序数据建模和查询优化
  • 第 3 次:运维和故障处理

培训后,团队成员都能独立处理常见问题,大大降低了我的工作压力。

五、后续规划与展望

5.1、下一步计划(2026 年 Q1)

OpenTeleDB 在生产环境的成功应用,让我们有信心在更多场景中使用它。目前正在规划的项目:

技术演进路线图

2026-01-04 2026-01-11 2026-01-18 2026-01-25 2026-02-01 2026-02-08 2026-02-15 2026-02-22 2026-03-01 2026-03-08 2026-03-15 2026-03-22 扩展到工厂2 实时分析平台设计 扩展到工厂3 查询性能调优 异常检测模块开发 扩展到工厂4 分布式方案调研 预测性维护模块开发 分布式集群测试 存储优化 监控体系完善 生产环境升级 扩展部署 平台建设 架构升级 性能优化 OpenTeleDB应用演进计划 (2026 Q1-Q2)

1. 扩展到更多产线

当前只有一个工厂在使用,公司还有另外 3 个客户的工厂项目。计划在 2026 年 1 月前完成迁移,预计将管理 5000+ 传感器的数据。

规模扩展预测

指标 当前 Q1 目标 Q2 目标
工厂数量 1 4 8
传感器数量 1200 5000 12000
日数据量 30GB 120GB 300GB
并发写入TPS 40000 160000 400000
存储总量 1.4TB 5.6TB 14TB

2. 构建实时数据分析平台

基于 OpenTeleDB 的高性能查询能力,我们计划开发一个实时数据分析平台:

应用展示层
分析计算层
数据存储层
数据采集层
监控大屏
移动端APP
告警系统
报表系统
实时异常检测
基于统计模型
设备健康度评分
多维度指标
预测性维护
机器学习模型
能耗优化分析
趋势预测
OpenTeleDB
时序数据存储
传感器数据
设备日志
业务数据

3. 探索分布式部署

随着数据量的增长,单节点可能会成为瓶颈。我们计划测试 OpenTeleDB 的分布式能力,构建多节点集群。

分布式架构规划

元数据管理
查询节点
数据节点集群
接入层
元数据节点
路由信息
查询节点1
查询节点2
数据节点1
负责设备1-300
数据节点2
负责设备301-600
数据节点3
负责设备601-900
数据节点4
负责设备901-1200
负载均衡器

5.2、对 OpenTeleDB 的期待

作为一个深度用户,我也有一些期待和建议:

功能层面

  • 希望能提供更多的时序分析函数,比如异常检测、趋势预测等
  • 支持更灵活的数据分层存储(热数据 SSD,冷数据对象存储)
  • 增强可视化管理工具,降低运维门槛

生态层面

  • 与更多国产中间件的集成(比如国产消息队列、国产监控系统)
  • 提供更多行业解决方案和最佳实践
  • 丰富客户端 SDK,支持更多编程语言

社区层面

  • 希望能有更多的技术分享和案例
  • 定期的线上/线下技术交流活动
  • 更详细的性能调优文档

5.3、写在最后

从 11 月 1 日 OpenTeleDB 正式开源,到现在写下这篇文章,仅仅过去了 18 天。这段紧凑而充实的经历让我深刻体会到:

  • 技术选型没有银弹。OpenTeleDB 解决了我们的时序数据问题,但它不是万能的。选择合适的工具,需要深入理解业务场景和技术特性。
  • 国产数据库在崛起。从 2015 年开始写技术博客到现在,我见证了中国开源技术的快速发展。以前提到数据库,大家想到的都是 Oracle、MySQL、PostgreSQL。但这次使用 OpenTeleDB 的经历让我看到,国产数据库在某些领域已经完全可以和国际主流产品竞争,甚至在某些方面更有优势。
  • 开源社区的力量。OpenTeleDB 的成功离不开活跃的开源社区。作为 CSDN 博客专家、开源中国首位 OSC 优秀原创作者,以及多个技术社区的运营者,我深知社区的价值。作为受益者,我也会积极参与 OpenTeleDB 社区建设,分享经验,帮助更多人。

在我运营的 CSDN 成都站和 AWS User Group Chengdu 社区中,已经有不少开发者开始关注和尝试 OpenTeleDB。我计划在 2026 年组织更多关于国产数据库的技术分享活动,让更多西南地区的开发者了解这些优秀的技术产品。如果你正在为时序数据存储而烦恼,如果你在考虑数据库迁移方案,如果你想尝试国产数据库,我强烈建议你试试 OpenTeleDB。从 Gitee 下载,搭建测试环境,导入你的数据,运行你的查询。也许,它就是你一直在寻找的那个答案。

最后,感谢 OpenTeleDB 团队的辛勤付出,感谢开源社区的支持,也感谢我的团队成员在迁移过程中的配合。这次成功的迁移,是大家共同努力的结果。

附录

附录 1、关于作者

郭靖,笔名“白鹿第一帅”,大数据与大模型开发工程师,中国开发者影响力年度榜单人物。现任职于某互联网大厂,主要从事企业大数据开发与大模型应用领域研究,曾任职于多家知名互联网公司和云服务厂商,拥有丰富的大数据架构和数据库实战经验。自 2015 年至今持续 11 年的技术博客写作经历,累计发布技术博客与测评 300 余篇,全网粉丝超 60000+,总浏览量突破 1500000+,获得多个技术平台认证专家和优质内容创作者称号。

本文基于真实的工业物联网数据采集与监控系统项目,涉及 1200+ 传感器、日均数据量 30GB 的生产环境,迁移时间为 2025 年 11 月 1 日 - 19 日,OpenTeleDB 开源后 19 天完成迁移,团队规模 5 人。技术栈包括 Spring Boot 2.7 + Java 11、MySQL 5.7(分库分表)迁移至 OpenTeleDB ,使用 Prometheus + Grafana 进行监控,部署在 CentOS 7.9 环境。

CSDN 博客https://blog.csdn.net/qq_22695001

附录 2、参考资料

OpenTeleDB 官方资源

时序数据库相关

数据库迁移最佳实践

  • 《数据库迁移实战指南》
  • 《大规模数据迁移的风险控制与实践》
  • 阿里云数据库迁移服务文档
  • AWS Database Migration Service 最佳实践

监控与运维

相关技术栈文档

交流方式:如果你在使用 OpenTeleDB 过程中遇到问题,或者想交流时序数据库的实践经验,欢迎通过以下方式:


文章作者白鹿第一帅作者主页https://blog.csdn.net/qq_22695001,未经授权,严禁转载,侵权必究!


总结

回顾这次从 MySQL 到 OpenTeleDB 的迁移之旅,从 11 月 1 日 OpenTeleDB 正式开源到 11 月 19 日完成生产环境迁移,仅用 18 天就完成了这次技术架构的升级,这不仅验证了 OpenTeleDB 的技术成熟度,更见证了国产开源数据库的快速发展。OpenTeleDB 在时序数据处理方面展现出的技术优势令人印象深刻:7.7 倍的写入性能提升、90% 的查询响应时间降低、74% 的存储成本节省,这些数据背后是其针对时序场景的深度优化,自动分区管理、列式压缩、连续聚合等特性让原本复杂的运维工作变得简单高效,更重要的是 OpenTeleDB 完全兼容 PostgreSQL 生态,大大降低了迁移成本和学习门槛。这次迁移实践也让我深刻体会到,技术选型没有银弹,关键是要深入理解业务场景和技术特性,分阶段迁移、充分测试、完善监控、准备回滚方案,这些看似繁琐的步骤正是保障迁移成功的关键。OpenTeleDB 的开源是国产数据库发展的重要里程碑,它在时序数据领域的表现证明我们完全有能力打造世界级的开源数据库产品,希望通过这篇文章的分享能够帮助更多团队了解 OpenTeleDB,在时序数据处理的道路上少走弯路,如果你也在为海量时序数据存储而烦恼,不妨试试刚刚开源的 OpenTeleDB,也许它就是你一直在寻找的答案。

在这里插入图片描述


我是白鹿,一个不懈奋斗的程序猿。望本文能对你有所裨益,欢迎大家的一键三连!若有其他问题、建议或者补充可以留言在文章下方,感谢大家的支持!

Logo

数据库是今天社会发展不可缺少的重要技术,它可以把大量的信息进行有序的存储和管理,为企业的数据处理提供了强大的保障。

更多推荐