从ClickHouse迁移到Doris集群:Array字段的深度适配与实战解析

当交通大数据平台的设备数量从几十个路口扩展到两百多个路口时,单点ClickHouse的架构瓶颈开始显现。我们选择了Doris集群作为新的数据仓库解决方案,但在迁移过程中,Array字段的处理成为技术团队面临的首要挑战。本文将分享我们在数据类型适配、查询改写和Java代码实现层面的完整解决方案。

1. 核心差异:ClickHouse与Doris的Array实现对比

ClickHouse和Doris虽然都支持Array数据类型,但在底层实现和使用细节上存在显著差异。ClickHouse采用列式存储和向量化执行引擎,对Array类型的支持更为原生,而Doris作为MPP数据库,其Array实现更注重分布式场景下的稳定性。

存储引擎差异对比表:

特性 ClickHouse Doris
底层存储格式 原生列式存储 列式存储+分片副本
Array元素最大数量 理论无限制 默认100万元素限制
NULL值处理 保留NULL语义 自动跳过NULL计算
内存管理机制 激进的内存分配 可控的内存预算

在函数支持方面,Doris 2.0版本后新增了十余个数组专用函数。例如处理交通冲突数据时,我们常用到以下函数组合:

-- 计算各进口道冲突次数的统计值
SELECT 
    intersection_number,
    array_max(traffic_conflicts) AS max_conflicts,
    array_min(traffic_conflicts) AS min_conflicts,
    array_avg(traffic_conflicts) AS avg_conflicts
FROM intersection_safety_stats

注意:Doris的array_avg()函数对空数组会返回NULL,这与ClickHouse返回NaN的行为不同,需要在应用层做特殊处理。

2. 迁移方案设计:从数据模型到查询改写

2.1 表结构适配策略

在交通数据场景中,我们遇到最典型的问题是进口道指标的存储。ClickHouse方案使用Nested类型嵌套Array,而Doris需要扁平化处理:

原始ClickHouse方案:

CREATE TABLE ch_safety_stats (
    intersection_id UInt32,
    approaches Nested(
        name String,
        conflict_count UInt16,
        violation_rate Float32
    )
)

Doris适配方案:

CREATE TABLE doris_safety_stats (
    intersection_id INT,
    approach_names ARRAY<VARCHAR(20)>,
    conflict_counts ARRAY<SMALLINT>,
    violation_rates ARRAY<FLOAT>
)
DUPLICATE KEY(intersection_id)

这种设计虽然损失了部分语义关联性,但获得了更好的分布式查询性能。对于关联查询,我们通过数组下标保持数据对应关系。

2.2 查询模式转换

ClickHouse常用的arrayJoin在Doris中需要通过explode函数配合LATERAL VIEW实现:

-- ClickHouse原始查询
SELECT 
    intersection_id, 
    approach_name,
    conflict_count
FROM ch_safety_stats
ARRAY JOIN approaches

-- Doris等效实现
SELECT 
    t.intersection_id,
    e.approach_name,
    e.conflict_count
FROM doris_safety_stats t
LATERAL VIEW explode(
    arrays_zip(
        approach_names, 
        conflict_counts, 
        violation_rates
    )
) e AS approach_name, conflict_count, violation_rate

复杂场景下,我们开发了UDF函数处理特定的数组操作逻辑。例如计算进口道安全评分时:

// 注册UDF函数
public class SafetyScoreUdf extends ScalarFunction {
    public Float evaluate(List<String> approaches, List<Float> scores) {
        float total = 0;
        int count = 0;
        for (int i = 0; i < approaches.size(); i++) {
            if (scores.get(i) != null) {
                total += scores.get(i);
                count++;
            }
        }
        return count > 0 ? total / count : 0;
    }
}

3. Java工程化实践:从数据接入到应用层处理

3.1 数据写入适配

