
分布式调度网络策略调整
但是实际公司内部,因为各种原因有的项目是云上容器部署,有的是云上虚机部署,如果使用Powerjob来作为公司统一任务调度中心,,比如Powerjob Server云上k8s部署,应用非k8s部署,就会出现PowerJob Server 和应用服务网络无法打通问题。那么可以改变直接访问容器内server ip地址的方式,转而访问容器宿主的ip及容器暴露出来的端口,再通过宿主间接访问容器内的serve
理论部分
Powerjob支持容器部署和非容器部署,但是有个很不灵活的限制就是:要么Powerjob Server 和应用端都是容器部署(k8s),要么都是非容器部署。
但是实际公司内部,因为各种原因有的项目是云上容器部署,有的是云上虚机部署,如果使用Powerjob来作为公司统一任务调度中心,,比如Powerjob Server云上k8s部署,应用非k8s部署,就会出现PowerJob Server 和应用服务网络无法打通问题。
以下源码修改,支持在云上部署server,并支持云上或非云部署woker。
应用服务网络无法打通,归根到底是因为非容器部署的woker无法访问容器内server的ip地址。那么可以改变直接访问容器内server ip地址的方式,转而访问容器宿主的ip及容器暴露出来的端口,再通过宿主间接访问容器内的server服务。
源码调整部分 ServerController 类
修改acquireServer()方法
修改前
@GetMapping("/acquire")
public ResultDTO<String> acquireServer(ServerDiscoveryRequest request) {
return ResultDTO.success(serverElectionService.elect(request));
}
修改后
@GetMapping("/acquire")
public ResultDTO<String> acquireServer(ServerDiscoveryRequest request) {
String elect = serverElectionService.elect(request);
if (request.getWorkerDockerDeploy() != null && !request.getWorkerDockerDeploy()) {
String realElect = null;
if (getCurrentServer().equals(elect)) {
realElect = getRealServer();
} else {
String reqUrl = buildGetNodeTransportAddrUrl(elect);
try {
String result = HttpUtils.get(reqUrl);
ResultDTO resultDTO = JsonUtils.parseObject(result, ResultDTO.class);
if (resultDTO.isSuccess()) {
realElect = resultDTO.getData().toString();
}
} catch (IOException e) {
ResultDTO.failed("find node ip error: " + e.getMessage());
}
}
return ResultDTO.success(realElect);
} else {
return ResultDTO.success(elect);
}
}
增加 getRealServer(),getCurrentServer(),buildGetNodeTransportAddrUrl()方法
private String getRealServer() {
return extendConfig.getServerNodeTransportAddress();
}
private String getCurrentServer() {
return serverInfo.getIp() + ":" + environment.getProperty("oms.http.port");
}
private String buildGetNodeTransportAddrUrl(String address) {
String[] arr = address.split(":");
return "http://" + arr[0] + ":" + environment.getProperty("server.port") + "/server/node/transport/address";
}
增加变量
private final Environment environment;
private final ExtendConfig extendConfig;
创建ExtendConfig类
在tech.powerjob.server.config路径下创建ExtendConfig
@Configuration(proxyBeanMethods = false)
@Getter
@Setter
public class ExtendConfig {
@Value("${serverNodeTransportAddress}")
private String serverNodeTransportAddress;
}
serverNodeTransportAddress值配置
serverNodeTransportAddress的值可以采用2种配置方式
其一,在k8s的yaml的文件中配置
spec:
containers:
- env:
- name: serverNodeTransportAddress
## 32744 是10010端口暴露出来的节点端口
value: '-DserverNodeTransportAddress=宿主ip:32744'
其二,在server的yml 或者 properties 配置文件中直接赋值
serverNodeTransportAddress:宿主ip:32744
#或者
serverNodeTransportAddress=宿主ip:32744
修改ServerDiscoveryRequest类
增加 workerDockerDeploy 属性 及 get方法
private Boolean workerDockerDeploy;
public Boolean getWorkerDockerDeploy() {
return workerDockerDeploy;
}
修改toMap()方法
修改前
public Map<String, Object> toMap() {
Map<String, Object> ret = new HashMap<>();
ret.put("appId", appId);
ret.put("protocol", protocol);
if (StringUtils.isNotEmpty(currentServer)) {
ret.put("currentServer", currentServer);
}
if (StringUtils.isNotEmpty(clientVersion)) {
ret.put("clientVersion", clientVersion);
}
return ret;
}
修改后
public Map<String, Object> toMap() {
Map<String, Object> ret = new HashMap<>();
ret.put("appId", appId);
ret.put("protocol", protocol);
if (StringUtils.isNotEmpty(currentServer)) {
ret.put("currentServer", currentServer);
}
if (StringUtils.isNotEmpty(clientVersion)) {
ret.put("clientVersion", clientVersion);
}
if (workerDockerDeploy != null) {
ret.put("workerDockerDeploy", workerDockerDeploy);
}
return ret;
}
修改ServerDiscoveryService类
修改buildServerDiscoveryUrl()方法
修改前
private String buildServerDiscoveryUrl(String address) {
ServerDiscoveryRequest serverDiscoveryRequest = new ServerDiscoveryRequest()
.setAppId(appId)
.setCurrentServer(currentServerAddress)
.setProtocol(config.getProtocol().name().toUpperCase());
String query = Joiner.on(OmsConstant.AND).withKeyValueSeparator(OmsConstant.EQUAL).join(serverDiscoveryRequest.toMap());
return String.format(DISCOVERY_URL, address, query);
}
修改后
private String buildServerDiscoveryUrl(String address) {
ServerDiscoveryRequest serverDiscoveryRequest = new ServerDiscoveryRequest()
.setAppId(appId)
.setCurrentServer(currentServerAddress)
//新增的代码
.setWorkerDockerDeploy(config.getDockerDeploy())
.setProtocol(config.getProtocol().name().toUpperCase());
String query = Joiner.on(OmsConstant.AND).withKeyValueSeparator(OmsConstant.EQUAL).join(serverDiscoveryRequest.toMap());
return String.format(DISCOVERY_URL, address, query);
}
修改PowerJobWorkerConfig类
增加dockerDeploys属性
private Boolean dockerDeploy = false;
修改PowerJobAutoConfiguration类
修改initPowerJob 方法
//增加如下代码
config.setDockerDeploy(worker.getDockerDeploy());
修改Worker类
//增加如下属性
private Boolean dockerDeploy = false;
修改配置woker文件
修改前
powerjob.worker.protocol=akka
修改后
powerjob.worker.protocol=http
更多推荐
所有评论(0)