SpringBoot 2.x版本提供了 Spring WebFlux 模块,支持了请求的异步调用。

在微服务中,使用服务A调用服务B时,也是可以进行异步调用的,Spring5 封装了WebClient来进行这项操作,这里创建两个项目来模拟项目之间的这种调用。

 

首先,创建SpringBoot项目,这里使用时下最新版本 2.3.1.RELEASE进行说明。我使用的是Maven,首先要添加相应的web组件依赖,如下:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

这里创建好SpringBoot项目后,创建所需Controller,用来支撑后续的测试操作

@RestController
@RequestMapping("/getName")
class FindNameController(val webClientBuilder: WebClient.Builder) {

    //使用get请求
    @GetMapping("/getName/{word}")
    fun getName(@PathVariable word: String): Mono<String> {
        //得到进入方法的时间
        val start = System.currentTimeMillis()
        //使用webClient进行服务间的异步调用
        return webClientBuilder
                .build()
                .method(HttpMethod.POST)
                //指定要访问项目的Url
                .uri("http://localhost:1234/timeout/getName")
                //指定要传输的参数
                .bodyValue(word)
                //进行http调用
                .retrieve()
                //结果使用Mono进行接收
                .bodyToMono(String::class.java)
                //如果出错进行错误打印处理
                .onErrorResume {
                    it.printStackTrace()
                    //可以查看是否真的超时
                    println("错误时长---" + (System.currentTimeMillis() - start) + "----" + Thread.currentThread().name)
                    Mono.just("错了" + Thread.currentThread().name)
                }


    }
}

在上述代码中,WebClient.Builder是使用配置文件定义的Bean,我们在WebClientConfig 类中进行配置,具体代码如下所示

@Component
class WebClientConfig {


    @Bean
    fun webClientBuilder(): WebClient.Builder {
        return WebClient.builder()

    }
}

这样可以把WebClient交由Spring管理,方面统一进行配置处理,这里后后续的超时时间配置就在这里进行。

以上,是为项目A。

项目B我们简单创建,只需要指定相应端口后,因为我是在同一台机器上测试,所以A,B项目的端口要需要进行区分。

项目B和项目A的创建没什么不同,这里我只创建一个Controller供A进行调用。

@RestController
@RequestMapping("/timeout")
class TimeoutController {

    @PostMapping("/getName")
    fun getName(@RequestBody word: String): Mono<String> {
        return Mono.just(word)
                .map { "$it---name is Bird" }

    }
}

我们可以在浏览器上进行直接输入地址调用。结果如下:

下边重点到了,我们在项目B的Controller加上Thread.sleep(10000),让线程睡10秒钟

@RestController
@RequestMapping("/timeout")
class TimeoutController {

    @PostMapping("/getName")
    fun getName(@RequestBody word: String): Mono<String> {
        Thread.sleep(10000)
        return Mono.just(word)
                .map { "$it---name is Bird" }

    }
}

再次调用,等了10秒结果才出来。假设10秒时间太长了,我们调用最多会等5秒钟,过了5秒钟就放弃调用该怎么做呢,我们来修改一下WebClientConfig 类。

改写后的类如下所示:

@Component
class WebClientConfig {

    private val connectionProvider = ConnectionProvider.builder("tcp-client-pool")
            //允许的最大连接数
            .maxConnections(3)
            //没有连接可用时,请求等待的最长时间
            .pendingAcquireTimeout(Duration.ofMillis(60000))
            //没有连接时,最多有多少个请求等待
            .pendingAcquireMaxCount(18)
            .build()

    //自定义一个loop来进行http线程调用管理
    private val loopResources = LoopResources.create("tcp-client-loop")

    val theTcpClient = TcpClient
            .create(connectionProvider)
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 2000)
            .runOn(loopResources)
            .doOnConnected {
                //设置读取超时时间
                it.addHandlerLast(ReadTimeoutHandler(5))
                //设置写入超时时间
                it.addHandlerLast(WriteTimeoutHandler(5))
            }


    @Bean
    fun webClientBuilder(): WebClient.Builder {
        return WebClient.builder().clientConnector(ReactorClientHttpConnector(HttpClient.from(theTcpClient)))

    }
}

如上,使用指定的HttpClient取代了默认的,并设置了读取时间和写入时间都是5秒。我们再次重启进行调用。程序打印出如下错误

因为B项目中需要10秒钟才可以把事务处理完毕,这边5秒钟后直接放弃等待,抛出请求超时。

下边我们再对

private val connectionProvider = ConnectionProvider.builder("tcp-client-pool")
            //允许的最大连接数
            .maxConnections(3)
            //没有连接可用时,请求等待的最长时间
            .pendingAcquireTimeout(Duration.ofMillis(60000))
            //没有连接时,最多有多少个请求等待
            .pendingAcquireMaxCount(2)
            .build()

进行测试,假设pendingAcquireMaxCount改为2,maxConnections为3(超时时间这里改为15秒, 大于项目B的10秒,使调用可以成功)

我们启用10个线程进行请求,按照配置,3个会直接获取到连接池分配的连接。因为最多的只允许存在3个连接,所以剩下的7个请求会被暂时搁置,因为最多请求等待个数为2个,所以剩下7个请求有两个放入到了请求等待队列,而剩下的5个会直接失败。

为了验证想法,这里使用jmeter进行线程模拟调用。

根据错误信息可知,等待队列最多有2个,多于的会抛出异常。jmeter信息如下,因为程序中catch住了异常,所以请求是成功的,但是从返回信息可以看出来,前5个请求是因为请求到达上限,直接返回了错误的提示结果。

好了,这里先说到这里。这里具体的实现源码,后边我们再说。这里的参数需要根据自己的系统进行选择配置,并不是越大越好,合适才是最重要的。 

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