The official Spring Cloud Gateway documentation describes how the Gateway works as follows:
Clients make requests to Spring Cloud Gateway. If the Gateway Handler Mapping determines that a request matches a route, it is sent to the Gateway Web Handler. This handler runs the request through a filter chain that is specific to the request. The reason the filters are divided by the dotted line is that filters can run logic both before and after the proxy request is sent. All “pre” filter logic is executed. Then the proxy request is made. After the proxy request is made, the “post” filter logic is run.
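A client request is first handled by the Gateway Handler Mapping, which looks up a route in the route table that matches the request. The request is then handed to the Web Handler, which maintains a filter chain and runs its filters in order. Logically, these filters have two execution phases: pre and post.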
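The two phases can be pictured with a small plain-Java model. This is only a sketch: `Filter`, `Chain` and `named` are hypothetical names invented for illustration and are not Spring Cloud Gateway's actual filter API. It shows how a filter that delegates to the rest of the chain naturally gets a "pre" part (before delegating) and a "post" part (after the proxy call has returned).

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of a pre/post filter chain; not Spring Cloud Gateway's real API. */
class FilterChainSketch {

    /** A filter receives the chain so it can run logic before and after the rest of it. */
    interface Filter {
        void filter(List<String> trace, Chain chain);
    }

    /** Walks the filters in order, then performs the (simulated) proxy request. */
    static final class Chain {
        private final List<Filter> filters;
        private final int index;

        Chain(List<Filter> filters, int index) {
            this.filters = filters;
            this.index = index;
        }

        void next(List<String> trace) {
            if (index < filters.size()) {
                filters.get(index).filter(trace, new Chain(filters, index + 1));
            } else {
                trace.add("proxy request");
            }
        }
    }

    /** Builds a filter that records a "pre" step, delegates, then records a "post" step. */
    static Filter named(String name) {
        return (trace, chain) -> {
            trace.add("pre " + name);
            chain.next(trace);
            trace.add("post " + name);
        };
    }

    /** Runs two filters around one simulated proxy request and returns the recorded order. */
    static List<String> run() {
        List<String> trace = new ArrayList<>();
        new Chain(List.of(named("A"), named("B")), 0).next(trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this model the "pre" steps run in filter order and the "post" steps run in reverse order, because each filter resumes only after the filters behind it have finished.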
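This article focuses on the route lookup process and, building on that, explores how to implement a persistent, dynamic route table.
Reading the source
Reading the official source code clarifies how the gateway works and where its extension points are, so that a persistent dynamic route table can be implemented.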
RoutePredicateHandlerMapping
In Gateway, the Gateway Handler Mapping that implements route lookup is the RoutePredicateHandlerMapping class. It is wired up in GatewayAutoConfiguration, the class that auto-configures Gateway's core beans, at source lines 260-266:
@Bean
public RoutePredicateHandlerMapping routePredicateHandlerMapping(
        FilteringWebHandler webHandler, RouteLocator routeLocator,
        GlobalCorsProperties globalCorsProperties, Environment environment) {
    return new RoutePredicateHandlerMapping(webHandler, routeLocator,
            globalCorsProperties, environment);
}
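The first thing to note is that this bean is registered unconditionally, so no extension point is left here (an earlier article of mine worked around this with a special technique). What matters is the two injected beans, the FilteringWebHandler and the RouteLocator, which getHandlerInternal and lookupRoute use like this: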
@Override
protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
    return lookupRoute(exchange)
            .flatMap((Function<Route, Mono<?>>) r -> {
                exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
                if (logger.isDebugEnabled()) {
                    logger.debug(
                            "Mapping [" + getExchangeDesc(exchange) + "] to " + r);
                }
                // Store the matched route on the exchange, then hand over to the web handler
                exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
                return Mono.just(webHandler);
            // ... (excerpt ends here; the rest of the method is omitted)

protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
    return this.routeLocator.getRoutes()
            // Test each route's predicate against the current exchange, in order
            .concatMap(route -> Mono.just(route).filterWhen(r -> {
                exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
                return r.getPredicate().apply(exchange);
            })
            // ... (excerpt ends here; the rest of the method is omitted)
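To make the lookup idea concrete, here is a minimal plain-Java sketch. It assumes, as the excerpt suggests, that routes are tested in order and the first route whose predicate accepts the request wins. `Route`, `lookupRoute` and `matchedId` are simplified stand-ins invented for this example, working on a request path string rather than on a ServerWebExchange, and without Reactor.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

/** Plain-Java model of first-match route lookup; names and types are illustrative only. */
class RouteLookupSketch {

    /** Simplified route: an id plus a predicate over the request path. */
    static final class Route {
        final String id;
        final Predicate<String> predicate;

        Route(String id, Predicate<String> predicate) {
            this.id = id;
            this.predicate = predicate;
        }
    }

    /** Returns the first route, in list order, whose predicate accepts the path. */
    static Optional<Route> lookupRoute(List<Route> routes, String path) {
        for (Route route : routes) {
            if (route.predicate.test(path)) {
                return Optional.of(route);
            }
        }
        return Optional.empty();
    }

    /** Looks a path up in a small fixed route table and returns the matched route id. */
    static String matchedId(String path) {
        List<Route> routes = List.of(
                new Route("red", p -> p.startsWith("/red/")),
                new Route("catch-all", p -> true));
        return lookupRoute(routes, path).map(r -> r.id).orElse("none");
    }

    public static void main(String[] args) {
        System.out.println(matchedId("/red/1"));
        System.out.println(matchedId("/blue/1"));
    }
}
```

Because the first match wins in this model, the order of the route table matters: a catch-all route placed first would shadow every route after it.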
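RouteLocator implementations
// From RouteDefinitionRouteLocator: every RouteDefinition is converted into a Route
@Override
public Flux<Route> getRoutes() {
    Flux<Route> routes = this.routeDefinitionLocator.getRouteDefinitions()
            .map(this::convertToRoute);
    // ... (excerpt ends here; the rest of the method is omitted)
Let's look again at how things are wired in GatewayAutoConfiguration, source lines 223-240: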
@Bean
public RouteLocator routeDefinitionRouteLocator(GatewayProperties properties,
        List<GatewayFilterFactory> gatewayFilters,
        List<RoutePredicateFactory> predicates,
        RouteDefinitionLocator routeDefinitionLocator,
        ConfigurationService configurationService) {
    return new RouteDefinitionRouteLocator(routeDefinitionLocator, predicates,
            gatewayFilters, properties, configurationService);
}

@Bean
@Primary
@ConditionalOnMissingBean(name = "cachedCompositeRouteLocator")
public RouteLocator cachedCompositeRouteLocator(List<RouteLocator> routeLocators) {
    return new CachingRouteLocator(
            new CompositeRouteLocator(Flux.fromIterable(routeLocators)));
}
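GatewayAutoConfiguration does not register CompositeRouteLocator as a bean directly; it is nested inside CachingRouteLocator. The List<RouteLocator> routeLocators parameter of cachedCompositeRouteLocator means that every RouteLocator bean is injected into it. cachedCompositeRouteLocator is itself a RouteLocator, but it is not injected into itself: Spring leaves the bean being created out of its own collection injection, and the @ConditionalOnMissingBean(name = "cachedCompositeRouteLocator") condition lets an application replace it with its own bean of that name. So by default the only RouteLocator actually injected into cachedCompositeRouteLocator is routeDefinitionRouteLocator, and this is a place where you can extend the gateway by registering additional RouteLocator beans.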
CachingRouteLocator is not covered in detail in this article. After you create a route, you need to publish a RefreshRoutesEvent; the locator listens for that event and refreshes its routes.
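The relationship between the composite and the caching locator can be sketched in plain Java. This is a simplified model under stated assumptions, not the real classes: `Locator`, `CompositeLocator`, `CachingLocator` and `onRefreshRoutesEvent` are hypothetical names, routes are plain strings, and the event is modelled as a direct method call. It shows why a newly stored route is not visible until a refresh is triggered.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of a composite locator wrapped by a caching locator; illustrative only. */
class CachingLocatorSketch {

    interface Locator {
        List<String> getRoutes();
    }

    /** Concatenates the routes of all delegate locators. */
    static final class CompositeLocator implements Locator {
        private final List<Locator> delegates;

        CompositeLocator(List<Locator> delegates) {
            this.delegates = delegates;
        }

        @Override
        public List<String> getRoutes() {
            List<String> all = new ArrayList<>();
            for (Locator delegate : delegates) {
                all.addAll(delegate.getRoutes());
            }
            return all;
        }
    }

    /** Serves a cached snapshot and reloads it only when told to refresh. */
    static final class CachingLocator implements Locator {
        private final Locator delegate;
        private volatile List<String> cached;

        CachingLocator(Locator delegate) {
            this.delegate = delegate;
            this.cached = List.copyOf(delegate.getRoutes());
        }

        @Override
        public List<String> getRoutes() {
            return cached;
        }

        /** Stand-in for reacting to a refresh event: take a fresh snapshot. */
        void onRefreshRoutesEvent() {
            this.cached = List.copyOf(delegate.getRoutes());
        }
    }

    /** Returns the number of visible routes initially, after a change, and after a refresh. */
    static List<Integer> demo() {
        List<String> store = new ArrayList<>(List.of("route-a"));
        Locator source = () -> store;
        CachingLocator locator = new CachingLocator(new CompositeLocator(List.of(source)));

        int initial = locator.getRoutes().size();
        store.add("route-b");                        // route added to the backing store
        int beforeRefresh = locator.getRoutes().size();
        locator.onRefreshRoutesEvent();              // refresh picks up the new route
        int afterRefresh = locator.getRoutes().size();
        return List.of(initial, beforeRefresh, afterRefresh);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```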
Next, let's look at the constructor of RouteDefinitionRouteLocator:
public RouteDefinitionRouteLocator(RouteDefinitionLocator routeDefinitionLocator,
        List<RoutePredicateFactory> predicates,
        List<GatewayFilterFactory> gatewayFilterFactories,
        GatewayProperties gatewayProperties,
        ConfigurationService configurationService) {
    this.routeDefinitionLocator = routeDefinitionLocator;
    this.configurationService = configurationService;
    initFactories(predicates);
    gatewayFilterFactories.forEach(
            factory -> this.gatewayFilterFactories.put(factory.name(), factory));
    this.gatewayProperties = gatewayProperties;
}
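The constructor registers the predicate factories and the filter factories by name. With that in mind, suppose we define a route like this when using Spring Cloud Gateway: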
spring:
  cloud:
    gateway:
      routes:
      - id: path_route
        uri: https://example.org
        predicates:
        - Path=/red/{segment},/blue/{segment}
        filters:
        - AddRequestHeader=X-Request-red, blue
Here Path maps to PathRoutePredicateFactory, and AddRequestHeader maps to AddRequestHeaderGatewayFilterFactory.
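The naming convention can be illustrated with a small plain-Java sketch. It assumes that the short name is the factory's simple class name with its suffix removed and that a shortcut entry has the form name=arg1,arg2. The helpers `normalize`, `nameOf` and `argsOf` are hypothetical and written for this example; they are not the gateway's own parsing code.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of shortcut route configuration parsing; helper names are illustrative. */
class ShortcutSketch {

    /** Strips a known suffix from a factory class name, e.g. to get the short name "Path". */
    static String normalize(String simpleClassName, String suffix) {
        return simpleClassName.endsWith(suffix)
                ? simpleClassName.substring(0, simpleClassName.length() - suffix.length())
                : simpleClassName;
    }

    /** Returns the part before '=' in a shortcut entry such as "Path=/red/{segment}". */
    static String nameOf(String shortcut) {
        int eq = shortcut.indexOf('=');
        return eq < 0 ? shortcut : shortcut.substring(0, eq);
    }

    /** Returns the comma-separated arguments after '=', trimmed of surrounding spaces. */
    static List<String> argsOf(String shortcut) {
        List<String> args = new ArrayList<>();
        int eq = shortcut.indexOf('=');
        if (eq < 0) {
            return args;
        }
        for (String part : shortcut.substring(eq + 1).split(",")) {
            args.add(part.trim());
        }
        return args;
    }

    public static void main(String[] args) {
        String predicate = "Path=/red/{segment},/blue/{segment}";
        System.out.println(nameOf(predicate) + " -> " + argsOf(predicate));
        System.out.println(normalize("PathRoutePredicateFactory", "RoutePredicateFactory"));
    }
}
```

Looking up a factory then amounts to a map lookup by the short name, which is the role the factory maps filled in the constructor above appear to play.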
RouteDefinitionLocator implementations
When RouteDefinitionRouteLocator is wired up, a bean named routeDefinitionLocator is injected into it. Back in GatewayAutoConfiguration, source lines 208-214 show how that RouteDefinitionLocator is wired:
@Bean
@Primary
public RouteDefinitionLocator routeDefinitionLocator(
        List<RouteDefinitionLocator> routeDefinitionLocators) {
    return new CompositeRouteDefinitionLocator(Flux.fromIterable(routeDefinitionLocators));
}
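Here a CompositeRouteDefinitionLocator wraps and combines all RouteDefinitionLocator implementations, each of which provides its own getRouteDefinitions() method.
GatewayDiscoveryClientAutoConfiguration auto-configures the filters and predicates that accompany the discovery-client-based RouteDefinitionLocator. That locator is disabled by default and has to be enabled in the configuration file.
# Enable this locator
spring.cloud.gateway.discovery.locator.enabled=true
# Reactive mode is the default; set this to false explicitly to switch to blocking mode
spring.cloud.discovery.reactive.enabled=false
InMemoryRouteDefinitionRepository is wired in GatewayAutoConfiguration as follows, source lines 202-206: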
@Bean
@ConditionalOnMissingBean(RouteDefinitionRepository.class)
public InMemoryRouteDefinitionRepository inMemoryRouteDefinitionRepository() {
    return new InMemoryRouteDefinitionRepository();
}
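The routeDefinitionLocator injected into RouteDefinitionRouteLocator earlier is therefore the CompositeRouteDefinitionLocator, and it in turn is built from all RouteDefinitionLocator implementations, including the RouteDefinitionRepository implementation (InMemoryRouteDefinitionRepository by default).
At this point the picture is fairly clear: to get dynamic routes backed by persistent storage, we only need to implement a RouteDefinitionRepository on top of a database (or some other persistent store), using InMemoryRouteDefinitionRepository as a reference.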
Dynamic routes backed by MongoDB
To summarize the above:
- Starting from RoutePredicateHandlerMapping, the RouteLocator injected into it is CachingRouteLocator;
- CachingRouteLocator wraps CompositeRouteLocator;
- The only RouteLocator combined by CompositeRouteLocator is RouteDefinitionRouteLocator;
- RouteDefinitionRouteLocator converts each RouteDefinition into a Route and has a bean named routeDefinitionLocator injected, namely CompositeRouteDefinitionLocator;
- CompositeRouteDefinitionLocator combines all RouteDefinitionLocator beans, including one RouteDefinitionRepository implementation, namely InMemoryRouteDefinitionRepository;
- InMemoryRouteDefinitionRepository is registered conditionally: it only takes effect when no other RouteDefinitionRepository bean exists;
- Using InMemoryRouteDefinitionRepository as a reference, implement a database-backed route store.
The full code is in: scg-dynamic-route
Main code snippets
@Slf4j
@Component
public class MongoRouteDefinitionRepository
        implements RouteDefinitionRepository, ApplicationEventPublisherAware {

    private static final String CACHE_KEY = "routes";

    private ApplicationEventPublisher eventPublisher;

    // In-memory route table served to the gateway. RouteRefresher swaps the whole map,
    // so the field is volatile to make the new map visible to other threads.
    private volatile Map<String, RouteDefinition> cache = new ConcurrentHashMap<>();

    private final RouteRepositoryOperations repositoryOperation;

    public MongoRouteDefinitionRepository(RouteRepositoryOperations repositoryOperation) {
        this.repositoryOperation = repositoryOperation;
    }

    // Routes are always read from the in-memory cache, not directly from MongoDB
    @Override
    public Flux<RouteDefinition> getRouteDefinitions() {
        return Flux.fromIterable(cache.values());
    }

    // Persist the route first, then add it to the cache
    @Override
    public Mono<Void> save(Mono<RouteDefinition> route) {
        return route.flatMap(
                r -> repositoryOperation.save(MongoRouteDefinition.from(r))
                        .log()
                        .doOnNext(this::addCache)
                        .then()
        );
    }

    // Look the route up, drop it from the cache, then delete it from MongoDB
    @Override
    public Mono<Void> delete(Mono<String> routeId) {
        return repositoryOperation.findById(routeId)
                .log()
                .map(RouteDefinition::getId)
                .doOnNext(this::removeCache)
                .flatMap(repositoryOperation::deleteById);
    }

    @Override
    public void setApplicationEventPublisher(ApplicationEventPublisher eventPublisher) {
        this.eventPublisher = eventPublisher;
    }

    public void addCache(RouteDefinition route) {
        // put rather than putIfAbsent, so that an updated route replaces the cached one
        this.cache.put(route.getId(), route);
        this.publishEvent();
    }

    public void removeCache(String routeId) {
        if (this.cache.remove(routeId) != null) {
            this.publishEvent();
        }
    }

    // Tell the gateway (CachingRouteLocator) to reload its routes
    void publishEvent() {
        eventPublisher.publishEvent(new RefreshRoutesEvent(this));
    }

    RouteRepositoryOperations getRepositoryOperation() {
        return repositoryOperation;
    }

    Map<String, RouteDefinition> getCache() {
        return cache;
    }

    void setCache(Map<String, RouteDefinition> cache) {
        this.cache = cache;
    }
}
@Slf4j
@Component
@ConditionalOnProperty(value = "route.schedule.enabled", havingValue = "true", matchIfMissing = true)
public class RouteRefresher {

    private final MongoRouteDefinitionRepository repository;

    public RouteRefresher(MongoRouteDefinitionRepository repository) {
        this.repository = repository;
    }

    // Full reload from MongoDB: first run 10 seconds after startup,
    // then one hour after each run completes
    @Scheduled(initialDelay = 10000, fixedDelay = 60 * 60 * 1000)
    private void refresh() {
        RouteRepositoryOperations operation = repository.getRepositoryOperation();
        int page = 0;
        int pageSize = 1000;
        int total = Math.toIntExact(operation.count().blockOptional().orElse(0L));
        Map<String, RouteDefinition> oldCache = repository.getCache();
        Map<String, RouteDefinition> newCache = new ConcurrentHashMap<>(total);
        int oldTotal = oldCache.size();
        // On the first load the cache is empty, so expose the new map right away
        // and let it fill up page by page
        if (oldTotal < 1) {
            repository.setCache(newCache);
        }
        // Load all routes page by page into the new map
        while (page * pageSize < total) {
            operation.findAll(PageRequest.of(page++, pageSize))
                    .doOnNext(route -> newCache.putIfAbsent(route.getId(), route))
                    .blockLast();
            log.info("Current dynamic route table size: {}, new route table size so far: {}",
                    oldTotal, newCache.size());
        }
        // Swap in the freshly loaded table and notify the gateway
        repository.setCache(newCache);
        log.info("New route table loaded, current size: {}", newCache.size());
        repository.publishEvent();
    }
}
@Component
@ConditionalOnProperty(value = "changeStream.enabled", havingValue = "true", matchIfMissing = true)
public class RouteChangeStreamHandler implements CommandLineRunner {

    private final ReactiveMongoTemplate mongoTemplate;
    private final MongoRouteDefinitionRepository routeRepository;

    public RouteChangeStreamHandler(
            MongoRouteDefinitionRepository routeRepository, ReactiveMongoTemplate mongoTemplate) {
        this.routeRepository = routeRepository;
        this.mongoTemplate = mongoTemplate;
    }

    // Watch the collection on a dedicated thread so that startup is not blocked
    @Override
    public void run(String... args) {
        new Thread(this::startMonitor, "ChangeStream-Monitor-routes").start();
    }

    public void startMonitor() {
        // Only react to operations that change the set of routes
        Aggregation aggregation = Aggregation.newAggregation(Aggregation
                .match(Criteria.where("operationType").in("insert", "delete", "update", "replace")));
        ChangeStreamOptions options = ChangeStreamOptions.builder()
                .filter(aggregation)
                .returnFullDocumentOnUpdate()
                .build();
        String collectionName = MongoRouteDefinition.class.getAnnotation(Document.class).value();
        Flux<ChangeStreamEvent<MongoRouteDefinition>> changeStream = mongoTemplate
                .changeStream(collectionName, options, MongoRouteDefinition.class);
        changeStream
                .log()
                .doOnNext(e -> {
                    if (OperationType.INSERT == e.getOperationType()
                            || OperationType.UPDATE == e.getOperationType()
                            || OperationType.REPLACE == e.getOperationType()) {
                        // Inserted or changed document: add it to the cache
                        Optional.ofNullable(e.getBody()).ifPresent(routeRepository::addCache);
                    } else if (OperationType.DELETE == e.getOperationType()) {
                        // Deleted document: only the document key is available
                        getId(e).ifPresent(routeRepository::removeCache);
                    }
                }).blockLast();
    }

    // Extracts the route id from the document key; this assumes _id is stored as an ObjectId
    private Optional<String> getId(ChangeStreamEvent<MongoRouteDefinition> e) {
        return Optional.ofNullable(e.getRaw())
                .flatMap(raw -> Optional.ofNullable(raw.getDocumentKey()))
                .flatMap(docKey -> Optional.ofNullable(docKey.getObjectId("_id")))
                .map(bson -> bson.getValue().toHexString());
    }
}
@Document("gwRoutes")
public class MongoRouteDefinition extends RouteDefinition {

    public static MongoRouteDefinition from(RouteDefinition route) {
        MongoRouteDefinition newRoute = new MongoRouteDefinition();
        BeanUtils.copyProperties(route, newRoute);
        return newRoute;
    }
}
public interface RouteRepositoryOperations extends
        ReactiveMongoRepository<MongoRouteDefinition, String> {

    @Query(value = "{}", sort = "{_id:1}")
    Flux<MongoRouteDefinition> findAll(Pageable pageable);
}
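As a companion to RouteChangeStreamHandler, the following plain-Java sketch models how a stream of change events can be folded into an in-memory route table. It is a simplified illustration under stated assumptions, not the project's code: `Op`, `Event`, `apply` and `demo` are hypothetical names, a route is reduced to an id and a URI string, and inserts, updates and replaces are all treated as upserts.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Plain-Java model of folding change events into a route cache; names are illustrative. */
class ChangeEventSketch {

    enum Op { INSERT, UPDATE, REPLACE, DELETE }

    /** Simplified change event: the operation, the route id and, except for deletes, the URI. */
    static final class Event {
        final Op op;
        final String routeId;
        final String uri;

        Event(Op op, String routeId, String uri) {
            this.op = op;
            this.routeId = routeId;
            this.uri = uri;
        }
    }

    /** Applies events in order: deletes remove the entry, everything else upserts it. */
    static Map<String, String> apply(List<Event> events) {
        Map<String, String> cache = new ConcurrentHashMap<>();
        for (Event e : events) {
            if (e.op == Op.DELETE) {
                cache.remove(e.routeId);
            } else {
                cache.put(e.routeId, e.uri);
            }
        }
        return cache;
    }

    /** Two inserts, one update and one delete, leaving a single updated route. */
    static Map<String, String> demo() {
        return apply(List.of(
                new Event(Op.INSERT, "r1", "https://a.example.org"),
                new Event(Op.INSERT, "r2", "https://b.example.org"),
                new Event(Op.UPDATE, "r1", "https://c.example.org"),
                new Event(Op.DELETE, "r2", null)));
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Treating updates as upserts matters in this model: if the cache only accepted ids it had not seen before, the update to r1 would be silently dropped and the table would keep serving the old URI.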
Original article: https://xie.infoq.cn/article/0ae4f61ce6c67a651d94678a8
</article>