山东大学创新实训个人博客4:流式输出与导航智能体设计开发
这周我们有两个重点:一是把问诊回复从“整段返回”改为“流式返回”,二是完成导航智能体的三组核心 API。整体链路仍然是 Android -> SpringBoot -> FastAPI,但工程目标从能调用升级成可连续输出、可稳定调试、可分阶段推进。
一、流式输出
流式输出的第一步是定义事件协议。
在 Python 端,我们先把一个完整的问诊流拆成几个事件:开始事件、进度事件、正文分片事件、心跳事件和结束事件。这样一来,前端和 Java 中继层都不需要猜测流里到底是什么,只需要按照事件类型去处理就可以。
我们没有把流式输出直接写死在业务推理函数里,而是单独抽出一个 event_generator 来负责协议输出。这样做的逻辑是把“智能体执行”和“事件发送”拆开。问诊智能体仍然专注于完成 A/B/C/D 四段逻辑,而流式层只负责把这些过程转换成对外可见的事件。
async def consult_stream_handler(request: ConsultRequest) -> StreamingResponse:
async def event_generator():
heartbeat_interval = 5.0
last_heartbeat_ts = time.monotonic()
begin_payload = {
"type": "begin",
"trace_id": request.trace_id,
"session_id": request.session_id,
"message_id": request.message_id,
}
yield f"event: begin\ndata: {json.dumps(begin_payload, ensure_ascii=False)}\n\n"
loop = asyncio.get_running_loop()
progress_queue: asyncio.Queue[dict[str, Any]] = asyncio.Queue()
def on_progress(event: dict[str, Any]) -> None:
payload = {"type": "progress"}
payload.update(event)
loop.call_soon_threadsafe(progress_queue.put_nowait, payload)
result_task = asyncio.create_task(_execute_consult(request, event_hook=on_progress))
这里的 progress_queue 也很关键。
如果没有这个队列,我们只能等整个 _execute_consult 执行完,再统一返回结果,那就失去了流式的意义。现在通过队列,Python 端可以在问诊流程进行到某一阶段时就先把状态推送出去,比如正在做多模态识别,或者正在做核心医学推理。这样前端看到的就不再是一个静止等待的页面,而是一个持续有反馈的过程。
我们在设计上把阶段进度和正文两类事件分开。
前半段是系统过程。此时用户更关心“系统有没有卡住”“是不是还在工作”,所以输出 progress 和 ping 更重要。后半段是结果输出。此时用户已经不只是想知道系统在运行,而是希望看到医生回复一点点生成出来,所以才开始按分片输出 chunk。
try:
while True:
try:
progress_payload = await asyncio.wait_for(progress_queue.get(), timeout=0.2)
yield f"event: progress\ndata: {json.dumps(progress_payload, ensure_ascii=False)}\n\n"
last_heartbeat_ts = time.monotonic()
except asyncio.TimeoutError:
now_ts = time.monotonic()
if now_ts - last_heartbeat_ts >= heartbeat_interval:
ping_payload = {
"type": "ping",
"trace_id": request.trace_id,
"ts": int(time.time() * 1000),
}
yield f"event: ping\ndata: {json.dumps(ping_payload, ensure_ascii=False)}\n\n"
last_heartbeat_ts = now_ts
if result_task.done():
break
result = await result_task
for index, chunk in enumerate(result.iter_chunks(), start=1):
chunk_payload = {
"type": "chunk",
"index": index,
"content": chunk,
}
yield f"event: chunk\ndata: {json.dumps(chunk_payload, ensure_ascii=False)}\n\n"
await asyncio.sleep(0.03)
这里加入 ping 的设计也不是可有可无的。
因为问诊智能体内部有些阶段,比如大模型推理或者 OCR 识别,中间可能会有几秒钟没有任何正文输出。如果我们这时候完全沉默,Java 或前端就可能误判连接已经断开。心跳事件本质上不是给用户看的,而是告诉链路这个连接还活着,后面还有内容。
在 Java 请求 Python 时,我们明确指定 Accept: text/event-stream,告诉 Python 这一轮调用是流式输出。
二、导航智能体设计
我们没有把导航智能体做成和问诊智能体一样的万能聊天接口,而是拆成了三个API。
下面是导航智能体的交互界面草稿
我们把界面分为两大部分,上半部分对接高德地图,下半部分可以选择我们医院资源库,使用算法根据用户地址推荐医院,并生成就医攻略,提供我们人工审核过的挂号网址与咨询电话

