10.png

前言

大家好,今天我们将基于XXL-Job,探讨任务调度迁移到云端的相关话题。

XXL-Job是一款功能强大、易用可靠的国产分布式任务调度平台,是目前国内使用比较广泛的分布式任务调度平台之一。它的主要特点包括:

  • 支持分布式、多线程任务调度;
  • 具有完整的管理后台,可以实现任务调度的创建、修改、启动和监控;
  • 提供了丰富的调度方式,包括cron表达式、API调用、消息队列等;
  • 支持任务执行过程的日志记录和错误处理,可以帮助用户快速定位问题。

随着云计算的全面普及和发展,越来越多企业开始认识到公共云平台的无限潜力。许多企业开始将自己的应用程序和业务迁移到云环境中,以获取更高的灵活性、弹性和可扩展性。然而,任务调度作为企业中的一个重要业务组件,对于软件开发和运营的质量都有着极大的影响。在云环境下部署和运行任务调度组件,需要考虑诸多因素,如安全性、可靠性、性能等。因此,企业需要认真思考如何在云平台上部署和运行任务调度组件,以保证运营效率、降低成本、提高应用程序的质量和性能。

云端迁移过程

由于历史原因,我们的 xxl-job-admin 端是部署在 k8s 集群外部的。在我们的项目中,我们是使用XML文件来集成xxl-job的,相关的集成配置如下所示:

<bean id="xxlJobExecutor" class="com.xxl.job.core.executor.impl.XxlJobSpringExecutor">
    <property name="adminAddresses" value="${xxl.job.admin.addresses}"/>
    <property name="appname" value="${xxl.job.executor.appname}"/>
    <property name="ip" value="${xxl.job.executor.ip}"/>
    <property name="port" value="${xxl.job.executor.port}"/>
    <property name="accessToken" value="${xxl.job.accessToken}" />
    <property name="logPath" value="${xxl.job.executor.logpath}"/>
    <property name="logRetentionDays" value="${xxl.job.executor.logretentiondays}"/>
</bean>

其中,相关配置值如下:

xxl.job.admin.addresses = http://127.0.0.1/xxl-job-admin
xxl.job.executor.appname = xxl-job-executor-sample
xxl.job.executor.ip = 
xxl.job.executor.port = 30065
xxl.job.accessToken = mytoken
xxl.job.executor.logpath = /etc/logs
xxl.job.executor.logretentiondays = -1

解决注册IP错误问题

当我们使用了与其他普通 Spring 项目的 JAR 包相同的部署方式将任务调度组件部署到了 k8s 上后,虽然我们通过管理页面看到已经成功将服务注册到了 xxl-job-admin,但我们发现该服务的 IP 地址为 k8s 中 Pod 的私有 IP 地址。因为k8s 集群内部通信的私有 IP 地址在集群外不可访问,这导致了任务无法正常执行,系统提示 IP 地址无效。

15.png

那么该如果解决这个问题呢?

阅读XXL-Job源码可以深入了解XXL-Job框架的实现细节和内部机制。在XXL-Job源码中,可以找到一些关键方法,帮助我们了解IP和port的获取规则。

具体来说,这些方法位于com.xxl.job.core.executor.XxlJobExecuto类中的initEmbedServer方法。当执行器启动时,会优先使用配置文件中的IP和端口,如果配置文件未指定,则通过NetUtils获取本地主机地址和默认端口。在注册成功后,执行器就可以通过该IP和端口与注册中心进行正常通信。部分源码如下:

port = port > 0 ? port : NetUtil.findAvailablePort(9999);
ip = ip != null && ip.trim().length() > 0 ? ip : IpUtil.getIp();

由此可见,为了解决这个问题,我们有两种方法可以尝试。

  • 我们可以直接将配置文件中的 xxl.job.executor.ip 指定为正确的IP地址,这样XXL-Job就可以正确地找到执行器并与之通信了。
  • 在XXL-Job的管理页面上将执行器的注册方式改为手动录入,并直接填写正确的IP地址。

16.png

无论使用哪种方法,唯一的要求就是确保与执行器实际运行的IP地址匹配。这样就可以使XXL-Job正常工作了。

实现动态注册IP

无论采用前面提到的两种方式中的哪一种,均存在一个xxl-job配置写死IP地址的问题,而无法实现IP的动态获取,这对于后期的维护和动态扩缩容都是不利的。那么如何在保证获取到的IP正确的前提下实现自动获取呢?

为了实现xxl-job自动获取注册IP的目的,在获取IP的过程中,我们可以结合Dubbo框架的获取IP逻辑,改造获取IP的顺序。按照以下顺序获取IP:

  • 首先根据环境变量获取IP,如果环境变量中存在,则获取环境变量中的IP地址。

  • 如果环境变量中不存在,则根据配置文件获取IP,如果配置文件中存在,则获取配置文件中的IP地址。

  • 如果配置文件中不存在,则获取本地IP地址。

