场景

现在有这样一个场景:用户会在平台上创建任务跑算法训练,启动任务后,后台会通过 kubernetes 来创建一个容器跑训练任务。需要在页面上能实时展示训练任务的日志。由于日志是不断在产生的,而页面上也需要实时自动刷新最新日志,所以需要使用到 websocket。

以下面这个 pod 为例:

如图所示,目的是要实时获取这个pod的日志并在页面上展示,且能自动刷新日志。

jobName (kubernetes job 的完整名称):train-new-test

podName (job 生成的 pod 的完整名称):train-new-test-zsn9h

fabric8 kubernetes-client

kubernetes 的 java 客户端,通过它来连接 kubernetes 集群,获取资源。

<dependency>
	<groupId>io.fabric8</groupId>
	<artifactId>kubernetes-client</artifactId>
	<version>5.10.1</version>
</dependency>
<dependency>
	<groupId>io.fabric8</groupId>
	<artifactId>kubernetes-model</artifactId>
	<version>5.10.1</version>
</dependency>

kubernetes-client 的基本使用方法就不说了,总之就是要在后台启动的时候,初始化好客户端对象(DefaultKubernetesClient),后面任何要连接 kubernetes 集群的操作,都要通过这个 client 对象来操作。类似这种创建方式:

private DefaultKubernetesClient getClient() {
	Config config = new ConfigBuilder().withMasterUrl("https://xxxxxxxxxxxxxx")
			.withCaCertFile("xxxxxx")
			.withClientCertFile("xxxxxx")
			.withClientKeyFile("xxxxxx")
			.build();
	return new DefaultKubernetesClient(config);
}

结合kubernetes客户端,编写根据job名称获取容器日志的方法。

@Component
public class K8sClientOperator {

    // 这个K8sClientHolder自行实现吧,主要就是持有一个DefaultKubernetesClient对象,这里面涉及kubernetes证书文件之类的
    @Autowired
    private K8sClientHolder k8sClientHolder;

    // 将日志读到流里,给到websocket
    public void watchLog(String jobName, String namespace, OutputStream outputStream) throws IOException {
        Job job = k8sClientHolder.getClient().batch().v1().jobs().inNamespace(namespace).withName(jobName).get();
        JobStatus jobStatus = job != null ? job.getStatus() : null;
        if (jobStatus == null) {
            outputStream.write("Job不存在".getBytes());
            return;
        }
        Pod pod = getPodByName(jobName, namespace);
        if (pod == null) {
            outputStream.write("Pod不存在".getBytes());
            return;
        }
        String podName = pod.getMetadata().getName(); // pod完整名称

        // 判断job是否完成,完成读取500行日志
        if (jobStatus.getSucceeded() != null && jobStatus.getSucceeded() == 1) {
            String podLogs = k8sClientHolder.getClient().pods().inNamespace(namespace).withName(podName)
                    .tailingLines(500).getLog();
            outputStream.write(podLogs.getBytes());
            return;
        }

        // job没完成,则获取相应pod日志
        String phase = pod.getStatus().getPhase(); // pod获取当前所处的阶段
        switch (phase) {
            case "Running" : // 运行状态,最理想情况
                k8sClientHolder.getClient().pods().inNamespace(namespace).withName(podName)
                        .tailingLines(100).watchLog(outputStream);
                break;
            case "ContainerCreating" :  // 容器正在创建,比如可能正在拉取镜像
                outputStream.write("容器创建中".getBytes());
                break;
            case "Completed" : // 容器已经运行结束了,直接拉取最后500条记录
                k8sClientHolder.getClient().pods().inNamespace(namespace).withName(podName)
                        .tailingLines(500).watchLog(outputStream);
                break;
            // todo 还有一些别的状态,根据实际情况处理,比如:Pending
            default :
                k8sClientHolder.getClient().pods().inNamespace(namespace).withName(podName)
                        .tailingLines(100).watchLog(outputStream);
                break;
        }
    }

