https://zhuanlan.zhihu.com/p/30813274



聊聊 Spring Boot 2.0 的 WebFlux

泥瓦匠BYSocket 泥瓦匠BYSocket
4 个月前

聊聊 Spring Boot 2.0 的 WebFlux## 前言
对照下 Spring Web MVC ,Spring Web MVC 是基于 Servlet API 和 Servlet 容器设计的。那么 Spring WebFlux 肯定不是基于前面两者,它基于 Reactive Streams API 和 Servlet 3.1+ 容器设计。


那 Reactive Streams API 是什么?

先理解 Stream 流是什么?流是序列,是生产者生产,一个或多个消费者消费的元素序列。这种具体的设计模式成为发布订阅模式。常见的流处理机制是 pull / push 模式。背压是一种常用策略,使得发布者拥有无限制的缓冲区存储 item,用于确保发布者发布 item 太快时,不会去压制订阅者。
Reactive Streams (响应式流)是提供处理非阻塞背压异步流的一种标准。主要针对的场景是运行时环境(包括 JVM 和 JS)和网络。同样,JDK 9 java.util.concurrent 包提供了两个主要的 API 来处理响应流:
- Flow
- SubmissionPublisher


为啥只能运行在 Servlet 3.1+ 容器?

大家知道,3.1 规范其中一个新特性是异步处理支持。
异步处理支持:Servlet 线程不需一直阻塞,即不需要到业务处理完毕再输出响应,然后结束 Servlet线程。异步处理的作用是在接收到请求之后,Servlet 线程可以将耗时的操作委派给另一个线程来完成,在不生成响应的情况下返回至容器。主要应用场景是针对业务处理较耗时的情况,可以减少服务器资源的占用,并且提高并发处理速度。
所以 WebFlux 支持的容器有 Tomcat、Jetty(Non-Blocking IO API) ,也可以像 Netty 和 Undertow 的本身就支持异步容器。在容器中 Spring WebFlux 会将输入流适配成 Mono 或者 Flux 格式进行统一处理。


Spring WebFlux 是什么

先看这张图,上面我们了解了容器、响应流。这里介绍下 Spring WebFlux 是什么? Spring WebFlux 是 Spring 5 的一个新模块,包含了响应式 HTTP 和 WebSocket 的支持,另外在上层服务端支持两种不同的编程模型:
- 基于 Spring MVC 注解 @Controller 等
- 基于 Functional 函数式路由

下面是两个实现小案例,首先在 pom.xml 加入对应的依赖:

 <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>


基于 Spring MVC 注解 RESTful API

官方案例很简单,如下:

@RestController
public class PersonController {

        private final PersonRepository repository;

        public PersonController(PersonRepository repository) {
                this.repository = repository;
        }

        @PostMapping("/person")
        Mono<Void> create(@RequestBody Publisher<Person> personStream) {
                return this.repository.save(personStream).then();
        }

        @GetMapping("/person")
        Flux<Person> list() {
                return this.repository.findAll();
        }

        @GetMapping("/person/{id}")
        Mono<Person> findById(@PathVariable String id) {
                return this.repository.findOne(id);
        }
}

但是 PersonRepository 这种 Spring Data Reactive Repositories 不支持 MySQL,进一步也不支持 MySQL 事务。所以用了 Reactivey 原来的 spring 事务管理就不好用了。jdbc jpa 的事务是基于阻塞 IO 模型的,如果 Spring Data Reactive 没有升级 IO 模型去支持 JDBC,生产上的应用只能使用不强依赖事务的。也可以使用透明的事务管理,即每次操作的时候以回调形式去传递数据库连接 connection。

Spring Data Reactive Repositories 目前支持 Mongo、Cassandra、Redis、Couchbase 。

那提到的“用了 Reactivey 原来的 spring 事务管理就不好用了”,您能否再详细介绍一下?另外,应用有强依赖事务,有没有对应的解决方案?

我们先看看这张图。Spring Boot 2.0 这里有两条不同的线分别是:
Spring Web MVC -> Spring Data
Spring WebFlux -> Spring Data Reactive

