vertx构建高性能策略网关的解决方案
通过各种方式组建集群(框架本来支持多种方式,也可以自定义集群组成方式,例如ETCD、k8s的apiserver等,扩展集群类,重写心跳检测等),并将集群管理器和节点元数据等保存全局上下文。剩下的,我们通过反射机制在服务启动时,自动注册controller层的路由,vertical部署时,自动发布集群间调用的服务,然后通过上下文,手动写一下,管理好我们的bean就行了。通过以上方案,集群间强一致性,
1. 构建vertx集群,实现集群间节点通信的强一致性
通过前端建立虚拟主机、路由和其他配置等,后端负载至集群一个节点,需要进行节点间强一致性的通信机制,返回前端响应信息。解决方案如下:
通过各种方式组建集群(框架本来支持多种方式,也可以自定义集群组成方式,例如ETCD、k8s的apiserver等,扩展集群类,重写心跳检测等),并将集群管理器和节点元数据等保存全局上下文
Vertx.clusteredVertx(options, re -> { if (re.succeeded()) { Vertx vertx = re.result(); System.out.println(mgr.getNodeId(
Vertx.clusteredVertx(options, re -> {
if (re.succeeded()) {
Vertx vertx = re.result();
System.out.println(mgr.getNodeId()+"======本机nodeid---"+eventBusOptions.getClusterNodeMetadata());
}
}
启动vertical,通过事件总线注册服务时,添加本节点ID作为标识,例如:
//判断是否熔断
GlobalContext.context.vertx().eventBus().consumer(GlobalContext.context.getNodeId()+Constants.isFuseApi,this::isFuseApi );
编写ClusterManager工具类,通过集群管理器获取所有存活的节点,并提供requestAllNode,publishAllNode(可以进行集群通信但不能保证集群各个节点强一致性)等,并规范输入输出等一系列规则,提供给应用层调用:
public static Future<JsonObject> requestAllNodes(String method, Object msg){
Promise<JsonObject> promise = Promise.promise();
List<String> nodes = GlobalContext.context.getMgr().getNodes();
ArrayList<Future> futrues = new ArrayList<Future>();
for(String node:nodes){
Future future = GlobalContext.context.vertx().eventBus().request(node+method,msg);
future.onFailure(res->{
promise.complete(new JsonObject().put("code","11").put("message",node+"出错了"));
});
futrues.add(future);
}
CompositeFuture
.all(futrues).onSuccess(ar -> {
CompositeFuture f = ar.result();
if(f!=null){
boolean success = true;
List<ClusteredMessage> results=f.list();
for(ClusteredMessage relay:results){
if(!"00".equals(((JsonObject)relay.body()).getString("code")) ){
success = false;
}
}
if(success){
promise.complete(new JsonObject().put("code","00").put("message","所有节点服务操作成功"));
}
}
})
.onFailure(throwable -> {
LOG.error("节点同步过程中发生异常=========="+throwable.getCause());
promise.complete(new JsonObject().put("code","11").put("message","节点同步过程中发生异常"+throwable));
});
return promise.future();
}
通过以上方案,集群间强一致性,且集群故障时在客户端节点和其他节点上打印的日志都会十分清晰明确,方便故障排查。
2. 使用vertx框架重构应用,支持MVC开发模型,降低开发人员门槛。
controller层根据参数分两种服务:
一种是Router参数,用于外部请求调用,例如如下代码是获取虚拟主机的配置信息:
private void getApp(Router router){
router.post(prefix+"/getApp/:appId")
.handler(rc -> {
appService.getApp( new BigDecimal(rc.pathParam("appId")))
.onSuccess(res ->{
rc.response().end(res.toString());
})
.onFailure( throwable->{
rc.fail(throwable);
});
})
.failureHandler(DefaultExceptionHandler.getInstance());
}
另外一种是Message参数,用于集群内通讯,例如如下代码是集群内启动虚拟主机(虚拟主机启动后需要存储verticalId,用于后续停止虚拟主机):
/**
* 启动应用--用于集群间调用
* @param msg
*/
public void startApp(Message<JsonObject> msg) {
JsonObject app = msg.body();
DeploymentOptions options = new DeploymentOptions().setConfig(app);
String verticle = GlobalContext.context.getStartAppVerticleIdMap().get(app.getString("ID"));
if(verticle!=null){
msg.reply(new JsonObject().put("code","00").put("ip",GlobalContext.context.getIp()).put("message","verticle :"+verticle+" ip:"+GlobalContext.context.getIp()+"已是启动状态"));
return;
}
if ("SL00".equals(app.getString("TENANT"))) {
GlobalContext.context.vertx().deployVerticle(new SL00ServerVerticle(), options).onSuccess(res->{GlobalContext.context.getStartAppVerticleIdMap().put(app.getString("ID"),res);
msg.reply(new JsonObject().put("code","00").put("ip",GlobalContext.context.getIp()).put("message","verticle :"+res+" ip:"+GlobalContext.context.getIp()+"启动成功"));
}).onFailure(throwable -> {
msg.reply(new JsonObject().put("code","11").put("ip",GlobalContext.context.getIp()).put("message"," ip:"+GlobalContext.context.getIp()+"启动失败"+throwable.getCause()));
});
} else {
GlobalContext.context.vertx().deployVerticle(new StandardServerVerticle(), options).onSuccess(res->{
msg.reply(new JsonObject().put("code","00").put("ip",GlobalContext.context.getIp()).put("message","verticle :"+res+" ip:"+GlobalContext.context.getIp()+"启动成功"));
GlobalContext.context.getStartAppVerticleIdMap().put(app.getString("ID"),res);
}).onFailure(throwable -> {
msg.reply(new JsonObject().put("code","11").put("ip",GlobalContext.context.getIp()).put("message"," ip:"+GlobalContext.context.getIp()+"启动失败"+throwable.getCause()));
});
}
}
service和mapper层就更简单了,对开发人员要求是具备响应式编程的能力就可以了,例如如下是查询虚拟主机列表:
public Future<JsonArray> queryAppList(JsonObject query){
Promise<JsonArray> promise = Promise.promise();
StringBuffer sql = new StringBuffer("SELECT ID,NAME,TENANT,CONTENT FROM DB2ADMIN.GW_APP WHERE 1=1 ");
BigDecimal appId = null;
String appName = null;
String tenant = null;
ArrayList<Object> tupleList = new ArrayList<Object>();
if(query != null && query.size()>0){
appId = BigDecimal.valueOf(Long.parseLong(query.getString("appId"))) ;
appName = query.getString("appName");
tenant = query.getString("tenant");
}
if(appId!=null ){
sql.append(" AND ID=? ");
tupleList.add(appId);
}
if(!StringUtil.isNullOrEmpty(appName) ){
sql.append(" AND NAME = ? ");
tupleList.add(appName);
}
if(!StringUtil.isNullOrEmpty(tenant) ){
sql.append(" AND tenant = ? ");
tupleList.add(tenant);
}
Pool client = GlobalContext.context.jdbcPool();
client
.preparedQuery(sql.toString())
.execute(Tuple.tuple(tupleList))
.onFailure(throwable -> {
promise.fail(GWException.create(ErrorCode.DATABASEERROR,"查询应用列表",throwable));
})
.flatMap(rs ->{
if(rs != null && rs.iterator().hasNext()){
JsonArray jsonArray = new JsonArray();
rs.forEach(e ->{
jsonArray.add(e.toJson());
});
promise.complete(jsonArray);
}else{
promise.complete(new JsonArray());
}
return promise.future();
});
return promise.future();
}
剩下的,我们通过反射机制在服务启动时,自动注册controller层的路由,vertical部署时,自动发布集群间调用的服务,然后通过上下文,手动写一下,管理好我们的bean就行了。
未完待续,以下内容有空时再总结补充!!!
3. 最大限度的降低普通租户对重要业务的影响。
4. 全局异常处理。
5. 支持运行时嵌入个性化处理器。
更多推荐
所有评论(0)