这样的优先级顺序可以确保我们始终能够获得一个可用的注册IP。通过这种方式会让获取IP更加智能化和可靠。以下是具体改造步骤:

  • deploy.yaml 文件中添加环境变量。
spec:
  template:
    spec:
      containers:
        - env:
            - name: XXLJOB_IP_TO_REGISTRY
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: status.hostIP
  • 使用Java代码中的注释@Configuration和@Bean注释来替代使用XML文件进行Bean的注册和配置。
@Slf4j
@Configuration
public class XxlJobConfiguration {

    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        log.info(">>>>>>>>>>> xxl-job config init start...");

        // 获取ip规则优先级, 环境变量(此值为deploy.yaml中配置)>配置文件>默认(本地)
        String ip = System.getenv("XXLJOB_IP_TO_REGISTRY");
        ip = StringUtils.isBlank(ip) ? PropertiesCacheUtil.getConfigValue("xxl.job.executor.ip") : ip;

        String port = PropertiesCacheUtil.getConfigValue("xxl.job.executor.port");
        String logRetentionDays = PropertiesCacheUtil.getConfigValue("xxl.job.executor.logretentiondays");

        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(StringUtils.trimToEmpty(PropertiesCacheUtil.getConfigValue("xxl.job.admin.addresses")));
        xxlJobSpringExecutor.setAppname(StringUtils.trimToEmpty(PropertiesCacheUtil.getConfigValue("xxl.job.executor.appname")));
        xxlJobSpringExecutor.setIp(ip);

        if (StringUtils.isNotBlank(port)) {
            xxlJobSpringExecutor.setPort(Integer.parseInt(port));
        }
        xxlJobSpringExecutor.setAccessToken(StringUtils.trimToEmpty(PropertiesCacheUtil.getConfigValue("xxl.job.accessToken")));
        xxlJobSpringExecutor.setLogPath(StringUtils.trimToEmpty(PropertiesCacheUtil.getConfigValue("xxl.job.executor.logpath")));
        if (StringUtils.isNotBlank(logRetentionDays)) {
            xxlJobSpringExecutor.setLogRetentionDays(Integer.parseInt(logRetentionDays));
        }
        
        log.info(">>>>>>>>>>> xxl-job config init end...");
        return xxlJobSpringExecutor;
    }
}

通过这样的改造,我们可以更加智能可靠地获取注册IP,实现了xxl-job自动获取IP地址的目的。

解决分片问题

无论使用上面提到的写死配置方式还是实现动态注册IP,都是仅适用于单机的情况,如果需要部署多台任务调度组件,那么又该如何配置才能保证每个服务都可以被调度,以达到实现分片处理的目的呢?

方法1:

我们可以通过在deploy.yaml文件中配置Pod的反亲和性,使得单台宿主机上仅能部署一个服务,并且配置在service.yaml中配置代理策略为Local的方式来达到上述目的。具体配置如下:

deploy.yaml改造如下:

spec:
  template:
    spec:
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            - topologyKey: kubernetes.io/hostname
              labelSelector:
                matchExpressions:
                  - key: app
                    operator: In
                    values:
                      - 你的APP名称

service.yaml 改造如下:

spec:
  ## 代理策略:默认Cluster。Cluster表示:流量可以转发到其他节点上的Pod。Local表示:流量只发给本机的Pod
  externalTrafficPolicy: Local

经过上面的改造,我们成功的解决了分片问题,但是又带来了新的问题,如下图所示:
在这里插入图片描述

上面的方法都是使用Deployment方式部署的,那么,我们是否可以换下思路使用StatefulSet方式部署呢?这就衍生出了下面的方法。

方法2:
  • 改造配置:
## 注册到xxljob的端口,多个使用英文逗号分隔
xxl.job.executor.port = 30065,30066,30067
  • 改造代码
@Slf4j
@Configuration
public class XxlJobConfiguration {

    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        log.info(">>>>>>>>>>> xxl-job config init.");

        // 获取ip规则优先级, 配置中心>环境变量(此值为deploy.yml中配置)
        String ip = PropertiesCacheUtil.getConfigValue("xxl.job.executor.ip");
        ip = StringUtils.isBlank(ip) ? System.getenv("XXLJOB_IP_TO_REGISTRY") : ip;
        log.info("==>ip:{}", ip);

        String podName = StringUtils.trimToEmpty(System.getenv("POD_NAME"));
        log.info("==>POD_NAME:{}", podName);

        String[] split = StringUtils.split(podName, "-");
        String index = split[split.length - 1];
        log.info("==>index:{}", index);

        String allPort = PropertiesCacheUtil.getConfigValue("xxl.job.executor.port");
        String[] portSplit = StringUtils.split(allPort, ",");
        String port = portSplit[Integer.parseInt(index)];
        log.info("==>port:{}", port);

