用Stream API处理AI返回结果:优雅的数据流转之道

最近在做一个AI应用,需要处理大量AI返回的数据。刚开始用传统的for循环,代码写得很冗长。后来改用Java 8的Stream API,发现代码简洁多了,而且性能也不错。今天就来分享一下怎么用Stream API优雅地处理AI返回结果。

在这里插入图片描述

为什么用Stream API?

AI应用经常要处理的数据:

  • 批量调用API,处理多个结果
  • 过滤、转换、聚合数据
  • 处理流式返回的数据
  • 复杂的嵌套数据结构

用传统方式写,代码又长又难读:

// 传统方式:冗长且容易出错
List<String> results = new ArrayList<>();
for (String input : inputs) {
    String response = aiClient.call(input);
    if (response != null && response.length() > 10) {
        String processed = processResponse(response);
        if (processed != null) {
            results.add(processed);
        }
    }
}

用Stream API就简洁多了:

// Stream API:简洁优雅
List<String> results = inputs.stream()
    .map(aiClient::call)
    .filter(response -> response != null && response.length() > 10)
    .map(this::processResponse)
    .filter(Objects::nonNull)
    .collect(Collectors.toList());

基础用法:批量处理AI结果

场景1:批量调用API

最常见的场景,批量调用AI API,处理结果:

@Service
public class BatchAIService {
    
    @Autowired
    private ChatClient chatClient;
    
    // 批量处理
    public List<String> batchProcess(List<String> inputs) {
        return inputs.stream()
            .map(input -> {
                try {
                    return chatClient.call(input);
                } catch (Exception e) {
                    log.error("Failed to process: {}", input, e);
                    return null;
                }
            })
            .filter(Objects::nonNull)
            .collect(Collectors.toList());
    }
    
    // 并行处理(提高性能)
    public List<String> batchProcessParallel(List<String> inputs) {
        return inputs.parallelStream()
            .map(this::processInput)
            .filter(Objects::nonNull)
            .collect(Collectors.toList());
    }
    
    private String processInput(String input) {
        try {
            return chatClient.call(input);
        } catch (Exception e) {
            log.error("Failed to process: {}", input, e);
            return null;
        }
    }
}

场景2:过滤和转换

AI返回的结果可能需要过滤和转换:

public List<String> filterAndTransform(List<String> responses) {
    return responses.stream()
        .filter(response -> response.length() > 20) // 过滤太短的回答
        .filter(response -> !response.contains("错误")) // 过滤包含错误的
        .map(String::toLowerCase) // 转小写
        .map(this::extractKeywords) // 提取关键词
        .distinct() // 去重
        .collect(Collectors.toList());
}

private String extractKeywords(String text) {
    // 简单的关键词提取逻辑
    return Arrays.stream(text.split(" "))
        .filter(word -> word.length() > 3)
        .limit(5)
        .collect(Collectors.joining(", "));
}

高级用法:处理复杂数据结构

场景3:处理JSON响应

AI返回的可能是JSON,需要解析和处理:

@Service
public class JSONProcessingService {
    
    @Autowired
    private ObjectMapper objectMapper;
    
    // 解析AI返回的JSON数组
    public List<User> parseUsers(List<String> jsonResponses) {
        return jsonResponses.stream()
            .map(this::parseJson)
            .filter(Objects::nonNull)
            .flatMap(json -> json.stream()) // 展平嵌套列表
            .filter(this::isValidUser)
            .collect(Collectors.toList());
    }
    
    private List<User> parseJson(String json) {
        try {
            return objectMapper.readValue(
                json, 
                new TypeReference<List<User>>() {}
            );
        } catch (Exception e) {
            log.error("Failed to parse JSON", e);
            return Collections.emptyList();
        }
    }
    
    private boolean isValidUser(User user) {
        return user.getName() != null && user.getEmail() != null;
    }
}

场景4:分组和聚合

统计AI返回结果的各种信息:

public Map<String, Long> groupByCategory(List<AIResponse> responses) {
    return responses.stream()
        .collect(Collectors.groupingBy(
            AIResponse::getCategory,
            Collectors.counting()
        ));
}

// 按类别分组,并计算平均分数
public Map<String, Double> averageScoreByCategory(List<AIResponse> responses) {
    return responses.stream()
        .collect(Collectors.groupingBy(
            AIResponse::getCategory,
            Collectors.averagingDouble(AIResponse::getScore)
        ));
}

// 找出每个类别的最佳结果
public Map<String, AIResponse> bestByCategory(List<AIResponse> responses) {
    return responses.stream()
        .collect(Collectors.toMap(
            AIResponse::getCategory,
            Function.identity(),
            (r1, r2) -> r1.getScore() > r2.getScore() ? r1 : r2
        ));
}

处理流式数据

AI的流式返回可以用Stream API处理:

@Service
public class StreamProcessingService {
    
    @Autowired
    private ChatClient chatClient;
    
