一、基本概念

传统的web框架包含spring和springMVC,用以构建Servlet API并在Servlet容器中运行,其本质上是阻塞和多线程的,每个连接使用一个线程,在处理请求时,将从线程池中提取一个工作线程来处理该请求,同时该请求处理线程被阻塞,直到工作线程完成为止。因此在请求量很大的情况下,阻塞web框架不能有效地扩展。

响应式web框架spring webFlux在spring5.0版本开始推出,它是完全非阻塞、支持响应式流,并且运行在netty,undertow和Servlet3.1以上的web容器中。响应式异步web框架可以实现用较少线程达到更高的可扩展性,通过应用被称为event looping的技术,每个线程都能够处理许多请求,使得每个连接的成本低。

在这里插入图片描述
在一个event loop中,一切皆为事件,其中包括请求与回调,当需要完成一个重要的操作时,event loop并行的为那个操作注册一个回调,然后它继续去处理其他事件。当操作完成后,它会被event loop视为一个event,对于请求也是一样的操作。这样异步web框架就能够使用更少的线程处理繁重的请求,实现更好的扩展性。

1、什么是响应式编程

在传统的命令式编程中,代码都是一条一条依次执行的,产生阻塞时就会造成资源的浪费。而响应式编程是函数式和声明式的,通过该数据流的pipeline或stream描述涉及的流程。响应式流处理数据时只要数据是可用的就进行处理,而不是需要将数据作为一个整体进行提供,使我们能够并行执行任务以获得更大的可伸缩性。

Java Streams和Reactive Streams的区别
Java流通常是同步的,同时只能处理有限数据集,它们本质上是使用函数式进行集合迭代的一种方式。
响应式流支持任何大小的数据集,包括无限数据集的异步处理,使实时处理数据成为了可能。

2、响应式流中的各个角色和关系

响应式流的规范可通过四个接口定义来概括:Publisher、Subscriber、Subscription和Processor。

Publisher为每一个Subscription的Subscriber生产数据,Publisher接口中有一个subscribe()方法,Subscriber通过这个方法可以订阅Publisher。

public interface Publisher<T>{
    void subscribe(Subscriber<? super T> subscriber);
}    

Subscriber一旦进行了订阅,就可以从Publisher中接收消息,这些消息都是通过Subscriber接口中的方法进行发送。

public interface Subscriber<T>{
    void onSubscribe(Subscribtion sub);
    void onNext(T item);
    void onError(Throwable ex);
    void onComplete();
}    

Subscriber调用onSubscribe()方法会收到第一个消息,当Publisher调用onSubscribe(),会通过一个Subscription对象将消息传递给Subscriber,Subscriber可以通过Subscription获取消息,也可以通过它取消订阅。
每一个Publisher发布的项目都会通过onNext()方法将数据传输到Subscriber,如果出现错误,onError()方法将被调用,如果数据发送完成,将会调用onComplete()方法来告诉Subscriber,发送结束。

public interface Subscription{
    // n 表明接收多少个数据
    void request(long n);
    // 取消订阅
    void cancel(); 
}

至于Processor,它连接了Subscriber和Publisher

public interface Processor<T,R> extends Subscriber<T>,Publisher<R>{}

二、Reactor简介

响应式编程是通过建立一个用于数据流通的管道,数据在这个管道中进行各种各样的处理,在管道的每个阶段,是不能知道哪一步操作被哪一个线程执行了的,它们可能在同一个线程也可能不是。

1、Mono与Flux

Mono和Flux是Reactor中的两个核心类,两者都是响应式流中Publisher的实现,Flux表示任意个数据项的管道,Mono表示至多一个数据项的管道。
从对象创建Mono或Flux
使用just()方法创建一个响应式流,例如使用5个String对象来创建一个Flux:

Flux<String> fruitFlux = Flux.just("Apple","Orange","Grape","Banana","Strawberry")
.delaySubscription(Duration.ofMillis(250))
.delayElements(Duration.ofMillis(500)));

通常情况下,Flux会尽可能快的发送数据,delayElements()可以将数据发送速度减慢,上例中为每0.5s发送一个数据,而delaySubscription()使Flux发送第一个数据时延迟,上例中为延迟0.25s。

Publisher有了,现在需要创建一个Subscriber来订阅这个Publisher,消费其中的数据:

fruitFlux.subscribe(f -> System.out.println("Here's some fruit:" + f));

subscribe()中的lambda表达式实际上是一个简写的创建java.util.Consumer对象。