        String logRetentionDays = PropertiesCacheUtil.getConfigValue("xxl.job.executor.logretentiondays");

        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(StringUtils.trimToEmpty(PropertiesCacheUtil.getConfigValue("xxl.job.admin.addresses")));
        xxlJobSpringExecutor.setAppname(StringUtils.trimToEmpty(PropertiesCacheUtil.getConfigValue("xxl.job.executor.appname")));
        xxlJobSpringExecutor.setIp(ip);

        if (StringUtils.isNotBlank(port)) {
            xxlJobSpringExecutor.setPort(Integer.parseInt(port));
        }
        xxlJobSpringExecutor.setAccessToken(StringUtils.trimToEmpty(PropertiesCacheUtil.getConfigValue("xxl.job.accessToken")));
        xxlJobSpringExecutor.setLogPath(StringUtils.trimToEmpty(PropertiesCacheUtil.getConfigValue("xxl.job.executor.logpath")));
        if (StringUtils.isNotBlank(logRetentionDays)) {
            xxlJobSpringExecutor.setLogRetentionDays(Integer.parseInt(logRetentionDays));
        }
        return xxlJobSpringExecutor;
    }
}
@Slf4j
@Component
public class InitNotifyDataFromDBHandler {

    @XxlJob("initNotifyDataFromDBHandler")
    public void initNotifyDataFromDBHandler(String params) {
            // XxlJobHelper.getShardIndex():当前分片序号(从0开始),执行器集群列表中当前执行器的序号;
            // XxlJobHelper.getShardTotal():总分片数,执行器集群的总机器数量;

            String podName = StringUtils.trimToEmpty(System.getenv("POD_NAME"));
            log.info("==>POD_NAME:{}", podName);
            XxlJobHelper.log("==>POD_NAME:{}", podName);

            String[] split = StringUtils.split(podName, "-");
            String index = split[split.length - 1];
            log.info("==>index:{}", index);
            XxlJobHelper.log("==>index:{}", index);

            // 下标0:机器总数目,下标1:当前机器在总机器中的位置下标
            String[] args = {XxlJobHelper.getShardTotal() + "", index};
            
            // 其他业务逻辑
            }
     }
  • 重写K8S中yaml部署文件
## 创建StatefulSet
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: your-app
  namespace: your-namespace
spec:
  serviceName: your-app
  replicas: 3
  selector:
    matchLabels:
      app: your-app
  template:
    metadata:
      annotations:
        statefulset.kubernetes.io/pod-name: $(POD_NAME)
      labels:
        app: your-app
    spec:
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
              - matchExpressions:
                  - key: project.node
                    operator: In
                    values:
                      - your-project-node
      volumes:
        - name: timezone
          hostPath:
            path: /usr/share/zoneinfo/Asia/Shanghai
      containers:
        - env:
            - name: POD_NAME
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
            - name: DUBBO_IP_TO_REGISTRY
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: status.hostIP
            - name: XXLJOB_IP_TO_REGISTRY
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: status.hostIP
          image: your-image
          imagePullPolicy: Always
          name: your-app
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      terminationGracePeriodSeconds: 30
## 创建service
---
apiVersion: v1
kind: Service
metadata:
  name: service-your-app-0
  namespace: your-namespace
spec:
  selector:
    statefulset.kubernetes.io/pod-name: your-app-0
  type: NodePort
  sessionAffinity: None
  ports:
    - name: xxljob-your-app
      port: 30065
      targetPort: 30065
      nodePort: 30065
---
apiVersion: v1
kind: Service
metadata:
  name: service-your-app-1
  namespace: your-namespace
spec:
  selector:
    statefulset.kubernetes.io/pod-name: your-app-1
  type: NodePort
  sessionAffinity: None
  ports:
    - name: xxljob-your-app
      port: 30066
      targetPort: 30066
      nodePort: 30066
---
apiVersion: v1
kind: Service
metadata:
  name: service-your-app-2
  namespace: your-namespace
spec:
  selector:
    statefulset.kubernetes.io/pod-name: your-app-2
  type: NodePort
  sessionAffinity: None
  ports:
    - name: xxljob-your-app
      port: 30067
      targetPort: 30067
      nodePort: 30067

经过上面的改造,我们成功的解决了使用第一种方法带来的问题。但是这个方法同样以下缺点,但是这种缺点相对来说是可以忽略的,因为生产环境不会随便增减副本数量。

  • 在K8S的dashboard页面直接新增副本数量无效,需要先新增配置文件中的端口,再新增部署yaml中对应的Service,才能真正实现副本数量的增加。

小结

以上就是今天分享的任务调度上云的相关内容,我们的目标不仅仅是将任务调度程序迁移到云端,更是要通过实现自动注册功能,使任务调度程序能自动加入云端调度集群,从而更方便地进行任务调度,提升运行效率和可扩展性。

Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