响应式web(四):使用Netty作为web容器,基于注解的WebFlux阻塞式与响应式实现
目录使用 WebFlux,针对IO密集度比较高的系统,性能会有提升。使用 Netty 作为 web 容器:注释掉spring-boot-starter-web,启动就默认用的 netty 而不是 tomcat 了基于注解的 WebFlux 阻塞式与响应式实现WebFlux + SSE 服务器推:下面的案例3WebFlux 中的 ServerHttpRequest 与 SpringMVC 的区别
·
目录
使用 WebFlux,针对IO密集度比较高的系统,性能会有提升。
- 使用 Netty 作为 web 容器:注释掉
spring-boot-starter-web
,启动就默认用的 netty 而不是 tomcat 了 - 基于注解的 WebFlux 阻塞式与响应式实现
- WebFlux + SSE 服务器推:下面的案例3
- WebFlux 中的 ServerHttpRequest 与 SpringMVC 的区别
源码地址
GitHub 地址:https://github.com/HanquanHq/TestReactive
运行效果
本地运行项目后,浏览器输入:
案例1:http://127.0.0.1:8080/person
控制台输出:
=========here get=====
before return, mono: MonoPeekTerminal
return方法线程名称:reactor-http-nio-4
1. doOnSubscribe...MonoSink
getPerson线程名称:reactor-http-nio-4
getPerson(): Person{id=1, name='person1'}
2. data:Person{id=1, name='person1'}
3. onSuccess:Person{id=1, name='person1'}
案例2:http://127.0.0.1:8080/person/xxoo?name=Tom
控制台输出:
In get01, name = Tom
In get01, serverHttpRequest = org.springframework.http.server.reactive.ReactorServerHttpRequest@77c0f877
In get01, headers = [Host:"127.0.0.1:8080", Connection:"keep-alive", Cache-Control:"max-age=0", Upgrade-Insecure-Requests:"1", User-Agent:"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.89 Safari/537.36", Accept:"text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9", Sec-Fetch-Site:"none", Sec-Fetch-Mode:"navigate", Sec-Fetch-User:"?1", Sec-Fetch-Dest:"document", Accept-Encoding:"gzip, deflate, br", Accept-Language:"zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7,fr;q=0.6", Cookie:"remember_token=11|5ab3b64f1bb566f8155003f396cc0361c8957adb4267161b275bceb5f174ea53015491cbd90f06a5aa38606455a717790a60efe719fa1fbd314a6905027ca6f8; SESSION=7b53f3e5-6059-42fe-a4b6-9dac524364d7"]
In get01, parameters = [Tom]
案例3:http://127.0.0.1:8080/person/sse
实现了 WebFlux + SSE 服务器推,不用刷新页面,也能实时拿到数据,原理类似于浏览器下载文件。
下面这9条数据,不是一次加载出来的,是逐个输出的。
控制台输出:
1. sub is: reactor.core.publisher.FluxIterable$IterableSubscription@4ec8ff31
3. data is: haha, 1
3. data is: haha, 2
3. data is: haha, 3
3. data is: haha, 4
3. data is: haha, 5
3. data is: haha, 6
3. data is: haha, 7
3. data is: haha, 8
3. data is: haha, 9
doOnComplete,全部完成了!这里传一个新线程
附:代码
目录结构
PersonController.java(重点)
package com.mashibing.admin.controller;
import com.mashibing.admin.service.PersonService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.WebSession;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.Random;
import java.util.stream.IntStream;
// 使用WebFlux,针对IO密集度比较高的系统,性能会有提升。
@RestController
@RequestMapping("/person")
public class PersonController {
@Autowired
PersonService personService;
@GetMapping("")
Mono<Object> get() {
System.out.println("=========here get=====");
Mono<Object> mono = Mono.create(sink -> {//组装数据序列
sink.success(personService.getPerson());
}).doOnSubscribe(sub -> {//订阅数据
System.out.println("1. doOnSubscribe..." + sub);
}).doOnNext(data -> {//得到数据
System.out.println("2. data:" + data);
}).doOnSuccess(onSuccess -> {//整体完成
System.out.println("3. onSuccess:" + onSuccess);
});
System.out.println("before return, mono: " + mono);
// 得到一个包装的数据序列,return给了容器
// 容器拿到这个序列,再去执行序列里的方法
// 这和 ajax 很像
// 1. 写回调接口,让b调用
// 2. 将方法传过去,看起来像是异步,实质上,阻塞过程在容器内部
// 并不是提高效率,只是将阻塞延后
System.out.println("return方法线程名称:" + Thread.currentThread().getName());
return mono; // 组织数据的过程,是netty容器做的,获取数据的过程不依赖controller了
}
@GetMapping("xxoo")
//serverHttpRequest是webFlux特有的,用法也不一样
//拓展思维,SpringCloud Gateway函数式,比zuul的性能高,底层是基于netty的
Mono<Object> get01(String name, ServerHttpRequest serverHttpRequest, WebSession webSession) {
System.out.println("In get01, name = " + name);
System.out.println("In get01, serverHttpRequest = " + serverHttpRequest);
//org.springframework.http.server.reactive.ReactorServerHttpRequest
System.out.println("In get01, headers = " + serverHttpRequest.getHeaders());
// [Host:"127.0.0.1:8080", User-Agent:"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:72.0) Gecko/20100101 Firefox/72.0", ...
System.out.println("In get01, parameters = " + serverHttpRequest.getQueryParams().get("name"));
//[gongluyang]
// session 的使用方法
if (StringUtils.isEmpty(webSession.getAttribute("code"))) {
System.out.println("第一次请求,我要set session 了");
webSession.getAttributes().put("code", 111222333);
}
return Mono.just("me me da!");
}
// 不引入 spring-webmvc,仅使用 netty 实现
@GetMapping(value = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> sse() {
Flux<String> flux = Flux.fromStream(IntStream.range(1, 10).mapToObj(i -> {// map 是映射,理解为遍历,但是不是遍历
try {
Thread.sleep(new Random().nextInt(3000));
} catch (InterruptedException e) {
e.printStackTrace();
}
return "haha, " + i;
})).doOnSubscribe(sub -> {
System.out.println("1. sub is: " + sub);// reactor.core.publisher.FluxIterable$IterableSubscription
}).doOnComplete(() -> {
System.out.println("doOnComplete,全部完成了!这里传一个新线程");
}).doOnNext(data -> {
System.out.println("3. data is: " + data);
});
return flux;
}
}
Person.java
package com.mashibing.admin.pojo;
public class Person {
int id;
String name;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public String toString() {
return "Person{" +
"id=" + id +
", name='" + name + '\'' +
'}';
}
}
PersonService.java
package com.mashibing.admin.service;
import com.mashibing.admin.pojo.Person;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import java.util.concurrent.ConcurrentHashMap;
@Service
public class PersonService {
static ConcurrentHashMap<Integer, Person> map = new ConcurrentHashMap();
static {
for (int i = 0; i < 100; i++) {
Person person = new Person();
person.setId(i);
person.setName("person" + i);
map.put(i, person);
}
}
public Person getPerson() {
try {
System.out.println("getPerson线程名称:" + Thread.currentThread().getName());
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("getPerson(): " + map.get(1));
return map.get(1);
}
public Flux<Person> getPersons() {
// 需要让数据自己变成响应式的,我们关注的是前后端的响应式
Flux<Person> personFlux = Flux.fromIterable(map.values());
return personFlux;
}
}
TestReactiveApplication.java
package com.mashibing.admin;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class TestReactiveApplication {
public static void main(String[] args) {
SpringApplication.run(TestReactiveApplication.class, args);
}
}
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.1.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>TeatReactive</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<!-- 注释掉这个,启动就默认用的 netty 而不是 tomcat 了-->
<!-- <dependency>-->
<!-- <groupId>org.springframework.boot</groupId>-->
<!-- <artifactId>spring-boot-starter-web</artifactId>-->
<!-- </dependency>-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
更多推荐
已为社区贡献27条内容
所有评论(0)