交通数据采集端通常使用Protocol Buffers格式,我们需要将其中的repeated字段转换为Doris的Array格式。采用Jackson自定义序列化方案:

public class ArrayTypeModule extends SimpleModule {
    public ArrayTypeModule() {
        addSerializer(List.class, new JsonSerializer<List>() {
            @Override
            public void serialize(List value, JsonGenerator gen, SerializerProvider provider) 
                throws IOException {
                gen.writeString(ArrayUtils.toString(value));
            }
        });
    }
}

// 使用示例
ObjectMapper mapper = new ObjectMapper();
mapper.registerModule(new ArrayTypeModule());
String json = mapper.writeValueAsString(trafficData);

对于实时数据流,我们开发了Kafka Connect转换插件:

public class ArrayFieldConverter implements Converter {
    @Override
    public SchemaAndValue convert(Schema schema, Object value) {
        if (value instanceof List) {
            return new SchemaAndValue(
                Schema.STRING_SCHEMA, 
                ((List<?>) value).stream()
                    .map(Object::toString)
                    .collect(Collectors.joining(",", "[", "]"))
            );
        }
        return new SchemaAndValue(schema, value);
    }
}

3.2 查询结果处理

Doris JDBC驱动返回的Array对象需要特殊处理。我们封装了ResultSet解析工具类:

public class DorisArrayParser {
    public static <T> List<T> parseArray(String arrayStr, Function<String, T> converter) {
        if (arrayStr == null || arrayStr.length() < 2) {
            return Collections.emptyList();
        }
        String content = arrayStr.substring(1, arrayStr.length() - 1);
        return Arrays.stream(content.split(","))
            .map(String::trim)
            .filter(s -> !s.isEmpty())
            .map(converter)
            .collect(Collectors.toList());
    }
}

// 使用示例
List<Float> conflictRates = DorisArrayParser.parseArray(
    rs.getString("conflict_rates"),
    Float::parseFloat
);

4. 性能优化与疑难问题解决

4.1 数组操作的执行计划优化

通过EXPLAIN分析发现,Doris对大型数组的处理存在以下性能瓶颈:

  1. 内存分配问题 :单个Array超过10MB会导致BE节点内存暴涨
  2. 函数下推限制 :部分数组函数无法下推到存储层执行

优化方案包括:

  • 设置 exec_mem_limit 控制单个查询内存用量
  • 对大型数组采用分块处理策略
  • 在FE配置中调整 max_array_nesting_length

优化前后性能对比:

操作类型 数据量 优化前耗时 优化后耗时
数组聚合查询 10万行 2.3s 1.1s
数组展开连接 1万行 4.5s 1.8s
批量数组插入 1000次 12s 3.2s

4.2 典型错误处理案例

案例一:数组越界问题

// 错误示例:直接使用数组下标
String firstApproach = approaches.get(0);

// 正确做法:防御性编程
String firstApproach = approaches != null && !approaches.isEmpty() 
    ? approaches.get(0) 
    : null;

案例二:类型转换异常

-- 错误示例:混合类型数组
INSERT INTO safety_stats VALUES (1, [1, "high", 3.5])

-- 正确做法:统一元素类型
INSERT INTO safety_stats VALUES (1, ["1", "high", "3.5"])

案例三:分布式执行问题

-- 可能导致数据倾斜的查询
SELECT array_agg(distinct device_id) FROM traffic_logs

-- 优化方案:分阶段聚合
WITH local_agg AS (
    SELECT bitmap_agg(device_id) AS devices 
    FROM traffic_logs GROUP BY partition_key
)
SELECT bitmap_to_array(bitmap_union(devices)) FROM local_agg

在郑州港区项目的实施过程中,我们总结出Array字段迁移的黄金法则:先验证数据语义一致性,再优化查询性能,最后完善异常处理机制。这套方法后来被应用到其他五个区域中心的迁移工作中,平均节省了40%的适配开发时间。

更多推荐