Spring Cloud Gateway路由定义、加载、刷新
一、路由相关对象RouteDefinition路由定义Route的一种构建方式就是通过RouteDefinition,比如从properties文件中解析得到的路由规则定义public class RouteDefinition {// 唯一idprivate String id;// 断言定义private List<PredicateDefinition> predicates =
一、路由相关对象
RouteDefinition路由定义
Route的一种构建方式就是通过RouteDefinition,比如从properties文件中解析得到的路由规则定义
public class RouteDefinition {
// 唯一id
private String id;
// 断言定义
private List<PredicateDefinition> predicates = new ArrayList<>();
// 过滤器定义
private List<FilterDefinition> filters = new ArrayList<>();
// 跳转uri
private URI uri;
// 元数据
private Map<String, Object> metadata = new HashMap<>();
// Spring优先级
private int order = 0;
}
PredicateDefinition断言定义
从配置文件加载的断言定义,构造Route时,会用RoutePredicateFactory#applyAsync转换成AsyncPredicate
public class PredicateDefinition {
private String name;
private Map<String, String> args = new LinkedHashMap<>();
public PredicateDefinition() {
}
// predicates:
// - Path=/echo // 解析'Path=/echo'放入args
public PredicateDefinition(String text) {
int eqIdx = text.indexOf('=');
// 设置name
setName(text.substring(0, eqIdx));
String[] args = tokenizeToStringArray(text.substring(eqIdx + 1), ",");
// 设置args
for (int i = 0; i < args.length; i++) {
this.args.put(NameUtils.generateName(i), args[i]);
}
}
}
FilterDefinition路由过滤器定义
从配置文件加载的路由过滤器定义,构造Route时,会用GatewayFilterFactory#apply转换为GatewayFilter
public class FilterDefinition {
private String name;
private Map<String, String> args = new LinkedHashMap<>();
public FilterDefinition() {
}
// 解析配置文件 PrefixPath=/httpbin
public FilterDefinition(String text) {
int eqIdx = text.indexOf('=');
if (eqIdx <= 0) {
setName(text);
return;
}
setName(text.substring(0, eqIdx));
String[] args = tokenizeToStringArray(text.substring(eqIdx + 1), ",");
for (int i = 0; i < args.length; i++) {
this.args.put(NameUtils.generateName(i), args[i]);
}
}
}
Route路由对象
public class Route implements Ordered {
// 唯一id
private final String id;
// 跳转uri
private final URI uri;
// SpringBean优先级
private final int order;
// 断言
private final AsyncPredicate<ServerWebExchange> predicate;
// 当前路由特有的过滤器
private final List<GatewayFilter> gatewayFilters;
// 元数据
private final Map<String, Object> metadata;
}
AsyncPredicate断言对象
断言用于判断路由是否匹配某个ServerWebExchange
public interface AsyncPredicate<T> extends Function<T, Publisher<Boolean>> {
// 构造与AsyncPredicate
default AsyncPredicate<T> and(AsyncPredicate<? super T> other) {
return new AndAsyncPredicate<>(this, other);
}
// 构造非AsyncPredicate
default AsyncPredicate<T> negate() {
return new NegateAsyncPredicate<>(this);
}
// 构造或AsyncPredicate
default AsyncPredicate<T> or(AsyncPredicate<? super T> other) {
return new OrAsyncPredicate<>(this, other);
}
// 构造默认AsyncPredicate
static AsyncPredicate<ServerWebExchange> from(
Predicate<? super ServerWebExchange> predicate) {
return new DefaultAsyncPredicate<>(GatewayPredicate.wrapIfNeeded(predicate));
}
// 默认AsyncPredicate
class DefaultAsyncPredicate<T> implements AsyncPredicate<T> {
private final Predicate<T> delegate;
public DefaultAsyncPredicate(Predicate<T> delegate) {
this.delegate = delegate;
}
@Override
public Publisher<Boolean> apply(T t) {
return Mono.just(delegate.test(t));
}
}
// 非AsyncPredicate
class NegateAsyncPredicate<T> implements AsyncPredicate<T> {
private final AsyncPredicate<? super T> predicate;
public NegateAsyncPredicate(AsyncPredicate<? super T> predicate) {
this.predicate = predicate;
}
@Override
public Publisher<Boolean> apply(T t) {
return Mono.from(predicate.apply(t)).map(b -> !b);
}
}
// 与AsyncPredicate
class AndAsyncPredicate<T> implements AsyncPredicate<T> {
private final AsyncPredicate<? super T> left;
private final AsyncPredicate<? super T> right;
public AndAsyncPredicate(AsyncPredicate<? super T> left,
AsyncPredicate<? super T> right) {
this.left = left;
this.right = right;
}
@Override
public Publisher<Boolean> apply(T t) {
return Mono.from(left.apply(t)).flatMap(
result -> !result ? Mono.just(false) : Mono.from(right.apply(t)));
}
}
// 或AsyncPredicate
class OrAsyncPredicate<T> implements AsyncPredicate<T> {
private final AsyncPredicate<? super T> left;
private final AsyncPredicate<? super T> right;
public OrAsyncPredicate(AsyncPredicate<? super T> left,
AsyncPredicate<? super T> right) {
this.left = left;
this.right = right;
}
@Override
public Publisher<Boolean> apply(T t) {
return Mono.from(left.apply(t)).flatMap(
result -> result ? Mono.just(true) : Mono.from(right.apply(t)));
}
}
}
二、RouteDefinitionLocator路由定义加载器
RouteDefinitionLocator具有获取路由定义的能力
public interface RouteDefinitionLocator {
Flux<RouteDefinition> getRouteDefinitions();
}
RouteDefinitionLocator 接口有四种实现 :
- PropertiesRouteDefinitionLocator ,从配置文件 (例如,YML / Properties 等) 读取。在 《Spring-Cloud-Gateway 源码解析 —— 路由(1.2)之 PropertiesRouteDefinitionLocator 配置文件》「2. PropertiesRouteDefinitionLocator」 详细解析。
- RouteDefinitionRepository ,从存储器 (例如,内存 / Redis / MySQL 等) 读取。在 《Spring-Cloud-Gateway 源码解析 —— 路由(1.3)之 RouteDefinitionRepository 存储器》 详细解析。
- DiscoveryClientRouteDefinitionLocator ,从注册中心 (例如,Eureka / Consul / Zookeeper / Etcd 等) 读取。在 《Spring-Cloud-Gateway 源码解析 —— 路由(1.4)之 DiscoveryClientRouteDefinitionLocator 注册中心》 详细解析。
- CompositeRouteDefinitionLocator ,组合多种 RouteDefinitionLocator 的实现,为 RouteDefinitionRouteLocator 提供统一入口。在 本文 详细解析。
- 另外,CachingRouteDefinitionLocator 也是 RouteDefinitionLocator 的实现类,已经被 CachingRouteLocator 取代。
三、RouteLocator 路由加载器
RouteLocator具有获取路由Route的能力,网关处理请求时只会调用RouteLocator获取Route,通过Route的断言和过滤处理请求。
RouteLocator 可以
- 直接自定义路由 Route,
- 也可以通过 RouteDefinitionRouteLocator 获取 RouteDefinition ,然后转换成 Route
public interface RouteLocator {
Flux<Route> getRoutes();
}
Spring容器加载的时候,会把路由都放到CachingRouteLocator里,后续运行时只会和CachingRouteLocator打交道。
CachingRouteLocator:委托CompositeRouteLocator聚合其他所有RouteLocator的实现类,如RouteDefinitionRouteLocator。
RouteDefinitionRouteLocator:委托RouteDefinitionLocator的实现类去获取路由定义,然后将其转换成Route对象,RouteDefinitionLocator实现类见上面。
四、RouteRefreshListener 加载路由入口类,路由刷新监听器
加载路由到RouteLocator的入口,通过Spring事件触发
public class RouteRefreshListener implements ApplicationListener<ApplicationEvent> {
private final ApplicationEventPublisher publisher;
private HeartbeatMonitor monitor = new HeartbeatMonitor();
public RouteRefreshListener(ApplicationEventPublisher publisher) {
Assert.notNull(publisher, "publisher may not be null");
this.publisher = publisher;
}
public void onApplicationEvent(ApplicationEvent event) {
if (event instanceof ContextRefreshedEvent) {
ContextRefreshedEvent refreshedEvent = (ContextRefreshedEvent)event;
if (!WebServerApplicationContext.hasServerNamespace(refreshedEvent.getApplicationContext(), "management")) {
this.reset();
}
}
else if (!(event instanceof RefreshScopeRefreshedEvent) && !(event instanceof InstanceRegisteredEvent)) {
if (event instanceof ParentHeartbeatEvent) {
ParentHeartbeatEvent e = (ParentHeartbeatEvent)event;
this.resetIfNeeded(e.getValue());
} else if (event instanceof HeartbeatEvent) {
HeartbeatEvent e = (HeartbeatEvent)event;
this.resetIfNeeded(e.getValue());
}
} else {
this.reset();
}
}
//根据value判断是是否需要reset
private void resetIfNeeded(Object value) {
if (this.monitor.update(value)) {
this.reset();
}
}
//发布RefreshRoutesEvent事件
private void reset() {
this.publisher.publishEvent(new RefreshRoutesEvent(this));
}
}
五、CachingRouteLocator刷新路由事件RefreshRoutesEvent监听器
public class CachingRouteLocator implements Ordered, RouteLocator, ApplicationListener<RefreshRoutesEvent>, ApplicationEventPublisherAware {
private static final Log log = LogFactory.getLog(CachingRouteLocator.class);
private static final String CACHE_KEY = "routes";
//委托的RouteLocator的实现类CompositeRouteLocator
private final RouteLocator delegate;
private final Flux<Route> routes;
//缓存的路由对象
private final Map<String, List> cache = new ConcurrentHashMap();
//事件发布者
private ApplicationEventPublisher applicationEventPublisher;
public CachingRouteLocator(RouteLocator delegate) {
this.delegate = delegate;
this.routes = CacheFlux.lookup(this.cache, "routes", Route.class).onCacheMissResume(this::fetch);
}
//委托CompositeRouteLocator获取Route并排序
private Flux<Route> fetch() {
return this.delegate.getRoutes().sort(AnnotationAwareOrderComparator.INSTANCE);
}
public Flux<Route> getRoutes() {
return this.routes;
}
public Flux<Route> refresh() {
this.cache.clear();
return this.routes;
}
// 接收RefreshRoutesEvent,并进行路由刷新
public void onApplicationEvent(RefreshRoutesEvent event) {
try {
this.fetch().collect(Collectors.toList()).subscribe((list) -> {
Flux.fromIterable(list).materialize().collect(Collectors.toList()).subscribe((signals) -> {
//发布刷新路由结果事件RefreshRoutesResultEvent
this.applicationEventPublisher.publishEvent(new RefreshRoutesResultEvent(this));
//缓存路由
this.cache.put("routes", signals);
}, (throwable) -> {
this.handleRefreshError(throwable);
});
});
} catch (Throwable var3) {
this.handleRefreshError(var3);
}
}
......
//发布刷新路由结果事件RefreshRoutesResultEvent
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
}
}
CompositeRouteLocator组合路由加载器
CompositeRouteLocator聚合其他的RouteLocator实现类,委托其他实现类获取Route集合,在gateway中是委托RouteDefinitionRouteLocator来具体获取Route集合
public class CompositeRouteLocator implements RouteLocator {
// RouteDefinitionRouteLocator
private final Flux<RouteLocator> delegates;
public CompositeRouteLocator(Flux<RouteLocator> delegates) {
this.delegates = delegates;
}
public Flux<Route> getRoutes() {
return this.delegates.flatMapSequential(RouteLocator::getRoutes);
}
}
RouteDefinitionRouteLocator
RouteDefinitionRouteLocator负责通过RouteDefinition创建Route
- 委托CompositeRouteDefinitionLocator获取RouteDefinition,
- 通过ConfigurationService、RoutePredicateFactory、GatewayFilterFactory将RouteDefinition转换为Route返回
public class RouteDefinitionRouteLocator implements RouteLocator, BeanFactoryAware, ApplicationEventPublisherAware {
// 默认过滤器名字
public static final String DEFAULT_FILTERS = "defaultFilters";
protected final Log logger = LogFactory.getLog(this.getClass());
// 委托CompositeRouteDefinitionLocator
private final RouteDefinitionLocator routeDefinitionLocator;
// ConfigurationService操作RoutePredicateFactory和GatewayFilterFactory
// 转换断言和过滤器
private final ConfigurationService configurationService;
//RoutePredicateFactory
private final Map<String, RoutePredicateFactory> predicates = new LinkedHashMap();
//GatewayFilterFactory
private final Map<String, GatewayFilterFactory> gatewayFilterFactories = new HashMap();
// spring.cloud.gateway配置文件
private final GatewayProperties gatewayProperties;
public RouteDefinitionRouteLocator(RouteDefinitionLocator routeDefinitionLocator, List<RoutePredicateFactory> predicates, List<GatewayFilterFactory> gatewayFilterFactories, GatewayProperties gatewayProperties, ConfigurationService configurationService) {
this.routeDefinitionLocator = routeDefinitionLocator;
this.configurationService = configurationService;
this.initFactories(predicates);
gatewayFilterFactories.forEach((factory) -> {
GatewayFilterFactory var10000 = (GatewayFilterFactory)this.gatewayFilterFactories.put(factory.name(), factory);
});
this.gatewayProperties = gatewayProperties;
}
private void initFactories(List<RoutePredicateFactory> predicates) {
predicates.forEach((factory) -> {
String key = factory.name();
if (this.predicates.containsKey(key)) {
this.logger.warn("A RoutePredicateFactory named " + key + " already exists, class: " + this.predicates.get(key) + ". It will be overwritten.");
}
this.predicates.put(key, factory);
if (this.logger.isInfoEnabled()) {
this.logger.info("Loaded RoutePredicateFactory [" + key + "]");
}
});
}
public Flux<Route> getRoutes() {
// 委托CompositeRouteDefinitionLocator获取RouteDefinition
Flux<Route> routes = this.routeDefinitionLocator
.getRouteDefinitions()
// 转换为Route
.map(this::convertToRoute);
if (!this.gatewayProperties.isFailOnRouteDefinitionError()) {
routes = routes.onErrorContinue((error, obj) -> {
if (this.logger.isWarnEnabled()) {
this.logger.warn("RouteDefinition id " + ((RouteDefinition)obj).getId() + " will be ignored. Definition has invalid configs, " + error.getMessage());
}
});
}
return routes.map((route) -> {
if (this.logger.isDebugEnabled()) {
this.logger.debug("RouteDefinition matched: " + route.getId());
}
return route;
});
}
// 将路由定义RouteDefinition转换成Route
private Route convertToRoute(RouteDefinition routeDefinition) {
// 将routeDefinition里的PredicateDefinition
// 通过RoutePredicateFactory转换为AsyncPredicate
AsyncPredicate<ServerWebExchange> predicate = this.combinePredicates(routeDefinition);
// 将routeDefinition里的FilterDefinition
// 通过GatewayFilterFactory转换为GatewayFilter
List<GatewayFilter> gatewayFilters = this.getFilters(routeDefinition);
// 组装Route
return ((AsyncBuilder)Route.async(routeDefinition).asyncPredicate(predicate).replaceFilters(gatewayFilters)).build();
}
//根据FilterDefinition获取到对应的GatewayFilter
List<GatewayFilter> loadGatewayFilters(String id, List<FilterDefinition> filterDefinitions) {
ArrayList<GatewayFilter> ordered = new ArrayList(filterDefinitions.size());
//循环过滤器定义,得到过滤器对象List
for(int i = 0; i < filterDefinitions.size(); ++i) {
FilterDefinition definition = (FilterDefinition)filterDefinitions.get(i);
// 根据Filter定义的name获取GatewayFilterFactory
GatewayFilterFactory factory = (GatewayFilterFactory)this.gatewayFilterFactories.get(definition.getName());
if (factory == null) {
throw new IllegalArgumentException("Unable to find GatewayFilterFactory with name " + definition.getName());
}
if (this.logger.isDebugEnabled()) {
this.logger.debug("RouteDefinition " + id + " applying filter " + definition.getArgs() + " to " + definition.getName());
}
Object configuration = this.configurationService.with(factory).name(definition.getName()).properties(definition.getArgs()).eventFunction((bound, properties) -> {
return new FilterArgsEvent(this, id, (Map)properties);
}).bind();
if (configuration instanceof HasRouteId) {
HasRouteId hasRouteId = (HasRouteId)configuration;
hasRouteId.setRouteId(id);
}
// 转换为GatewayFilter
GatewayFilter gatewayFilter = factory.apply(configuration);
if (gatewayFilter instanceof Ordered) {
ordered.add(gatewayFilter);
} else {
ordered.add(new OrderedGatewayFilter(gatewayFilter, i + 1));
}
}
return ordered;
}
//获取该路由下的GatewayFilter
private List<GatewayFilter> getFilters(RouteDefinition routeDefinition) {
List<GatewayFilter> filters = new ArrayList();
// 如果默认过滤器不为空,加入默认过滤器
if (!this.gatewayProperties.getDefaultFilters().isEmpty()) {
filters.addAll(this.loadGatewayFilters("defaultFilters", new ArrayList(this.gatewayProperties.getDefaultFilters())));
}
// 如果routeDefinition里的过滤器不为空,加入过滤器
if (!routeDefinition.getFilters().isEmpty()) {
filters.addAll(this.loadGatewayFilters(routeDefinition.getId(), new ArrayList(routeDefinition.getFilters())));
}
// 排序
AnnotationAwareOrderComparator.sort(filters);
return filters;
}
private AsyncPredicate<ServerWebExchange> combinePredicates(RouteDefinition routeDefinition) {
List<PredicateDefinition> predicates = routeDefinition.getPredicates();
if (predicates != null && !predicates.isEmpty()) {
// 先获取第一个断言定义,转换为AsyncPredicate
AsyncPredicate<ServerWebExchange> predicate = this.lookup(routeDefinition, (PredicateDefinition)predicates.get(0));
AsyncPredicate found;
// 后续定义用and连接
for(Iterator var4 = predicates.subList(1, predicates.size()).iterator(); var4.hasNext(); predicate = predicate.and(found)) {
PredicateDefinition andPredicate = (PredicateDefinition)var4.next();
found = this.lookup(routeDefinition, andPredicate);
}
return predicate;
} else {
return AsyncPredicate.from((exchange) -> {
return true;
});
}
}
private AsyncPredicate<ServerWebExchange> lookup(RouteDefinition route, PredicateDefinition predicate) {
// 通过PredicateDefinition的name找到对应的RoutePredicateFactory
RoutePredicateFactory<Object> factory = (RoutePredicateFactory)this.predicates.get(predicate.getName());
if (factory == null) {
throw new IllegalArgumentException("Unable to find RoutePredicateFactory with name " + predicate.getName());
} else {
if (this.logger.isDebugEnabled()) {
this.logger.debug("RouteDefinition " + route.getId() + " applying " + predicate.getArgs() + " to " + predicate.getName());
}
// ConfigurationService操作RouteDefinition和RoutePredicateFactory
Object config = ((ConfigurableBuilder)((ConfigurableBuilder)((ConfigurableBuilder)this.configurationService.with(factory).name(predicate.getName())).properties(predicate.getArgs())).eventFunction((bound, properties) -> {
return new PredicateArgsEvent(this, route.getId(), properties);
})).bind();
return factory.applyAsync(config);
}
}
}
CompositeRouteDefinitionLocator
public class CompositeRouteDefinitionLocator implements RouteDefinitionLocator {
private static final Log log = LogFactory.getLog(CompositeRouteDefinitionLocator.class);
private final Flux<RouteDefinitionLocator> delegates;
private final IdGenerator idGenerator;
public CompositeRouteDefinitionLocator(Flux<RouteDefinitionLocator> delegates) {
this(delegates, new AlternativeJdkIdGenerator());
}
public CompositeRouteDefinitionLocator(Flux<RouteDefinitionLocator> delegates, IdGenerator idGenerator) {
this.delegates = delegates;
this.idGenerator = idGenerator;
}
public Flux<RouteDefinition> getRouteDefinitions() {
return this.delegates
// 委托所有其他RouteDefinitionLocator获取RouteDefinition
.flatMapSequential(RouteDefinitionLocator::getRouteDefinitions)
.flatMap((routeDefinition) -> {
return routeDefinition.getId() == null ? this.randomId().map((id) -> {
routeDefinition.setId(id);
if (log.isDebugEnabled()) {
log.debug("Id set on route definition: " + routeDefinition);
}
return routeDefinition;
}) : Mono.just(routeDefinition);
});
}
protected Mono<String> randomId() {
IdGenerator var10000 = this.idGenerator;
var10000.getClass();
return Mono.fromSupplier(var10000::toString).publishOn(Schedulers.boundedElastic());
}
}
PropertiesRouteDefinitionLocator
RouteDefinitionLocator有很多种,以PropertiesRouteDefinitionLocator、InMemoryRouteDefinitionRepository为例
public class PropertiesRouteDefinitionLocator implements RouteDefinitionLocator {
//网关配置文件,从该配置文件中获取路由定义
private final GatewayProperties properties;
public PropertiesRouteDefinitionLocator(GatewayProperties properties) {
this.properties = properties;
}
public Flux<RouteDefinition> getRouteDefinitions() {
return Flux.fromIterable(this.properties.getRoutes());
}
}
InMemoryRouteDefinitionRepository
基于内存为存储器 的 RouteDefinitionLocator
public class InMemoryRouteDefinitionRepository implements RouteDefinitionRepository {
private final Map<String, RouteDefinition> routes = Collections.synchronizedMap(new LinkedHashMap());
public InMemoryRouteDefinitionRepository() {
}
public Mono<Void> save(Mono<RouteDefinition> route) {
return route.flatMap((r) -> {
if (StringUtils.isEmpty(r.getId())) {
return Mono.error(new IllegalArgumentException("id may not be empty"));
} else {
this.routes.put(r.getId(), r);
return Mono.empty();
}
});
}
public Mono<Void> delete(Mono<String> routeId) {
return routeId.flatMap((id) -> {
if (this.routes.containsKey(id)) {
this.routes.remove(id);
return Mono.empty();
} else {
return Mono.defer(() -> {
return Mono.error(new NotFoundException("RouteDefinition not found: " + routeId));
});
}
});
}
public Flux<RouteDefinition> getRouteDefinitions() {
return Flux.fromIterable(this.routes.values());
}
}
public interface RouteDefinitionRepository extends RouteDefinitionLocator, RouteDefinitionWriter{ }
- 继承 RouteDefinitionLocator 接口
- 继承 RouteDefinitionWriter 接口
通过实现该接口,实现从 存储器 ( 例如,内存 / Redis / MySQL 等 )读取、保存、删除路由配置。
目前 Spring Cloud Gateway 实现了 基于内存为存储器 的 InMemoryRouteDefinitionRepository 。
- getRouteDefinitions() 方法,在 CompositeRouteDefinitionLocator调用。
- save() / delete() 方法,下面在 AbstractGatewayControllerEndpoint调用,暴漏出去HTTP接口供开发人员通过HTTP API的方式调用这两个方法,对route进行增加保存、删除。
@Bean
@ConditionalOnMissingBean({RouteDefinitionRepository.class})
public InMemoryRouteDefinitionRepository inMemoryRouteDefinitionRepository() {
return new InMemoryRouteDefinitionRepository();
}
注解 @ConditionalOnMissingBean(RouteDefinitionRepository.class) ,当 不存在 RouteDefinitionRepository 的 Bean 对象时,初始化 InMemoryRouteDefinitionRepository 。也就是说,我们可以初始化自定义的 RouteDefinitionRepository 以 “注入” 。
使用 InMemoryRouteDefinitionRepository 来维护 RouteDefinition 信息,在网关实例重启或者崩溃后,RouteDefinition 就会丢失。
因此我们可以实现 RouteDefinitionRepository 接口 ,以实现例如MySQLRouteDefinitionRepository ,通过类似 MySQL 、Redis等持久化 、 可共享 的存储器,带来 Spring Cloud Gateway 实例 集群 获得一致的、相同的 RouteDefinition 信息。
参考https://www.pianshen.com/article/31232011324/
https://blog.csdn.net/he702170585/article/details/107342930/
更多推荐
所有评论(0)