    // 处理流式响应
    public String processStream(String input) {
        StringBuilder result = new StringBuilder();
        
        chatClient.stream(input)
            .map(chunk -> chunk.getResult().getOutput().getContent())
            .filter(content -> content != null && !content.isEmpty())
            .forEach(result::append);
        
        return result.toString();
    }
    
    // 实时处理流式数据
    public void processStreamRealTime(String input, Consumer<String> onChunk) {
        chatClient.stream(input)
            .map(chunk -> chunk.getResult().getOutput().getContent())
            .filter(Objects::nonNull)
            .forEach(onChunk);
    }
    
    // 收集流式数据到列表
    public List<String> collectStream(String input) {
        return chatClient.stream(input)
            .map(chunk -> chunk.getResult().getOutput().getContent())
            .filter(Objects::nonNull)
            .collect(Collectors.toList());
    }
}

错误处理和容错

Stream API处理错误也很优雅:

public List<String> processWithErrorHandling(List<String> inputs) {
    return inputs.stream()
        .map(this::safeProcess)
        .filter(Optional::isPresent)
        .map(Optional::get)
        .collect(Collectors.toList());
}

private Optional<String> safeProcess(String input) {
    try {
        String result = chatClient.call(input);
        return Optional.ofNullable(result);
    } catch (Exception e) {
        log.error("Failed to process: {}", input, e);
        return Optional.empty();
    }
}

// 或者用Try模式(需要额外的库,比如vavr)
public List<String> processWithTry(List<String> inputs) {
    return inputs.stream()
        .map(input -> Try.of(() -> chatClient.call(input)))
        .filter(Try::isSuccess)
        .map(Try::get)
        .collect(Collectors.toList());
}

性能优化

并行流的使用

对于CPU密集型或IO密集型任务,可以用并行流:

public List<String> processParallel(List<String> inputs) {
    return inputs.parallelStream()
        .map(this::processInput)
        .collect(Collectors.toList());
}

注意:

  • 并行流有开销,小数据集可能更慢
  • 确保操作是线程安全的
  • IO密集型任务可能用CompletableFuture更好

短路操作

某些操作可以短路,提前结束:

// 找到第一个符合条件的就返回
public Optional<String> findFirstValid(List<String> inputs) {
    return inputs.stream()
        .map(this::processInput)
        .filter(this::isValid)
        .findFirst(); // 短路操作,找到就停止
}

// 检查是否所有都符合条件
public boolean allValid(List<String> inputs) {
    return inputs.stream()
        .map(this::processInput)
        .allMatch(this::isValid); // 短路操作,发现不符合就停止
}

实际案例:AI内容审核系统

一个完整的例子,用Stream API处理AI内容审核:

@Service
@Slf4j
public class ContentModerationService {
    
    @Autowired
    private ChatClient chatClient;
    
    // 批量审核内容
    public ModerationResult batchModerate(List<String> contents) {
        List<ContentReview> reviews = contents.stream()
            .map(this::reviewContent)
            .filter(Objects::nonNull)
            .collect(Collectors.toList());
        
        // 统计结果
        long total = reviews.size();
        long approved = reviews.stream()
            .filter(ContentReview::isApproved)
            .count();
        long rejected = reviews.stream()
            .filter(r -> !r.isApproved())
            .count();
        
        // 按类别分组
        Map<String, Long> byCategory = reviews.stream()
            .filter(r -> !r.isApproved())
            .collect(Collectors.groupingBy(
                ContentReview::getRejectReason,
                Collectors.counting()
            ));
        
        return new ModerationResult(total, approved, rejected, byCategory);
    }
    
    private ContentReview reviewContent(String content) {
        try {
            String prompt = "审核以下内容,判断是否违规:" + content;
            String response = chatClient.call(prompt);
            
            // 解析AI返回的结果
            boolean approved = !response.contains("违规");
            String reason = approved ? null : extractReason(response);
            
            return new ContentReview(content, approved, reason);
        } catch (Exception e) {
            log.error("Failed to review content", e);
            return null;
        }
    }
    
    private String extractReason(String response) {
        // 从AI响应中提取拒绝原因
        return Arrays.stream(response.split("\\n"))
            .filter(line -> line.contains("原因"))
            .findFirst()
            .orElse("未知原因");
    }
}

// 数据类
public record ContentReview(
    String content,
    boolean approved,
    String rejectReason
) {}

public record ModerationResult(
    long total,
    long approved,
    long rejected,
    Map<String, Long> rejectReasons
) {}

总结

Stream API处理AI返回结果确实很优雅:

  1. 代码简洁:用声明式的方式表达逻辑
  2. 易于理解:链式调用,逻辑清晰
  3. 性能不错:可以并行处理
  4. 函数式编程:符合现代Java的编程风格

但也要注意:

  • 不要过度使用,简单循环可能更直观
  • 并行流有开销,要测试性能
  • 注意异常处理,避免静默失败

如果你也在处理AI数据,可以试试Stream API,应该会有惊喜。

更多推荐