所以这里问题的答案是,如果使用 Spring Data Reactive ,原来的 Spring 针对 Spring Data (JDBC等)的事务管理肯定不起作用了。因为原来的 Spring 事务管理(Spring Data JPA)都是基于 ThreadLocal 传递事务的,其本质是基于 阻塞 IO 模型,不是异步的。但 Reactive 是要求异步的,不同线程里面 ThreadLocal 肯定取不到值了。自然,我们得想想如何在使用 Reactive 编程是做到事务,有一种方式是 回调 方式,一直传递 conn :
newTransaction(conn ->{})

因为每次操作数据库也是异步的,所以 connection 在 Reactive 编程中无法靠 ThreadLocal 传递了,只能放在参数上面传递。虽然会有一定的代码侵入行。进一步,也可以 kotlin 协程,去做到透明的事务管理,即把 conn 放到 协程的局部变量中去。
那 Spring Data Reactive Repositories 不支持 MySQL,进一步也不支持 MySQL 事务,怎么办?

答案是,这个问题其实和第一个问题也相关。 为啥不支持 MySQL,即 JDBC 不支持。大家可以看到 JDBC 是所属 Spring Data 的。所以可以等待 Spring Data Reactive Repositories 升级 IO 模型,去支持 MySQL。也可以和上面也讲到了,如何使用 Reactive 编程支持事务。

如果应用只能使用不强依赖数据事务,依旧使用 MySQL ,可以使用下面的实现,代码如下:


@RestController
@RequestMapping(value = "/city")
public class CityRestController {

    @Autowired
    private CityService cityService;

    @RequestMapping(value = "/{id}", method = RequestMethod.GET)
    public Mono<City> findOneCity(@PathVariable("id") Long id) {
        return Mono.create(cityMonoSink -> cityMonoSink.success(cityService.findCityById(id)));
    }

    @RequestMapping(method = RequestMethod.GET)
    public Flux<City> findAllCity() {
        return Flux.create(cityFluxSink -> {
            cityService.findAllCity().forEach(city -> {
                cityFluxSink.next(city);
            });
            cityFluxSink.complete();
        });
    }

    @RequestMapping(method = RequestMethod.POST)
    public Mono<Long> createCity(@RequestBody City city) {
        return Mono.create(cityMonoSink -> cityMonoSink.success(cityService.saveCity(city)));
    }

    @RequestMapping(method = RequestMethod.PUT)
    public Mono<Long> modifyCity(@RequestBody City city) {
        return Mono.create(cityMonoSink -> cityMonoSink.success(cityService.updateCity(city)));
    }

    @RequestMapping(value = "/{id}", method = RequestMethod.DELETE)
    public Mono<Long> modifyCity(@PathVariable("id") Long id) {
        return Mono.create(cityMonoSink -> cityMonoSink.success(cityService.deleteCity(id)));
    }
}

findAllCity 方法中,利用 Flux.create 方法对响应进行创建封装成 Flux 数据。并且使用 lambda 写数据流的处理函数会十分的方便。
Service 层依旧是以前那套逻辑,业务服务层接口如下:

public interface CityService {

    /**
     * 获取城市信息列表
     *
     * @return
     */
    List<City> findAllCity();

    /**
     * 根据城市 ID,查询城市信息
     *
     * @param id
     * @return
     */
    City findCityById(Long id);

    /**
     * 新增城市信息
     *
     * @param city
     * @return
     */
    Long saveCity(City city);

    /**
     * 更新城市信息
     *
     * @param city
     * @return
     */
    Long updateCity(City city);

    /**
     * 根据城市 ID,删除城市信息
     *
     * @param id
     * @return
     */
    Long deleteCity(Long id);
}

具体案例在我的 Github:github.com/JeffLi1993/s

基于 Functional 函数式路由实现 RESTful API

创建一个 Route 类来定义 RESTful HTTP 路由

import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RequestPredicates.accept;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;

@Configuration
public class Routes {
    private CityService cityService;

    public Routes(CityService cityService) {
        this.cityService = cityService;
    }

    @Bean
    public RouterFunction<?> routerFunction() {
        return route(
                       GET("/api/city").and(accept(MediaType.APPLICATION_JSON)), cityService:: findAllCity).and(route(
                       GET("/api/user/{id}").and(accept(MediaType.APPLICATION_JSON)), cityService:: findCityById)
                );
    }
}

RoouterFunction 类似 Spring Web MVC 的 @RequestMapping ,用来定义路由信息,每个路由会映射到一个处理方法,当接受 HTTP 请求时候会调用该处理方法。

