pom.xml配置

io.projectreactor

reactor-core

3.3.10.RELEASE

实例

package top.senseiliu;

import org.reactivestreams.Subscriber;

import org.reactivestreams.Subscription;

import reactor.core.Disposable;

import reactor.core.publisher.Flux;

import reactor.core.publisher.Mono;

import reactor.core.scheduler.Schedulers;

import java.time.Duration;

import java.util.Arrays;

import java.util.Comparator;

import java.util.List;

import java.util.Map;

import java.util.Random;

import java.util.concurrent.CompletableFuture;

public class ProjectReactorTest {

public static void main(String[] args) throws InterruptedException {

System.out.println("=================================== Flux =======================================");

// 整型

Flux integerFlux = Flux.just(1, 2, 3, 4, 5);

// 字符串

Flux stringFlux = Flux.just("hello", "world");

// 从list创建

List list = Arrays.asList("hello", "world");

Flux stringFlux1 = Flux.fromIterable(list);

// 范围

Flux integerFlux1 = Flux.range(1, 5);

// 每1秒产生一个数据

Flux longFlux = Flux.interval(Duration.ofMillis(1000));

longFlux.take(10).subscribe(System.out::println);

// 从Flux创建

Flux stringFlux2 = Flux.from(stringFlux1);

stringFlux2.subscribe(System.out::println);

System.out.println("=================================== Mono =======================================");

// 字符串

Mono stringMono = Mono.just("Hello World");

// Callable创建

Mono stringMono1 = Mono.fromCallable(() -> "Hello World");

// Future创建

Mono stringMono2 = Mono.fromFuture(CompletableFuture.completedFuture("Hello World"));

// Suppier创建

Random random = new Random();

Mono doubleMono = Mono.fromSupplier(random::nextDouble);

//Mono创建

Mono doubleMono1 = Mono.from(doubleMono);

// Flux创建,mono只能拿到一个数据

Mono integerMono = Mono.from(Flux.range(1, 5));

integerMono.subscribe(System.out::println);

stringMono2.subscribe(System.out::println);

System.out.println("=================================== subscribe订阅 =======================================");

// 订阅方式一

stringFlux = Flux.just("Hello", "World");

stringFlux.subscribe(val ->{

System.out.println("val:" + val);

}, error ->{

System.out.println("error:" + error);

}, () ->{

System.out.println("Finished");

}, subscription -> {

subscription.request(1);

});

// 订阅方式二

stringFlux.subscribe(new Subscriber() {

@Override

public void onSubscribe(Subscription subscription) {

subscription.request(Long.MAX_VALUE);

}

@Override

public void onNext(String s) {

System.out.println("onNext:" + s);

}

@Override

public void onError(Throwable throwable) {

}

@Override

public void onComplete() {

System.out.println("onComplete");

}

});

System.out.println("=================================== map映射 =======================================");

class Employee {

public int id;

public String name;

public double salary;

Employee(int id, String name, double salary) {

this.id = id;

this.name = name;

this.salary = salary;

}

public String getName() {

return name;

}

public double getSalary() {

return salary;

}

}

class Leader {

public String name;

public double salary;

Leader(String name, double salary) {

this.name = name;

this.salary = salary;

}

}

List EmployeeList = Arrays.asList(

new Employee(1, "Alex", 1000),

new Employee(2, "Michael", 2000),

new Employee(3, "Jack", 1500),

new Employee(4, "Owen", 1500),

new Employee(5, "Denny", 2000));

Flux employeeFlux = Flux.fromIterable(EmployeeList);

employeeFlux.map(employee -> {

Leader leader = new Leader(employee.name, employee.salary);

return leader;

}).subscribe(consumer -> System.out.println(consumer.name));

// 加过滤器

employeeFlux.filter(employee -> 2000 == employee.salary)

.map(employee -> {

Leader leader = new Leader(employee.name, employee.salary);

return leader;

}).log().subscribe(consumer -> System.out.println(consumer.name));

System.out.println("=================================== Exception异常 =======================================");

Flux.range(-2, 5)

.map(val -> {

int i = val / val;

return val;

})

//遇到错误继续订阅

.onErrorContinue((ex, val) -> {

if (ex instanceof ArithmeticException) {

System.out.println("ex:" + ex + ", val:" + val);

} else {

}

})

//遇到错误,返回新的Flux。继续订阅

.onErrorResume((ex) -> {

return Flux.range(-2, 5);

})

.subscribe(System.out::println);

System.out.println("=================================== flatMap映射 =======================================");

Flux stringFlux3 = Flux.just("a","b","c","d","e","f","g","h","i");

// 嵌套Flux,2个元素作为一个Flux,存到新的Flux

Flux> stringFlux4 = stringFlux3.window(2);

// flatMap()作用后元素处于单Flux中,可以理解为所有元素平铺

stringFlux4.flatMap(flux1 -> flux1.map(word -> word.toUpperCase()))

.subscribe(System.out::println);

// 从嵌套Flux还原字符串Flux

Flux stringFlux5 = stringFlux4.flatMap(flux1 -> flux1);

// stringFlux1 等于 stringFlux5

stringFlux5.subscribe(System.out::println);

System.out.println("=================================== concatMap有序映射 =======================================");

Flux stringFlux6 = Flux.just("a","b","c","d","e","f","g","h","i");

Flux> stringFlux7 = stringFlux6.window(2);

stringFlux7.concatMap(flux1 ->flux1.map(word ->word.toUpperCase())

.delayElements(Duration.ofMillis(200)))

.subscribe(x -> System.out.println("->"+x));

Thread.sleep(2000);

System.out.println("=================================== collect集合 =======================================");

Flux integerFlux2 = Flux.range(1,5);

// 转换成以List为对象的Mono

Mono> mono = integerFlux2.collectList();

mono.subscribe(System.out::println);

Flux employeeFlux2 = Flux.fromIterable(EmployeeList);

// 转换成以List为对象的Mono,按薪水排序

Mono> mono1 = employeeFlux2.collectSortedList(Comparator.comparing(Employee::getSalary));

mono1.subscribe(consumer -> consumer.stream().forEach(item -> System.out.println(item.name + ":" + item.salary)));

// 转换成以Map为对象的Mono

Mono> mono2 = employeeFlux2.collectMap(item ->item.getName(), item ->item);

mono2.subscribe(System.out::println);

System.out.println("=================================== take获取 =======================================");

//根据数量获取

Flux.range(1,10).take(0).log().subscribe(System.out::println);

//根据实际获取

Flux.range(1,10000).take(Duration.ofMillis(3)).log().subscribe(System.out::println);

//根据条件获取

Flux.range(1,10).takeUntil(item ->item == 5).log().subscribe(System.out::println);

System.out.println("=================================== buffer缓冲 =======================================");

Flux stringFlux8 = Flux.just("a","b","c","d","e","f","g");

stringFlux8.subscribe(x -> System.out.print("->"+x));

System.out.println();

Flux> listFlux = stringFlux8.buffer(2);

listFlux.subscribe(x -> System.out.print("->"+x));

System.out.println();

System.out.println("=================================== backpressure背压 =======================================");

// 每秒产生一个数据

Flux longFlux = Flux.interval(Duration.ofMillis(1));

longFlux.take(10).subscribe(new Subscriber() {

Subscription subscription;

@Override

public void onSubscribe(Subscription subscription) {

this.subscription = subscription;

subscription.request(Long.MAX_VALUE);

}

@Override

public void onNext(Long aLong) {

// 背压,需要三个元素

subscription.request(3);

System.out.println("val:"+aLong);

}

@Override

public void onError(Throwable throwable) {

}

@Override

public void onComplete() {

}

});

Thread.sleep(2000);

System.out.println("=================================== disposable停止Flux流 =======================================");

Flux longFlux2 =Flux.interval(Duration.ofMillis(200));

// take方法准确获取订阅数据量

Disposable disposable = longFlux2.take(50).subscribe(consumer -> System.out.println(consumer));

// 主线程休眠1秒后,彻底停止子线程中正在订阅推送数据的Flux或Mono流

Thread.sleep(1000);

disposable.dispose();

System.out.println("stop");

System.out.println("=================================== parallel多线程 =======================================");

Flux.range(1,10)

.parallel(4)

.runOn(Schedulers.parallel())

.subscribe(consumer -> System.out.print("->" + consumer));

System.out.println();

System.out.println("=================================== merge合并 =======================================");

Flux longFlux11 = Flux.interval(Duration.ofMillis(100)).take(10);

Flux longFlux12 = Flux.interval(Duration.ofMillis(100)).take(10);

Flux longFlux13 = Flux.merge(longFlux11,longFlux12);

longFlux13.subscribe(val -> System.out.print(val));

System.out.println();

Thread.sleep(2000);

Thread.sleep(Long.MAX_VALUE);

}

}

关于Reactive Streams、Srping Reactor 和 Spring Flux(Web Flux)之间的关系?

Reactive Streams 是规范,Reactor 实现了 Reactive Streams。

Web Flux 以 Reactor 为基础,实现 Web 领域的反应式编程框架

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