    // 根据job名称找到对应的Pod。
    // 这里有其他的方式,就是写一条linux命令,
    // 直接根据jobName找到pod的完整名字,然后通过java执行linux命令就行了。
    // 但是通过fabric8的操作,我找不到合适的方式,只能获取所有Pod,然后根据名字来比对。
    public Pod getPodByName(String jobName, String namespace){
        Pod realPod = null;
        // pod列表
        List<Pod> podList = k8sClientHolder.getClient().pods().inNamespace(namespace).list().getItems();
        String podName;
        for (Pod pod : podList) {
            String phase = pod.getStatus().getPhase();
            // 排除一些不正常的pod
            if (StringUtils.isBlank(phase)
                    || "Terminating".equals(phase)
                    || ("Failed".equals(phase) 
                    && "UnexpectedAdmissionError".equals(pod.getStatus().getReason()))) {
                continue;
            }
            // 由于是kubernetes的job创建的pod,所以pod名会在job名称后面自动加上一串字母,用"-"连接,所以比较时需要去掉后面一串
            podName = pod.getMetadata().getName().substring(0, pod.getMetadata().getName().lastIndexOf("-"));
            if (jobName.equals(podName)) {
                realPod = pod;
                break;
            }
        }
        return realPod;
    }

}

java 后端 websocket

编写 java 后端通过 websocket 实时获取 pod 日志并返回给前端的代码

自定义 OutputStream

自定义一个 OutputStream,用于将日志内容写入 websocket。

需要重写 write(byte b[], int off, int len) 方法,因为 kubernetes-client 的 watchLog 方法中,调用的是这个方法。如果是老版本的 kubernetes-client,调用的是 write(byte b[]) 方法,这种情况就需要重写 write(byte b[]) 方法。(我在升级 kubernetes-client 版本时就遇到了这个问题)

public class WebTermOutputStream extends OutputStream {
    private WebSocketSession webSocketSession;

    public WebTermOutputStream(WebSocketSession webSocketSession) {
        this.webSocketSession = webSocketSession;
    }

    @Override
    public void write(byte b[], int off, int len) throws IOException {
        if (b == null || b.length == 0 || len == 0 || ((off < 0) || (off > b.length) || (len < 0) ||
                ((off + len) > b.length) || ((off + len) < 0))) {
            return;
        }
        String response = new String(b, off, len, StandardCharsets.UTF_8);
        webSocketSession.sendMessage(new TextMessage(response));
    }
}

websocket 处理器

自定义的 WebSocketHandler,需要继承 TextWebSocketHandler,并重写几个主要方法

@Component
@Slf4j
public class MyWebSocketHandler extends TextWebSocketHandler {

    @Autowired
    private K8sClientOperator k8sClientOperator;

    private Map<String, WebSocketSession> sessionMap = new ConcurrentHashMap<>();

    // 连接建立
    @Override
    public void afterConnectionEstablished(WebSocketSession session) {
        log.info("afterConnectionEstablished, connect success");
    }

    // 接收 -> 处理 -> 发送
    @Override
    protected void handleTextMessage(WebSocketSession webSocketSession, TextMessage message) throws Exception {
        /*JSONObject handleMessage = JSON.parseObject(message.getPayload());
        String taskName = String.valueOf(handleMessage.get("taskName"));
        if (StringUtils.isBlank(taskName)) {
            return;
        }
        String jobName = "train-" + taskName;*/
        String jobName = "train-new-test"; // 这里图省事,就把jobName写死了
        k8sClientOperator.watchLog(jobName, "training", new WebTermOutputStream(webSocketSession));
        sessionMap.put(webSocketSession.getId(), webSocketSession);
    }

    // 连接异常
    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) {
        if (session.isOpen()) {
            log.info("session [" + session.getId() + "] is open, need close");
            try {
                session.close();
            } catch (IOException e) {
                log.error("session [" + session.getId() + "] close failed", e);
            }
        }
        sessionMap.remove(session.getId());
        log.error(String.format("websocket error, seesion[id:%s], \t %s",
                session.getId(), JSONObject.toJSONString(session.getAttributes())), exception);
    }

    // 连接关闭
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
        log.warn(String.format("websocket closed, seesion[id:%s], \t %s",
                session.getId(), JSONObject.toJSONString(session.getAttributes())), status);
        sessionMap.remove(session.getId());
    }
}

