理论部分

Powerjob支持容器部署和非容器部署,但是有个很不灵活的限制就是:要么Powerjob Server 和应用端都是容器部署(k8s),要么都是非容器部署。

但是实际公司内部,因为各种原因有的项目是云上容器部署,有的是云上虚机部署,如果使用Powerjob来作为公司统一任务调度中心,,比如Powerjob Server云上k8s部署,应用非k8s部署,就会出现PowerJob Server 和应用服务网络无法打通问题。

以下源码修改,支持在云上部署server,并支持云上或非云部署woker。

应用服务网络无法打通,归根到底是因为非容器部署的woker无法访问容器内server的ip地址。那么可以改变直接访问容器内server ip地址的方式,转而访问容器宿主的ip及容器暴露出来的端口,再通过宿主间接访问容器内的server服务。

源码调整部分 ServerController 类

修改acquireServer()方法

修改前

@GetMapping("/acquire")
public ResultDTO<String> acquireServer(ServerDiscoveryRequest request) {
    return ResultDTO.success(serverElectionService.elect(request));
}

修改后

@GetMapping("/acquire")
public ResultDTO<String> acquireServer(ServerDiscoveryRequest request) {
    String elect = serverElectionService.elect(request);
    if (request.getWorkerDockerDeploy() != null && !request.getWorkerDockerDeploy()) {
        String realElect = null;
        if (getCurrentServer().equals(elect)) { 
            realElect = getRealServer(); 
        } else {
            String reqUrl = buildGetNodeTransportAddrUrl(elect); 
            try {
                String result = HttpUtils.get(reqUrl);
                ResultDTO resultDTO = JsonUtils.parseObject(result, ResultDTO.class);
                if (resultDTO.isSuccess()) {
                    realElect = resultDTO.getData().toString();
                }
            } catch (IOException e) {
                ResultDTO.failed("find node ip error: " + e.getMessage());
            }
        }
        return ResultDTO.success(realElect);
    } else {
        return ResultDTO.success(elect);
    }
}

增加 getRealServer(),getCurrentServer(),buildGetNodeTransportAddrUrl()方法

private String getRealServer() {
        return extendConfig.getServerNodeTransportAddress();
    }
​
 private String getCurrentServer() {
        return serverInfo.getIp() + ":" + environment.getProperty("oms.http.port");
    }
​
private String buildGetNodeTransportAddrUrl(String address) {
        String[] arr = address.split(":");
         
        return "http://" + arr[0] + ":" + environment.getProperty("server.port") + "/server/node/transport/address";
    }
 

增加变量

private final Environment environment;
​
private final ExtendConfig extendConfig;

创建ExtendConfig类

在tech.powerjob.server.config路径下创建ExtendConfig

@Configuration(proxyBeanMethods = false)
@Getter
@Setter
public class ExtendConfig {
    @Value("${serverNodeTransportAddress}")
    private String serverNodeTransportAddress;
}
serverNodeTransportAddress值配置

serverNodeTransportAddress的值可以采用2种配置方式

其一,在k8s的yaml的文件中配置

spec:
  containers:
    - env:
      - name: serverNodeTransportAddress
        ## 32744 是10010端口暴露出来的节点端口
        value: '-DserverNodeTransportAddress=宿主ip:32744'

其二,在server的yml 或者 properties 配置文件中直接赋值

serverNodeTransportAddress:宿主ip:32744
#或者
serverNodeTransportAddress=宿主ip:32744

修改ServerDiscoveryRequest类

增加 workerDockerDeploy 属性 及 get方法

private Boolean workerDockerDeploy;
​
public Boolean getWorkerDockerDeploy() {
        return workerDockerDeploy;
    }

修改toMap()方法

修改前

public Map<String, Object> toMap() {
        Map<String, Object> ret = new HashMap<>();
        ret.put("appId", appId);
        ret.put("protocol", protocol);
        if (StringUtils.isNotEmpty(currentServer)) {
            ret.put("currentServer", currentServer);
        }
        if (StringUtils.isNotEmpty(clientVersion)) {
            ret.put("clientVersion", clientVersion);
        }
        return ret;
    }

修改后

public Map<String, Object> toMap() {
        Map<String, Object> ret = new HashMap<>();
        ret.put("appId", appId);
        ret.put("protocol", protocol);
        if (StringUtils.isNotEmpty(currentServer)) {
            ret.put("currentServer", currentServer);
        }
        if (StringUtils.isNotEmpty(clientVersion)) {
            ret.put("clientVersion", clientVersion);
        }
        if (workerDockerDeploy != null) {
            ret.put("workerDockerDeploy", workerDockerDeploy);
        }
        return ret;
    }

修改ServerDiscoveryService类

修改buildServerDiscoveryUrl()方法

修改前

private String buildServerDiscoveryUrl(String address) {
​
        ServerDiscoveryRequest serverDiscoveryRequest = new ServerDiscoveryRequest()
                .setAppId(appId)
                .setCurrentServer(currentServerAddress)
                .setProtocol(config.getProtocol().name().toUpperCase());
​
        String query = Joiner.on(OmsConstant.AND).withKeyValueSeparator(OmsConstant.EQUAL).join(serverDiscoveryRequest.toMap());
        return String.format(DISCOVERY_URL, address, query);
    }

修改后

private String buildServerDiscoveryUrl(String address) {
​
        ServerDiscoveryRequest serverDiscoveryRequest = new ServerDiscoveryRequest()
                .setAppId(appId)
                .setCurrentServer(currentServerAddress)
                //新增的代码
                .setWorkerDockerDeploy(config.getDockerDeploy())
                .setProtocol(config.getProtocol().name().toUpperCase());
​
        String query = Joiner.on(OmsConstant.AND).withKeyValueSeparator(OmsConstant.EQUAL).join(serverDiscoveryRequest.toMap());
       
        return String.format(DISCOVERY_URL, address, query);
    }

修改PowerJobWorkerConfig类

增加dockerDeploys属性

private Boolean dockerDeploy = false;

修改PowerJobAutoConfiguration类

修改initPowerJob 方法

//增加如下代码
config.setDockerDeploy(worker.getDockerDeploy());

修改Worker类

  //增加如下属性
  private Boolean dockerDeploy = false;

修改配置woker文件

修改前

powerjob.worker.protocol=akka

修改后

powerjob.worker.protocol=http

Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