管道 - 过滤器(Pipe-and-Filter)架构风格

一、核心定义与本质

管道过滤器是数据流驱动的经典架构模式,属于结构化 / 数据流架构,最早源于 Unix Shell 设计思想。

核心组成两部分

  1. 过滤器(Filter)
    • 独立、无状态、单一职责的数据处理单元;
    • 只做一件事:接收输入数据流 → 转换 / 加工 / 过滤数据 → 输出数据流;
    • 不共享全局数据,不依赖其他过滤器内部状态,可独立复用、替换、并行运行。
  2. 管道(Pipe)
    • 连接两个过滤器的通道,只负责传输数据流,不修改数据;
    • 标准单向传输:上游 Filter 输出 → Pipe → 下游 Filter 输入;
    • 解耦:过滤器之间互不感知对方实现,只约定数据格式。

核心思想

数据流经一系列独立处理组件,每个组件只完成单一处理,组件之间仅通过数据流通信

二、关键特性

优点

  1. 高复用:单个过滤器可在多个流程中重复使用(如日志过滤、格式转换);
  2. 易修改扩展:新增处理逻辑只需插入新过滤器,无需改动原有代码;
  3. 可并行执行:多过滤器可流水线并发处理,提升吞吐量;
  4. 松耦合:过滤器完全隔离,替换其中一个不影响整条链路;
  5. 可调试:管道中间可截流、打印数据流,快速定位异常;
  6. 无状态天然容错:单个过滤器故障不直接击穿全链路(可加熔断)。

缺点

  1. 数据格式强约束:上下游必须统一数据流格式,格式变更改动全链路;
  2. 额外 IO 开销:数据反复序列化 / 传输,大数据场景性能损耗明显;
  3. 不适合交互场景:同步、低延迟人机交互不适合(适合批处理、流式处理);
  4. 复杂分支难维护:多分支、循环管道会大幅提升架构复杂度。

三、管道过滤器常见拓扑结构

  1. 线性串联(最基础) Filter1 → Pipe → Filter2 → Pipe → Filter3
  2. 分叉(广播) 一个过滤器输出,通过管道分发给多个下游过滤器并行处理
  3. 汇聚(合并) 多个过滤器输出管道汇入同一个下游过滤器(如多日志合并)
  4. 循环管道 处理后数据回流上游再次过滤(较少用,易死循环)

四、最经典真实底层示例:Unix/Linux Shell 命令(原生管道过滤器)

原理

Unix 每个命令就是Filter| 符号就是Pipe; 每个命令默认:标准输入 stdin、标准输出 stdout,管道把前一个 stdout 接到后一个 stdin。

示例 1:日志过滤完整链路
# 原始日志文件读取(Filter1) → 筛选错误日志(Filter2) → 提取时间字段(Filter3) → 排序(Filter4) → 输出文件(Filter5)
cat app.log | grep "ERROR" | awk '{print $1,$2}' | sort > error_time.log

拆解对应架构:

  1. Filter1:cat — 读取文件,输出原始日志流
  2. Pipe1:| — 传输全部日志文本
  3. Filter2:grep "ERROR" — 过滤器:只保留包含 ERROR 的行
  4. Pipe2:| — 传输错误日志行
  5. Filter3:awk — 过滤器:截取每行前两列(日期、时间)
  6. Pipe3:| — 传输时间文本流
  7. Filter4:sort — 过滤器:按时间排序
  8. Pipe4:> — 管道写入目标文件

特点:

  • 每个命令独立、单一职责,可单独拿出来使用;
  • 管道只传文本,不处理业务;
  • 随意增删过滤器:加head -10只看前 10 条,不改动原有逻辑。

五、工程级真实业务系统示例(后端开发常用)

示例 1:实时日志处理系统(ELK 架构 = 标准管道过滤器)

整体链路:原始日志 → Filebeat → Logstash → Elasticsearch → Kibana 逐个映射:

  1. Filter1 Filebeat:采集服务器日志文件、切割日志流
  2. Pipe1:网络 TCP 通道传输日志原始文本
  3. Filter2 Logstash(多过滤器串联)
    • grok 过滤器:解析非结构化日志为 JSON 结构化数据
    • date 过滤器:统一时间字段格式
    • drop 过滤器:丢弃无用调试日志
    • mutate 过滤器:新增业务标签字段
  4. Pipe2:http 管道传输结构化 JSON
  5. Filter3 Elasticsearch:存储、索引、分词过滤数据
  6. Pipe3:查询数据流
  7. Filter4 Kibana:聚合、图表过滤、可视化输出

特点:任意一层过滤器可替换(Filebeat 替换为 Flink 采集,Logstash 替换为 Flink Transform),完全符合管道过滤器设计。

示例 2:大数据实时计算 Flink/Spark Streaming

流式计算是管道过滤器工业级落地: 数据流链路:消息队列 (Kafka) → 数据源 Filter → 清洗 Filter → 转换 Filter → 聚合 Filter → 入库 Filter

  • 每个算子 (map/filter/flatMap) = 独立过滤器;
  • 上下游算子之间的数据通道 = 管道;
  • 可随意插入新算子(脱敏、风控过滤),不修改原有计算逻辑。