- /sessions/{sessionId}/navigation/recommendations
获取医院推荐结果。- /sessions/{sessionId}/navigation/{hospitalId}
基于用户选中的医院输出具体就医攻略。- /sessions/{sessionId}/navigation/state
推进会话阶段到康复。
获取医院推荐结果
拿到用户发送的用户坐标(经纬度),在数据库检索得到最近的4家三甲医院的基本情况和挂号信息(网址和电话)
通过百川医疗大模型API,从该会话之前的历史记录中提取患者需要前往就医的就医科室信息与数据库中医院优势科室标签进行匹配,提供医疗资源最匹配的1家三甲医院
将这5家医院删减成4家医院(如果有重复则正好只提供4家,如果没有重复则删去距离排序第四位的医院,总之提供医疗资源最匹配1家+除这家外距离最近的3家)提供给前端,另外再对每个数据加上一个rank_tag属性,有四种情况1.如果是距离最近+资源最匹配,rank_tag=“optimistic”2.如果只是距离最近rank_tag=“distance”3.如果只是资源最匹配rank_tag=“resource”4.如果都不是rank_tag=“none”
输出具体就医攻略
用户选择一个医院,后端拿到这个医院id,接下来通过百川医疗大模型API,
如果匹配,将以上三种信息给百川医疗大模型API,给出一段文字解释该医院为何能够满足就医条件并指导患者进行网上挂号和预约
如果不匹配,同样给出一段文字解释该医院能力不足无法接纳病情,但无需在指导患者进行网上挂号,而是要劝患者重新进行选择
推进会话阶段
把当前会话从 NAVIGATING 推进到 REHABILITATING,进入康复智能体的领域
三、导航智能体开发
1. 获取医院推荐结果
导航推荐接口对应的是 /sessions/{sessionId}/navigation/recommendations。这一部分的目标不是简单返回距离最近的医院,而是在会话上下文已经存在的前提下,根据用户坐标和问诊历史,给出既考虑距离、又考虑医疗资源匹配度的推荐结果。
导航推荐看起来像一个地理类接口,但本质上仍然属于用户会话的一部分。
因此在真正进入推荐逻辑前,先做 token 鉴权和会话归属校验,确保这次导航请求是建立在当前用户自己的问诊上下文之上的。
@PostMapping("/sessions/{sessionId}/navigation/recommendations")
public Result<List<NavigationHospitalRecommendation>> getNavigationRecommendations(
@PathVariable("sessionId") String sessionId,
@RequestHeader(value = "Authorization", required = false) String authorization,
@RequestParam(value = "token", required = false) String tokenParam,
@RequestBody(required = false) NavigationRecommendationRequest request
) {
Result<UserInfoEntity> authResult = authenticate(authorization, tokenParam);
if (authResult.isFail()) {
return failure(authResult.getCode(), authResult.getMessage());
}
Optional<AgentSessionEntity> ownership = loadOwnedSession(sessionId, authResult.getData().getUserId());
if (ownership.isEmpty()) {
return failure("A0451", "session not found");
}
if (request == null) {
return failure("A0480", "request body is required");
}
if (request.longitude() == null || request.latitude() == null) {
return failure("A0481", "longitude and latitude are required");
}
if (!isLongitudeValid(request.longitude()) || !isLatitudeValid(request.latitude())) {
return failure("A0482", "longitude or latitude is invalid");
}
推荐算法的第一层是地理筛选。
这里没有先把医院全部查出来再在 Java 里算距离,而是直接在数据库层完成距离排序和三甲过滤。这样做可以减少应用层不必要的数据处理,也让推荐逻辑的输入从一开始就是有效候选集。
List<HospitalRouteCandidateView> allHospitals =
hospitalRouteDictRepository.findLevel3AHospitalsOrderByDistance(
request.longitude(),
request.latitude()
);
if (allHospitals.isEmpty()) {
return failure("A0483", "no nearby level 3A hospitals found");
}
List<HospitalRouteCandidateView> nearest4 = allHospitals.stream().limit(4).toList();
if (nearest4.size() < 4) {
return failure("A0485", "insufficient level 3A hospitals, require at least 4");
}
对应的数据库查询中,已经把三甲医院和距离计算都做好了:
@Query(
value = """
SELECT
h.hospital_id AS hospitalId,
h.hospital_name AS hospitalName,
h.level AS level,
h.longitude AS longitude,
h.latitude AS latitude,
h.specialty_departments AS specialtyDepartments,
h.deep_link_url AS deepLinkUrl,
h.phone AS phone,
(
6371 * ACOS(
LEAST(1, GREATEST(-1,
COS(RADIANS(:latitude)) * COS(RADIANS(h.latitude)) *
COS(RADIANS(h.longitude) - RADIANS(:longitude)) +
SIN(RADIANS(:latitude)) * SIN(RADIANS(h.latitude))
))
)
) AS distanceKm
FROM hospital_route_dict h
WHERE h.longitude IS NOT NULL
AND h.latitude IS NOT NULL
AND (
h.level LIKE '%三级甲等%'
OR h.level LIKE '%三甲%'
)
ORDER BY distanceKm ASC, h.hospital_id ASC
""",
nativeQuery = true
)
List<HospitalRouteCandidateView> findLevel3AHospitalsOrderByDistance(
@Param("longitude") BigDecimal longitude,
@Param("latitude") BigDecimal latitude
);
只有距离信息还不够,因为最近的医院不一定最适合当前病情。
所以推荐的第二层逻辑,是从会话历史中抽取更合适的就医科室,然后和医院的优势科室做匹配。Java 不直接做自然语言判断,把历史会话整理后发给 Python,由 Python 调百川提取 department 和 keywords。
List<ChatMessageEntity> history =
chatMessageRepository.findBySessionIdOrderBySendTimeAscMessageIdAsc(
ownership.get().getSessionId()
);
Map<String, Object> modelResult = integrationHttpService.extractNavigationDepartment(
ownership.get().getSessionId(),
authResult.getData().getUserId(),
buildNavigationHistoryPayload(history),
buildDepartmentCandidates(allHospitals)
);
String extractedDepartment = extractModelDepartment(modelResult);
List<String> extractedKeywords = extractModelKeywords(modelResult);
HospitalRouteCandidateView resourceBest =
selectResourceBestHospital(allHospitals, extractedDepartment, extractedKeywords);
if (resourceBest == null && !nearest4.isEmpty()) {
resourceBest = nearest4.get(0);
}
system_prompt = (
"你是医疗分诊助手。请根据用户历史对话提取最可能的就医科室,并返回JSON。"
"必须输出单个JSON对象,不要输出Markdown。JSON字段包括:"
"department、keywords、confidence、rationale。"
"department尽量从department_candidates中选择最匹配的一项。"
)
result = await asyncio.to_thread(
_RUNTIME.run_baichuan_prompt,
system_prompt,
json.dumps(payload, ensure_ascii=False),
True,
)
Java 端拿到 department + keywords 后,再做资源评分:
private HospitalRouteCandidateView selectResourceBestHospital(
List<HospitalRouteCandidateView> candidates,
String department,
List<String> keywords
) {
String normalizedDepartment = normalizeDepartmentToken(department);
List<String> normalizedKeywords = keywords == null
? List.of()
: keywords.stream()
.map(this::normalizeDepartmentToken)
.filter(StringUtils::hasText)
.distinct()
.toList();
HospitalRouteCandidateView best = null;
int bestScore = Integer.MIN_VALUE;
for (HospitalRouteCandidateView item : candidates) {
int score = scoreHospitalSpecialty(
item.getSpecialtyDepartments(),
normalizedDepartment,
normalizedKeywords
);
if (best == null || score > bestScore) {
best = item;
bestScore = score;
continue;
}
if (score == bestScore && best.getDistanceKm() != null && item.getDistanceKm() != null
&& item.getDistanceKm() < best.getDistanceKm()) {
best = item;
}
}
if (bestScore <= 0) {
return null;
}
return best;
}
模型负责理解病情倾向,规则负责保证匹配逻辑可解释。
private int scoreHospitalSpecialty(String specialtyDepartments, String department, List<String> keywords) {
String normalizedSpecialty = normalizeDepartmentToken(specialtyDepartments);
if (!StringUtils.hasText(normalizedSpecialty)) {
return 0;
}
int score = 0;
if (StringUtils.hasText(department) && normalizedSpecialty.contains(department)) {
score += 100;
}
if (keywords != null) {
for (String keyword : keywords) {
if (StringUtils.hasText(keyword) && normalizedSpecialty.contains(keyword)) {
score += 20;
}
}
}
return score;
}
最后一步把“最近的医院集合”和“资源最优医院”合并成固定 4 条结果。
如果资源最优医院本来就在最近 4 家里,就直接返回这 4 家;如果不在,就返回资源最优 1 家加最近 3 家。
List<HospitalRouteCandidateView> final4 = mergeFinalRecommendations(nearest4, resourceBest, 4);
Long distanceNearestId = allHospitals.get(0).getHospitalId();
Long resourceBestId = resourceBest == null ? null : resourceBest.getHospitalId();
List<NavigationHospitalRecommendation> recommendationList = final4.stream()
.map(item -> new NavigationHospitalRecommendation(
item.getHospitalId(),
item.getHospitalName(),
item.getLevel(),
item.getLongitude(),
item.getLatitude(),
item.getSpecialtyDepartments(),
item.getDeepLinkUrl(),
item.getPhone(),
safeDistance(item.getDistanceKm()),
resolveRankTag(item.getHospitalId(), distanceNearestId, resourceBestId)
))
.toList();
Result<List<NavigationHospitalRecommendation>> result = new Result<>();
result.setCode(Result.SUCCESS_CODE);
result.setData(recommendationList);
return result;
rankTag 的生成逻辑如下:
private String resolveRankTag(Long hospitalId, Long distanceNearestId, Long resourceBestId) {
boolean isDistanceNearest = hospitalId != null && hospitalId.equals(distanceNearestId);
boolean isResourceBest = hospitalId != null && hospitalId.equals(resourceBestId);
if (isDistanceNearest && isResourceBest) {
return "optimistic";
}
if (isDistanceNearest) {
return "distance";
}
if (isResourceBest) {
return "resource";
}
return "none";
}
2. 输出具体就医攻略
推荐医院只是导航的第一步。
当用户从推荐列表中选定一家医院之后,系统还要给出具体的就医攻略,包括为什么这家医院适合、如何进行网上挂号、是否需要电话咨询、距离大概如何影响出行安排等。这一部分对应的接口是 /sessions/{sessionId}/navigation/{hospitalId}。
这里首先要做的是用户身份校验、会话归属校验、目标医院合法性校验,然后再把相关上下文整理给 Python。
@PostMapping("/sessions/{sessionId}/navigation/{hospitalId}")
public Result<NavigationGuideResponse> getNavigationGuide(
@PathVariable("sessionId") String sessionId,
@PathVariable("hospitalId") Long hospitalId,
@RequestHeader(value = "Authorization", required = false) String authorization,
@RequestParam(value = "token", required = false) String tokenParam,
@RequestBody(required = false) NavigationGuideRequest request
) {
Result<UserInfoEntity> authResult = authenticate(authorization, tokenParam);
if (authResult.isFail()) {
return failure(authResult.getCode(), authResult.getMessage());
}
Optional<AgentSessionEntity> ownership = loadOwnedSession(sessionId, authResult.getData().getUserId());
if (ownership.isEmpty()) {
return failure("A0451", "session not found");
}
if (hospitalId == null || hospitalId <= 0) {
return failure("A0490", "hospitalId is required");
}
if (request == null || request.longitude() == null || request.latitude() == null) {
return failure("A0491", "longitude and latitude are required");
}
if (!isLongitudeValid(request.longitude()) || !isLatitudeValid(request.latitude())) {
return failure("A0492", "longitude or latitude is invalid");
}
Optional<HospitalRouteDictEntity> hospitalOptional = hospitalRouteDictRepository.findById(hospitalId);
if (hospitalOptional.isEmpty()) {
return failure("A0493", "hospital not found");
}
进入攻略生成前,Java 端先完成距离计算和医院数据打包。
这样 Python 不需要再回头查数据库,只专注于把这些信息组织成攻略文本。
HospitalRouteDictEntity hospital = hospitalOptional.get();
List<ChatMessageEntity> history =
chatMessageRepository.findBySessionIdOrderBySendTimeAscMessageIdAsc(
ownership.get().getSessionId()
);
Double distanceKm = computeDistanceKm(
request.latitude(),
request.longitude(),
hospital.getLatitude(),
hospital.getLongitude()
);
距离计算使用平方差公式:
private Double computeDistanceKm(
BigDecimal userLatitude,
BigDecimal userLongitude,
BigDecimal hospitalLatitude,
BigDecimal hospitalLongitude
) {
if (userLatitude == null || userLongitude == null || hospitalLatitude == null || hospitalLongitude == null) {
return null;
}
double lat1 = Math.toRadians(userLatitude.doubleValue());
double lon1 = Math.toRadians(userLongitude.doubleValue());
double lat2 = Math.toRadians(hospitalLatitude.doubleValue());
double lon2 = Math.toRadians(hospitalLongitude.doubleValue());
double dLat = lat2 - lat1;
double dLon = lon2 - lon1;
double sinLat = Math.sin(dLat / 2);
double sinLon = Math.sin(dLon / 2);
double a = sinLat * sinLat + Math.cos(lat1) * Math.cos(lat2) * sinLon * sinLon;
double c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a));
return 6371.0 * c;
}
医院信息则统一装成结构化 payload:
private Map<String, Object> buildHospitalPayload(HospitalRouteDictEntity hospital) {
Map<String, Object> payload = new LinkedHashMap<>();
payload.put("hospital_id", hospital.getHospitalId());
payload.put("hospital_name", hospital.getHospitalName());
payload.put("level", hospital.getLevel());
payload.put("longitude", hospital.getLongitude());
payload.put("latitude", hospital.getLatitude());
payload.put("specialty_departments", hospital.getSpecialtyDepartments());
payload.put("deep_link_url", hospital.getDeepLinkUrl());
payload.put("phone", hospital.getPhone());
return payload;
}
Python 端新增了 /navigation/guide 方法。
它接收 Java 传来的医院基础信息、用户位置、距离信息以及会话历史,然后调用百川模型生成攻略。
class NavigationGuideRequest(BaseModel):
trace_id: str | None = None
session_id: str
user_id: int
hospital_id: int
user_longitude: float | None = None
user_latitude: float | None = None
distance_km: float | None = None
hospital: dict[str, Any] = Field(default_factory=dict)
history_messages: list[NavigationHistoryMessage] = Field(default_factory=list)
source: str | None = "springboot"
真正的调用逻辑如下:
async def navigation_guide_handler(request: NavigationGuideRequest) -> dict[str, Any]:
_validate_guide_request(request)
history_text = _build_history_text(request.history_messages)
system_prompt = (
"你是医疗导航助手。请结合病情摘要、目标医院信息、距离信息,输出可执行的中文就医攻略。"
"重点包括:1) 为什么这家医院满足就医条件;2) 如何线上挂号预约;3) 何时电话咨询及怎么问。"
"输出必须是单个JSON对象,字段:guide、rationale、actions。"
)
payload = {
"task": "navigation_guide",
"history_text": history_text,
"hospital": request.hospital,
"distance_km": request.distance_km,
"user_location": {
"longitude": request.user_longitude,
"latitude": request.user_latitude,
},
}
result = await asyncio.to_thread(
_RUNTIME.run_baichuan_prompt,
system_prompt,
json.dumps(payload, ensure_ascii=False),
True,
)
3. 推进会话阶段
导航阶段完成后,系统需要进入康复阶段。
这里我们没有把状态推进和推荐逻辑、攻略逻辑绑在一起,而是单独做成一个很轻的方法 /sessions/{sessionId}/navigation/state。这样接口职责更纯粹,前端在适当时机调用即可。
状态推进虽然逻辑简单,但它直接影响整个多阶段智能体的状态机,所以仍然必须先做鉴权和会话归属校验。
@PutMapping("/sessions/{sessionId}/navigation/state")
public Result<SessionResponse> advanceNavigationState(
@PathVariable("sessionId") String sessionId,
@RequestHeader(value = "Authorization", required = false) String authorization,
@RequestParam(value = "token", required = false) String tokenParam
) {
Result<UserInfoEntity> authResult = authenticate(authorization, tokenParam);
if (authResult.isFail()) {
return failure(authResult.getCode(), authResult.getMessage());
}
Optional<AgentSessionEntity> ownership = loadOwnedSession(sessionId, authResult.getData().getUserId());
if (ownership.isEmpty()) {
return failure("A0451", "session not found");
}
在状态机设计里,我们不希望前端随便把会话从任意阶段改成任意阶段。
所以这里显式限制,当前阶段必须是 NAVIGATING,否则直接拒绝。
AgentSessionEntity session = ownership.get();
String currentStage = trimToNull(session.getCurrentStage());
currentStage = StringUtils.hasText(currentStage)
? currentStage.toUpperCase(Locale.ROOT)
: STAGE_CONSULTING;
if (!STAGE_NAVIGATING.equals(currentStage)) {
return failure("A0495", "current stage must be NAVIGATING");
}
更多推荐

所有评论(0)