创建 HttpServerConfig 自定义 Http Server,这里创建一个 Netty HTTP 服务器:

import org.springframework.http.server.reactive.HttpHandler;
import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter;
import reactor.ipc.netty.http.server.HttpServer;

@Configuration
public class HttpServerConfig {
    @Autowired
    private Environment environment;

    @Bean
    public HttpServer httpServer(RouterFunction<?> routerFunction) {
        HttpHandler httpHandler = RouterFunctions.toHttpHandler(routerFunction);
        ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(httpHandler);
        HttpServer server = HttpServer.create("localhost", Integer.valueOf(environment.getProperty("server.port")));
        server.newHandler(adapter);
        return server;
    }
}

自然推荐 Netty 来运行 Reactive 应用,因为 Netty 是基于异步和事件驱动的。



原文:聊聊 Spring Boot 2.0 的 WebFlux
文章被以下专栏收录
10 条评论
写下你的评论...

哲人王
能提供一下压测数据的话,会更有说服力,reactive写法是优雅,但性能没有提升的话,后端是没有动力使用的
4 个月前
aall ql
这跟vertx的套路很像
4 个月前
MelloChan
vert.x
4 个月前
tangshiba
这个是属于spring 5的内容吧。
4 个月前
东东
希望 spring5 对推广 kotlin 起到作用。
4 个月前
慕艾
强哥666!
4 个月前
van小晓东

如何体现出Controller调用Service是异步的?

4 个月前
王鸿飞
 查看对话

理论上在高并发下应该碾压非异步模型,如果达不到就是实现有问题 Results from Spring 5 Webflux Performance Tests - Ippon Technologies

2 个月前
长乐
 查看对话

service是阻塞的,controller层直接调用了阻塞代码,虽然是在Mono中,但是依旧会阻塞controller的线程。应该是使用publishOn()或者subscribeOn()让阻塞代码在elastic线程池中执行。(貌似是这样。。)

2 个月前
长乐
 查看对话

补充一下,如果直接在requestmapping的方法中获得Mono类型参数,subscribeOn()是无效的,必须用publishOn()切换,应该是穿参之前就调用过publishOn了

2 个月前
推荐阅读

聊聊 Spring Boot 2.0 的 WebFlux

泥瓦匠BYSocket 泥瓦匠BYSocket
4 个月前

聊聊 Spring Boot 2.0 的 WebFlux## 前言
对照下 Spring Web MVC ,Spring Web MVC 是基于 Servlet API 和 Servlet 容器设计的。那么 Spring WebFlux 肯定不是基于前面两者,它基于 Reactive Streams API 和 Servlet 3.1+ 容器设计。


那 Reactive Streams API 是什么?

先理解 Stream 流是什么?流是序列,是生产者生产,一个或多个消费者消费的元素序列。这种具体的设计模式成为发布订阅模式。常见的流处理机制是 pull / push 模式。背压是一种常用策略,使得发布者拥有无限制的缓冲区存储 item,用于确保发布者发布 item 太快时,不会去压制订阅者。
Reactive Streams (响应式流)是提供处理非阻塞背压异步流的一种标准。主要针对的场景是运行时环境(包括 JVM 和 JS)和网络。同样,JDK 9 java.util.concurrent 包提供了两个主要的 API 来处理响应流:
- Flow
- SubmissionPublisher


为啥只能运行在 Servlet 3.1+ 容器?

大家知道,3.1 规范其中一个新特性是异步处理支持。
异步处理支持:Servlet 线程不需一直阻塞,即不需要到业务处理完毕再输出响应,然后结束 Servlet线程。异步处理的作用是在接收到请求之后,Servlet 线程可以将耗时的操作委派给另一个线程来完成,在不生成响应的情况下返回至容器。主要应用场景是针对业务处理较耗时的情况,可以减少服务器资源的占用,并且提高并发处理速度。
所以 WebFlux 支持的容器有 Tomcat、Jetty(Non-Blocking IO API) ,也可以像 Netty 和 Undertow 的本身就支持异步容器。在容器中 Spring WebFlux 会将输入流适配成 Mono 或者 Flux 格式进行统一处理。


Spring WebFlux 是什么

