基于注册中心的动态路由-2
Spring-Cloud-Gateway源码系列学习版本 v2.2.6.RELEASEdemo搭建Nacos下载地址:https://github.com/alibaba/nacos/releases/tag/2.0.0-BETA解压进入bin目录单机运行Nacos:windows系统:startup.cmd-m standalonelinux/mac:startup.sh-m standalon
Spring-Cloud-Gateway源码系列学习
版本 v2.2.6.RELEASE
demo搭建
Nacos下载地址:https://github.com/alibaba/nacos/releases/tag/2.0.0-BETA
解压进入bin目录
单机运行Nacos:
- windows系统:startup.cmd -m standalone
- linux/mac:startup.sh -m standalone
打开Nacos管理界面:
- 链接:http://localhost:8848/nacos
- 账号:nacos
- 密码:nacos
示例代码对应仓库():
示例代码由芋道源码提供,嘿嘿~
运行上面两个Spring-Boot程序
验证基于服务发现的动态路由:
http://localhost:8888/user-service/user/get?id=1
DiscoveryLocatorProperties源码分析
DiscoveryLocatorProperties是Spring-Cloud-Gateway discovery配置类,示例配置如下:
spring:
cloud:
gateway:
discovery:
locator:
enabled: true
url-expression: "'lb://' + serviceId"
下面通过 GatewayDiscoveryClientAutoConfiguration 类分析DiscoveryLocatorProperties在装配时默认添加的predicate和filter
public class GatewayDiscoveryClientAutoConfiguration{
@Bean
public DiscoveryLocatorProperties discoveryLocatorProperties() {
DiscoveryLocatorProperties properties = new DiscoveryLocatorProperties();
//设置了默认的predicate
properties.setPredicates(initPredicates());
//设置了默认的filter
properties.setFilters(initFilters());
return properties;
}
//默认predicate:-Path = "'/'+serviceId+'/**'"
public static List<PredicateDefinition> initPredicates() {
ArrayList<PredicateDefinition> definitions = new ArrayList<>();
// TODO: add a predicate that matches the url at /serviceId?
// add a predicate that matches the url at /serviceId/**
//new一个PredicateDefinition
PredicateDefinition predicate = new PredicateDefinition();
//生产AsyncPredicate时是根据name来匹配PredicateFactory的
predicate.setName(normalizeRoutePredicateName(PathRoutePredicateFactory.class));
//设置predicate配置,等于 -Path = "'/'+serviceId+'/**'"
predicate.addArg(PATTERN_KEY, "'/'+serviceId+'/**'");
definitions.add(predicate);
return definitions;
}
//默认filter:- RewritePath="'/' + serviceId + '/(?<remaining>.*)'", "'/${remaining}'"
public static List<FilterDefinition> initFilters() {
ArrayList<FilterDefinition> definitions = new ArrayList<>();
// add a filter that removes /serviceId by default
FilterDefinition filter = new FilterDefinition();
filter.setName(normalizeFilterFactoryName(RewritePathGatewayFilterFactory.class));
String regex = "'/' + serviceId + '/(?<remaining>.*)'";
String replacement = "'/${remaining}'";
filter.addArg(REGEXP_KEY, regex);
filter.addArg(REPLACEMENT_KEY, replacement);
definitions.add(filter);
return definitions;
}
}
下面接着分析DiscoveryLocatorProperties的源码
public class DiscoveryLocatorProperties {
/** Flag that enables DiscoveryClient gateway integration. */
private boolean enabled = false;
/**
* The prefix for the routeId, defaults to discoveryClient.getClass().getSimpleName()
* + "_". Service Id will be appended to create the routeId.
*/
//路由id前缀
private String routeIdPrefix;
/**
* SpEL expression that will evaluate whether to include a service in gateway
* integration or not, defaults to: true.
*/
//是否使用SpEL表达式
private String includeExpression = "true";
/**
* SpEL expression that create the uri for each route, defaults to: 'lb://'+serviceId.
*/
//创建路由使用的SpEL表达式
private String urlExpression = "'lb://'+serviceId";
/**
* Option to lower case serviceId in predicates and filters, defaults to false. Useful
* with eureka when it automatically uppercases serviceId. so MYSERIVCE, would match
* /myservice/**
*/
//是否开启大写转小写
private boolean lowerCaseServiceId = false;
//作用于每个服务发现路由的Predicate
private List<PredicateDefinition> predicates = new ArrayList<>();
//作用于每个服务发现路由的Filter
private List<FilterDefinition> filters = new ArrayList<>();
}
DiscoveryClientRouteDefinitionLocator源码分析
通过DiscoveryClient提供的Flux<List> serviceInstances 和 DiscoveryLocatorProperties properties来创建RouteDefinition
public class DiscoveryClientRouteDefinitionLocator implements RouteDefinitionLocator {
private static final Log log = LogFactory
.getLog(DiscoveryClientRouteDefinitionLocator.class);
//spring.cloud.gateway.discovery.locator配置信息
private final DiscoveryLocatorProperties properties;
//路由id前缀
private final String routeIdPrefix;
//主要用于SePL解析
private final SimpleEvaluationContext evalCtxt;
//从注册中心获取到的服务实例流
private Flux<List<ServiceInstance>> serviceInstances;
/**
* Kept for backwards compatibility. You should use the reactive discovery client.
* @param discoveryClient the blocking discovery client
* @param properties the configuration properties
* @deprecated kept for backwards compatibility
*/
@Deprecated
public DiscoveryClientRouteDefinitionLocator(DiscoveryClient discoveryClient,
DiscoveryLocatorProperties properties) {
this(discoveryClient.getClass().getSimpleName(), properties);
serviceInstances = Flux
.defer(() -> Flux.fromIterable(discoveryClient.getServices()))
.map(discoveryClient::getInstances)
.subscribeOn(Schedulers.boundedElastic());
}
/**
* 构造参数需要注入ReactiveDiscoveryClient、DiscoveryLocatorProperties,ReactiveDiscoveryClient需要注册中心框架实现,
* DiscoveryLocatorProperties就是spring.cloud.gateway.discovery.locator的配置信息
* see NacosReactiveDiscoveryClient
* @see DiscoveryLocatorProperties
*/
public DiscoveryClientRouteDefinitionLocator(ReactiveDiscoveryClient discoveryClient,
DiscoveryLocatorProperties properties) {
//调用重载的构造方法
this(discoveryClient.getClass().getSimpleName(), properties);
//从注册中心获取到的服务实例流
serviceInstances = discoveryClient.getServices()
.flatMap(service -> discoveryClient.getInstances(service).collectList());
}
//设置一些配置信息
private DiscoveryClientRouteDefinitionLocator(String discoveryClientName,
DiscoveryLocatorProperties properties) {
this.properties = properties;
if (StringUtils.hasText(properties.getRouteIdPrefix())) {
routeIdPrefix = properties.getRouteIdPrefix();
}
else {
routeIdPrefix = discoveryClientName + "_";
}
evalCtxt = SimpleEvaluationContext.forReadOnlyDataBinding().withInstanceMethods()
.build();
}
//初始化及注册中心心跳信息来临时会执行
@Override
public Flux<RouteDefinition> getRouteDefinitions() {
//SpEL解析器
SpelExpressionParser parser = new SpelExpressionParser();
Expression includeExpr = parser
.parseExpression(properties.getIncludeExpression());
Expression urlExpr = parser.parseExpression(properties.getUrlExpression());
//判断是否含有SpEL表达式
Predicate<ServiceInstance> includePredicate;
if (properties.getIncludeExpression() == null
|| "true".equalsIgnoreCase(properties.getIncludeExpression())) {
includePredicate = instance -> true;
}
else {
includePredicate = instance -> {
Boolean include = includeExpr.getValue(evalCtxt, instance, Boolean.class);
if (include == null) {
return false;
}
return include;
};
}
return serviceInstances.filter(instances -> !instances.isEmpty())
.map(instances -> instances.get(0)).filter(includePredicate)
.map(instance -> {
//根据配置的spring.cloud.gateway.discovery.locator.url-expression和服务实例构建出RouteDefinition
RouteDefinition routeDefinition = buildRouteDefinition(urlExpr,
instance);
//委派的服务实例(instance和properties组合)
final ServiceInstance instanceForEval = new DelegatingServiceInstance(
instance, properties);
//把服务发现路由的默认Predicate添加到routeDefinition,也就是-Path = "'/'+serviceId+'/**'"
for (PredicateDefinition original : this.properties.getPredicates()) {
//new一个PredicateDefinition
PredicateDefinition predicate = new PredicateDefinition();
predicate.setName(original.getName());
for (Map.Entry<String, String> entry : original.getArgs()
.entrySet()) {
String value = getValueFromExpr(evalCtxt, parser,
instanceForEval, entry);
predicate.addArg(entry.getKey(), value);
}
routeDefinition.getPredicates().add(predicate);
}
//把服务发现路由的默认Filter添加到routeDefinition,也就是-RewritePath="'/' + serviceId + '/(?<remaining>.*)'", "'/${remaining}'"
for (FilterDefinition original : this.properties.getFilters()) {
FilterDefinition filter = new FilterDefinition();
filter.setName(original.getName());
for (Map.Entry<String, String> entry : original.getArgs()
.entrySet()) {
String value = getValueFromExpr(evalCtxt, parser,
instanceForEval, entry);
filter.addArg(entry.getKey(), value);
}
routeDefinition.getFilters().add(filter);
}
//返回
return routeDefinition;
});
}
protected RouteDefinition buildRouteDefinition(Expression urlExpr,
ServiceInstance serviceInstance) {
String serviceId = serviceInstance.getServiceId();
RouteDefinition routeDefinition = new RouteDefinition();
//设置routeId
routeDefinition.setId(this.routeIdPrefix + serviceId);
//解析出uri,如lb://user-service
String uri = urlExpr.getValue(this.evalCtxt, serviceInstance, String.class);
routeDefinition.setUri(URI.create(uri));
// add instance metadata
//添加元信息
routeDefinition.setMetadata(new LinkedHashMap<>(serviceInstance.getMetadata()));
return routeDefinition;
}
}
服务发现路由动态刷新源码分析
注册中心事件监听者(观察者)RouteRefreshListener源码分析
public class RouteRefreshListener implements ApplicationListener<ApplicationEvent> {
private final ApplicationEventPublisher publisher;
private HeartbeatMonitor monitor = new HeartbeatMonitor();
/**
* 构造器依赖注入一个ApplicationEventPublisher
* @param publisher
*/
public RouteRefreshListener(ApplicationEventPublisher publisher) {
Assert.notNull(publisher, "publisher may not be null");
this.publisher = publisher;
}
//事件处理
@Override
public void onApplicationEvent(ApplicationEvent event) {
if (event instanceof ContextRefreshedEvent) {
ContextRefreshedEvent refreshedEvent = (ContextRefreshedEvent) event;
if (!WebServerApplicationContext.hasServerNamespace(
refreshedEvent.getApplicationContext(), "management")) {
//如果收到上下文刷新事件,直接刷新路由
reset();
}
}
else if (event instanceof RefreshScopeRefreshedEvent
|| event instanceof InstanceRegisteredEvent) {
//如果收到刷新生命周期事件或者实例注册事件,直接刷新路由
reset();
}
else if (event instanceof ParentHeartbeatEvent) {
//如果收到双亲ApplicationContext的心跳事件,判断是否需要刷新路由(其实就是幂等判断,消息内容变化就会刷新)
ParentHeartbeatEvent e = (ParentHeartbeatEvent) event;
resetIfNeeded(e.getValue());
}
else if (event instanceof HeartbeatEvent) {
//如果收到心跳事件,判断是否需要刷新路由(其实就是幂等判断,消息内容变化就会刷新)
HeartbeatEvent e = (HeartbeatEvent) event;
resetIfNeeded(e.getValue());
}
}
//判断是否需要刷新路由
private void resetIfNeeded(Object value) {
if (this.monitor.update(value)) {
reset();
}
}
//发送刷新路由事件
private void reset() {
this.publisher.publishEvent(new RefreshRoutesEvent(this));
}
}
刷新路由事件监听者(观察者)CachingRouteLocator源码分析
public class CachingRouteLocator implements Ordered, RouteLocator,
ApplicationListener<RefreshRoutesEvent>, ApplicationEventPublisherAware {
private final Flux<Route> routes;
private final Map<String, List> cache = new ConcurrentHashMap<>();
public CachingRouteLocator(RouteLocator delegate) {
this.delegate = delegate;
//每次调用getRoutes都会唤起CacheFlux#lookup里面的Flux#defer
routes = CacheFlux.lookup(cache, CACHE_KEY, Route.class)
.onCacheMissResume(this::fetch);
}
//调用CompositeRouteLocator#getRoutes
private Flux<Route> fetch() {
return this.delegate.getRoutes().sort(AnnotationAwareOrderComparator.INSTANCE);
}
@Override
public Flux<Route> getRoutes() {
return this.routes;
}
@Override
public void onApplicationEvent(RefreshRoutesEvent event) {
try {
//执行fetch()->Flux<Route>->collect()->Mono<List<Route>>
fetch().collect(Collectors.toList()).subscribe(list -> Flux.fromIterable(list)
//materialize()->Flux<Signal<Route>> -> collect()->Mono<List<Signal<Route>>>
.materialize().collect(Collectors.toList()).subscribe(signals -> {
//signals = Signal<Route>
//发布刷新路由结果事件
applicationEventPublisher
.publishEvent(new RefreshRoutesResultEvent(this));
//把Signal<Route>添加到缓存
cache.put(CACHE_KEY, signals);
}, throwable -> handleRefreshError(throwable)));
}
catch (Throwable e) {
handleRefreshError(e);
}
}
private void handleRefreshError(Throwable throwable) {
if (log.isErrorEnabled()) {
log.error("Refresh routes error !!!", throwable);
}
applicationEventPublisher
.publishEvent(new RefreshRoutesResultEvent(this, throwable));
}
}
更多推荐
所有评论(0)