从集合创建
Flux也可以从任何的集合创建,如Iterable或Java Stream,Flux的静态方法fromArray()就是将接收的数组转换为一个Flux,类似的还有fromIterable()方法和fromStream()方法。

合并Mono或者Flux
Flux有一个mergeWith()方法,可以将两个Flux进行合并。除此之外,还有一个zip()方法,与mergeWith()不同的是,zip()是一个静态的创建操作,它将两个Flux相对应的元素进行压缩合并,压缩后发送出来的每个项目都是包含两个对象的容器。
在这里插入图片描述
在这里插入图片描述
转换响应式数据
对于Flux或Mono,最常用的操作之一就是将流中的数据转换为其他类型,Reactor为此提供了map()和flatMap()方法,map()操作会创建一个Flux,该Flux在重新发布之前,按照给定函数对其接收的每个对象执行指定的转换,而flatMap()不是简单的将一个对象映射到另一个对象,而是将对象映射到一个新的Mono或Flux。
在这里插入图片描述

三、Spring WebFlux

1、简介

Spring WebFlux是构建在一个响应式HTTP API之上,与Servlet API没有关系,可以在任何非阻塞web容器上运行,包括Netty、Undertow等。下图是SpringMVC与 Spring WebFlux的异同:
在这里插入图片描述
引入依赖:

<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

2、构建响应式Controller

Spring WebFlux 与 Spring MVC 使用相同的controller层注解,在许多方面与SpringMVC没有区别。不过Spring WebFlux的Controller方法通常接受并返回响应式类型,比如Mono或Flux,而不是域类型或集合。
一个响应式controller通常是一个端到端的响应式栈的顶端,包括controller、repository、database和任何可能位于两者之间的service返回的数据类型都是响应式类型。
在这里插入图片描述

@GetMapping("/recent")
public Flux<User> getUser(){
    return userService.getAll();
}

3、使用函数式编程模型编写API

Spring5引入了一个新的函数式编程模型来定义响应式API,这个新的编程模型更像是一个库,而不是一个框架,允许将请求映射到不同注解的处理代码。使用Spring的函数式编程模型编写API涉及四种主要类型:

  • RequestPredicate:声明将会被处理的请求类型;
  • RouteFunction:声明一个匹配的请求应该如何被路由到处理代码处,也就是RequestPredicate和处理代码的映射关系;
  • ServerRequest:表示HTTP请求,包括对头和正文信息的访问;
  • ServerResponse:表示HTTP响应,包括头和正文信息。
@Configuration
public class RouteFunctionConfig {
    @Bean
    public RouterFunction<?> helloRouteFunction(){
        return RouterFunctions.route(
                RequestPredicates.GET("/hello"),
                request -> ServerResponse.ok().body(Mono.just("Hello World!"), String.class));
    }
}

在上面的代码中,RequestPredicates.GET()方法声明了一个RequestPredicate,它与 /hello 路径的HTTP GET请求相匹配,request即ServerRequest参数,ServerResponse.ok()表示创建一个带有HTTP 200 的状态码,body表示响应的内容。

helloRouterFunction()方法声明了一个仅处理单一请求(/hello)的RouterFunction,但是如果需要处理不同类型的请求,不必编写另一个@Bean方法,只需要调用andRoute()来添加另一个RequestPredicate到函数的映射即可,例如:

@Bean
    public RouterFunction<?> helloRouteFunction(){
        return RouterFunctions.route(
                RequestPredicates.GET("/hello"),
                request -> ServerResponse.ok().body(Mono.just("Hello World!"), String.class))
                .andRoute(RequestPredicates.GET("/bye"),
                request -> ServerResponse.ok().body(Mono.just("See you!"), String.class));

不过,在实际应用中,通常是以一下形式来配置的:

@Configuration
public class RouteFunctionConfig {
    @Bean
    public RouterFunction<?> routerFunction(){
        return RouterFunctions.route(
                RequestPredicates.GET("/user"),
                this::getUser).andRoute(RequestPredicates.POST("/add/user"),this::addUser);
    }

    public Mono<ServerResponse> getUser(ServerRequest request){
        return ServerResponse.ok().body(Flux.just("tom"),String.class);
    }

    public Mono<ServerResponse> addUser(ServerRequest request){
        Mono<String> userName = request.bodyToMono(String.class);
        return ServerResponse.ok().body(Flux.just(userName + "add success"),String.class);
    }
}
Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