先看这张图,上面我们了解了容器、响应流。这里介绍下 Spring WebFlux 是什么? Spring WebFlux 是 Spring 5 的一个新模块,包含了响应式 HTTP 和 WebSocket 的支持,另外在上层服务端支持两种不同的编程模型:
- 基于 Spring MVC 注解 @Controller 等
- 基于 Functional 函数式路由

下面是两个实现小案例,首先在 pom.xml 加入对应的依赖:

 <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>


基于 Spring MVC 注解 RESTful API

官方案例很简单,如下:

@RestController
public class PersonController {

        private final PersonRepository repository;

        public PersonController(PersonRepository repository) {
                this.repository = repository;
        }

        @PostMapping("/person")
        Mono<Void> create(@RequestBody Publisher<Person> personStream) {
                return this.repository.save(personStream).then();
        }

        @GetMapping("/person")
        Flux<Person> list() {
                return this.repository.findAll();
        }

        @GetMapping("/person/{id}")
        Mono<Person> findById(@PathVariable String id) {
                return this.repository.findOne(id);
        }
}

但是 PersonRepository 这种 Spring Data Reactive Repositories 不支持 MySQL,进一步也不支持 MySQL 事务。所以用了 Reactivey 原来的 spring 事务管理就不好用了。jdbc jpa 的事务是基于阻塞 IO 模型的,如果 Spring Data Reactive 没有升级 IO 模型去支持 JDBC,生产上的应用只能使用不强依赖事务的。也可以使用透明的事务管理,即每次操作的时候以回调形式去传递数据库连接 connection。

Spring Data Reactive Repositories 目前支持 Mongo、Cassandra、Redis、Couchbase 。

那提到的“用了 Reactivey 原来的 spring 事务管理就不好用了”,您能否再详细介绍一下?另外,应用有强依赖事务,有没有对应的解决方案?

我们先看看这张图。Spring Boot 2.0 这里有两条不同的线分别是:
Spring Web MVC -> Spring Data
Spring WebFlux -> Spring Data Reactive

所以这里问题的答案是,如果使用 Spring Data Reactive ,原来的 Spring 针对 Spring Data (JDBC等)的事务管理肯定不起作用了。因为原来的 Spring 事务管理(Spring Data JPA)都是基于 ThreadLocal 传递事务的,其本质是基于 阻塞 IO 模型,不是异步的。但 Reactive 是要求异步的,不同线程里面 ThreadLocal 肯定取不到值了。自然,我们得想想如何在使用 Reactive 编程是做到事务,有一种方式是 回调 方式,一直传递 conn :
newTransaction(conn ->{})

因为每次操作数据库也是异步的,所以 connection 在 Reactive 编程中无法靠 ThreadLocal 传递了,只能放在参数上面传递。虽然会有一定的代码侵入行。进一步,也可以 kotlin 协程,去做到透明的事务管理,即把 conn 放到 协程的局部变量中去。
那 Spring Data Reactive Repositories 不支持 MySQL,进一步也不支持 MySQL 事务,怎么办?

答案是,这个问题其实和第一个问题也相关。 为啥不支持 MySQL,即 JDBC 不支持。大家可以看到 JDBC 是所属 Spring Data 的。所以可以等待 Spring Data Reactive Repositories 升级 IO 模型,去支持 MySQL。也可以和上面也讲到了,如何使用 Reactive 编程支持事务。

如果应用只能使用不强依赖数据事务,依旧使用 MySQL ,可以使用下面的实现,代码如下:


@RestController
@RequestMapping(value = "/city")
public class CityRestController {

    @Autowired
    private CityService cityService;

    @RequestMapping(value = "/{id}", method = RequestMethod.GET)
    public Mono<City> findOneCity(@PathVariable("id") Long id) {
        return Mono.create(cityMonoSink -> cityMonoSink.success(cityService.findCityById(id)));
    }

    @RequestMapping(method = RequestMethod.GET)
    public Flux<City> findAllCity() {
        return Flux.create(cityFluxSink -> {
            cityService.findAllCity().forEach(city -> {
                cityFluxSink.next(city);
            });
            cityFluxSink.complete();
        });
    }

    @RequestMapping(method = RequestMethod.POST)
    public Mono<Long> createCity(@RequestBody City city) {
        return Mono.create(cityMonoSink -> cityMonoSink.success(cityService.saveCity(city)));
    }