后端 websocket 入口

接收 "/webterm" 请求

@Configuration
@EnableWebSocket
public class WebTermSocketConfig implements WebSocketConfigurer {
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
	    // 接收 "/webterm" 请求
        registry.addHandler(webSocketHandler(), "/webterm").setAllowedOrigins("*");
    }

    @Bean
    public WebTermSocketHandler webSocketHandler() {
        return new MyWebSocketHandler();
    }
}

vue 前端 websocket

前端发起 websocket 连接请求,并不断接收后端返回的数据,展示在页面上。

这里我加了一个 logCache 对象,用于暂时保存 websocket 返回的数据。主要是因为跑的任务比较特殊,日志产生速度非常快,如果直接刷新在页面上,浏览器就奔溃了。所以先做缓存,然后创建一个定时器,每隔三秒将数据刷新到页面上。

只贴出一些关键的代码:

data () {
	return {
		log: '', // 日志内容
		// 日志缓存,主要是因为我的这种任务日志刷新实在太快
		// 如果实时刷新的话,页面直接崩溃了!
		// 所以加了个缓存,从websocket中获取的数据,先放入logCache中
		// 然后开启定时器,每隔3秒将logCache的数据加入页面展示的内容中,再清空logCache!
		// 如果日志刷新不算太快的,就不用这么麻烦了
		logCache: '', 
		websocket: null,
		socketTimer: null,
	}
},
mounted () {
	this.createWebsocket()
},
methods: {
  createWebsocket () {
		const url = 'wss://xxx.com:port/webterm'
		this.websocket = new WebSocket(url)
		this.websocket.onopen = () => {
			console.log("websocket.onopen...")
			this.websocket.send(JSON.stringify({ taskName: 'xxxxxx' }))
			this.openSocketTimer()
		}
		this.websocket.onmessage = (res) => {
			console.log("websocket.onmessage...")
			const data = res.data
			let lineRes = ''
			let arrN = []
			if (data.indexOf('\n') !== -1) {
				arrN = data.split('\n')
			}

			if (arrN.length > 0) {
				for (let i = 0; i < arrN.length - 1; i++) {
					let lineStr = arrN[i]
					if (lineStr.indexOf('\r') !== -1) {
						lineStr = lineStr[lineStr.length - 1]
					}

					lineRes += lineStr + '\n\n'
				}
			} else {
				lineRes = data
			}
			// 放到缓存logCache中
			if (res.data) {
				if(!this.log || this.log.length == 0) {
					this.$nextTick(() => {
						this.log = `${this.log}${lineRes}`
					})
				} else {
					this.logCache = `${this.logCache}${lineRes}`
				}
			}
		}
		this.websocket.onclose = function (e) {
			console.log("websocket.onclose...")
			console.log(`连接已断开...>>>${e.code}`)
		}
	},
	// 开启定时器,每隔3秒检验下是否需要刷新日志
	openSocketTimer () {
		this.clearSocketTimer()
		this.socketTimer = setInterval(this.checkRenderEnable, 3000)
	},
	// 关闭定时器
	clearSocketTimer () {
		if (this.socketTimer != null) {
			clearInterval(this.socketTimer)
			this.socketTimer = null
		}
	},
	// 判断是否需要刷新页面,也就是log缓存是否为空
	checkRenderEnable () {
		if(this.logCache && this.logCache.length > 0) {
			this.renderPage()
		}
	},
	// 刷新页面
	renderPage () {
		this.$nextTick(() => {
			this.log = `${this.log}${this.logCache}`
			this.logCache = '' // 刷新完成之后,清空log缓存
		})
	},
}
Logo

开源、云原生的融合云平台

更多推荐