业务场景:电商用户行为流处理

  1. Filter1:读取 Kafka 用户点击日志
  2. Filter2:清洗:过滤非法埋点、空数据
  3. Filter3:脱敏:手机号、身份证加密处理
  4. Filter4:转换:拆分行为字段、关联商品基础数据
  5. Filter5:聚合:统计每小时商品点击量
  6. Filter6:输出:写入 MySQL 报表库

六、日志处理Demo


package test;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// ===================== 统一数据流载体(管道传输的数据) =====================
class DataRecord {
    private String content;
    public DataRecord(String content) {
        this.content = content;
    }
    public String getContent() {
        return content;
    }
}

// ===================== 过滤器顶层接口 所有处理单元实现该接口 =====================
interface Filter {
    List<DataRecord> process(List<DataRecord> input);
}

// ===================== 管道工具类:串联所有过滤器 =====================
class PipeLine {
    // 链式执行所有过滤器,数据在管道中流转
    public static List<DataRecord> run(List<DataRecord> sourceData, Filter... filters) {
        List<DataRecord> data = sourceData;
        for (Filter filter : filters) {
            // 管道传输数据到下一个过滤器
            data = filter.process(data);
        }
        return data;
    }
}

// ===================== 自定义实体:用户行为 =====================
class UserAction {
    private String userId;
    private String action;
    private Long productId;

    public UserAction(String userId, String action, Long productId) {
        this.userId = userId;
        this.action = action;
        this.productId = productId;
    }

    @Override
    public String toString() {
        return "UserAction{" +
                "userId='" + userId + '\'' +
                ", action='" + action + '\'' +
                ", productId=" + productId +
                '}';
    }
}

// ===================== 各个独立过滤器(对应Flink算子) =====================
// 过滤器1:模拟数据源,批量生成原始日志
class SourceFilter implements Filter {
    @Override
    public List<DataRecord> process(List<DataRecord> input) {
        List<DataRecord> result = new ArrayList<>();
        Random random = new Random();
        String[] actions = {"click", "buy", "collect", "invalid"};
        // 模拟生成10条埋点日志
        for (int i = 0; i < 10; i++) {
            String uid = "u" + random.nextInt(9999);
            String act = actions[random.nextInt(actions.length)];
            long pid = random.nextInt(1000);
            String log = uid + "," + act + "," + pid;
            result.add(new DataRecord(log));
        }
        return result;
    }
}

// 过滤器2:清洗过滤,丢弃非法脏数据
class CleanFilter implements Filter {
    @Override
    public List<DataRecord> process(List<DataRecord> input) {
        List<DataRecord> result = new ArrayList<>();
        for (DataRecord record : input) {
            String line = record.getContent();
            String[] arr = line.split(",");
            // 过滤格式错误、非法行为
            if (arr.length != 3) {
                continue;
            }
            String action = arr[1];
            if ("click".equals(action) || "buy".equals(action) || "collect".equals(action)) {
                result.add(record);
            }
        }
        return result;
    }
}

// 过滤器3:字符串日志转结构化对象(模拟map转换算子)
class ConvertFilter implements Filter {
    // 转换后存储对象,单独输出
    public List<UserAction> convert(List<DataRecord> input) {
        List<UserAction> list = new ArrayList<>();
        for (DataRecord record : input) {
            String[] arr = record.getContent().split(",");
            UserAction action = new UserAction(arr[0], arr[1], Long.valueOf(arr[2]));
            list.add(action);
        }
        return list;
    }

    @Override
    public List<DataRecord> process(List<DataRecord> input) {
        return input;
    }
}

// 过滤器4:用户ID脱敏处理
class MaskFilter {
    public List<UserAction> mask(List<UserAction> input) {
        List<UserAction> res = new ArrayList<>();
        for (UserAction ua : input) {
            String rawId = ua.toString().split("userId='")[1].split("'")[0];
            String maskId = rawId.substring(0, 2) + "****";
            String action = ua.toString().split("action='")[1].split("'")[0];
            String pid = ua.toString().split("productId=")[1].split("}")[0];
            res.add(new UserAction(maskId, action, Long.valueOf(pid)));
        }
        return res;
    }
}

// ===================== 主程序:组装整条管道过滤器链路 =====================
public class SimplePipeFilterDemo {
    public static void main(String[] args) {
        // 1. 组装过滤器链路
        Filter source = new SourceFilter();
        Filter clean = new CleanFilter();
        ConvertFilter convert = new ConvertFilter();
        MaskFilter mask = new MaskFilter();

        // 2. 管道流转数据:生成原始数据 -> 清洗
        List<DataRecord> rawData = source.process(new ArrayList<>());
        List<DataRecord> cleanData = PipeLine.run(rawData, clean);

        // 3. 管道下游:转换实体 + 脱敏
        List<UserAction> entityList = convert.convert(cleanData);
        List<UserAction> finalResult = mask.mask(entityList);

        // 4. 输出过滤器:打印最终结果
        System.out.println("===== 管道过滤器最终输出结果 =====");
        for (UserAction item : finalResult) {
            System.out.println(item);
        }
    }
}

更多推荐