    @RequestMapping(method = RequestMethod.PUT)
    public Mono<Long> modifyCity(@RequestBody City city) {
        return Mono.create(cityMonoSink -> cityMonoSink.success(cityService.updateCity(city)));
    }

    @RequestMapping(value = "/{id}", method = RequestMethod.DELETE)
    public Mono<Long> modifyCity(@PathVariable("id") Long id) {
        return Mono.create(cityMonoSink -> cityMonoSink.success(cityService.deleteCity(id)));
    }
}

findAllCity 方法中,利用 Flux.create 方法对响应进行创建封装成 Flux 数据。并且使用 lambda 写数据流的处理函数会十分的方便。
Service 层依旧是以前那套逻辑,业务服务层接口如下:

public interface CityService {

    /**
     * 获取城市信息列表
     *
     * @return
     */
    List<City> findAllCity();

    /**
     * 根据城市 ID,查询城市信息
     *
     * @param id
     * @return
     */
    City findCityById(Long id);

    /**
     * 新增城市信息
     *
     * @param city
     * @return
     */
    Long saveCity(City city);

    /**
     * 更新城市信息
     *
     * @param city
     * @return
     */
    Long updateCity(City city);

    /**
     * 根据城市 ID,删除城市信息
     *
     * @param id
     * @return
     */
    Long deleteCity(Long id);
}

具体案例在我的 Github:github.com/JeffLi1993/s

基于 Functional 函数式路由实现 RESTful API

创建一个 Route 类来定义 RESTful HTTP 路由

import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RequestPredicates.accept;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;

@Configuration
public class Routes {
    private CityService cityService;

    public Routes(CityService cityService) {
        this.cityService = cityService;
    }

    @Bean
    public RouterFunction<?> routerFunction() {
        return route(
                       GET("/api/city").and(accept(MediaType.APPLICATION_JSON)), cityService:: findAllCity).and(route(
                       GET("/api/user/{id}").and(accept(MediaType.APPLICATION_JSON)), cityService:: findCityById)
                );
    }
}

RoouterFunction 类似 Spring Web MVC 的 @RequestMapping ,用来定义路由信息,每个路由会映射到一个处理方法,当接受 HTTP 请求时候会调用该处理方法。

创建 HttpServerConfig 自定义 Http Server,这里创建一个 Netty HTTP 服务器:

import org.springframework.http.server.reactive.HttpHandler;
import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter;
import reactor.ipc.netty.http.server.HttpServer;

@Configuration
public class HttpServerConfig {
    @Autowired
    private Environment environment;

    @Bean
    public HttpServer httpServer(RouterFunction<?> routerFunction) {
        HttpHandler httpHandler = RouterFunctions.toHttpHandler(routerFunction);
        ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(httpHandler);
        HttpServer server = HttpServer.create("localhost", Integer.valueOf(environment.getProperty("server.port")));
        server.newHandler(adapter);
        return server;
    }
}

自然推荐 Netty 来运行 Reactive 应用,因为 Netty 是基于异步和事件驱动的。



原文:聊聊 Spring Boot 2.0 的 WebFlux
文章被以下专栏收录
10 条评论
写下你的评论...

哲人王
能提供一下压测数据的话,会更有说服力,reactive写法是优雅,但性能没有提升的话,后端是没有动力使用的
4 个月前
aall ql
这跟vertx的套路很像
4 个月前
MelloChan
vert.x
4 个月前
tangshiba
这个是属于spring 5的内容吧。
4 个月前
东东
希望 spring5 对推广 kotlin 起到作用。
4 个月前
慕艾
强哥666!
4 个月前
van小晓东

如何体现出Controller调用Service是异步的?

4 个月前
王鸿飞
 查看对话

理论上在高并发下应该碾压非异步模型,如果达不到就是实现有问题 Results from Spring 5 Webflux Performance Tests - Ippon Technologies

2 个月前
长乐
 查看对话

service是阻塞的,controller层直接调用了阻塞代码,虽然是在Mono中,但是依旧会阻塞controller的线程。应该是使用publishOn()或者subscribeOn()让阻塞代码在elastic线程池中执行。(貌似是这样。。)

2 个月前
长乐
 查看对话

补充一下,如果直接在requestmapping的方法中获得Mono类型参数,subscribeOn()是无效的,必须用publishOn()切换,应该是穿参之前就调用过publishOn了

2 个月前
推荐阅读
Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