Spring WebFlux - 01 The Dilemma of MVC
The Dilemma of Spring MVC
Let's start with a piece of code that most of us see regularly at work.
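import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;

@RestController
public class TestAController {
    @RequestMapping(value = "resource", method = RequestMethod.GET)
    public Object processResult() {
        RestTemplate restTemplate = new RestTemplate();
        // Request an external resource; the calling thread blocks until the response arrives
        String result = restTemplate.getForObject("http://example.com/api/resource2", String.class);
        return processResultFurther(result);
    }

    private String processResultFurther(String result) {
        return "resource here";
    }
}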
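[Figure: how the thread state changes while Tomcat processes the request]
We can see that the request and response here are in fact synchronous and blocking: the worker thread waits until the remote call returns.
Thinking a little further: what if the execution time of each thread is unpredictable, while the number of threads in Tomcat's thread pool is limited…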
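To make that limit concrete, here is a minimal sketch that uses only the JDK, not Tomcat itself. The class `BlockingPoolDemo`, the method `runBlocking`, and the 100 ms delay are illustrative assumptions: a fixed thread pool stands in for Tomcat's worker threads, and `sleep` stands in for the blocking remote call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class BlockingPoolDemo {

    private BlockingPoolDemo() {
    }

    /** Stand-in (assumption) for a blocking call such as restTemplate.getForObject(...). */
    static void blockingCall(long millis) {
        try {
            TimeUnit.MILLISECONDS.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Runs `requests` blocking calls on `poolSize` threads and returns the elapsed milliseconds. */
    static long runBlocking(int poolSize, int requests, long blockMillis) {
        // The fixed pool plays the role of the container's bounded worker pool
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            long start = System.nanoTime();
            CompletableFuture<?>[] pending = new CompletableFuture<?>[requests];
            for (int i = 0; i < requests; i++) {
                pending[i] = CompletableFuture.runAsync(() -> blockingCall(blockMillis), pool);
            }
            CompletableFuture.allOf(pending).join(); // wait until every request has finished
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("1 thread, 3 requests: " + runBlocking(1, 3, 100) + " ms");
        System.out.println("3 threads, 3 requests: " + runBlocking(3, 3, 100) + " ms");
    }
}
```

With one thread, the three requests run one after another, so the total time is roughly three times the time of a single request. Adding threads helps only up to the size of the pool.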
So what can we do?
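Relieving Thread Pool Pressure with Servlet Async Requests
Let's do the math:
- TPS: 2000/s
- Time per request: 250 ms
In that case:
- Tomcat max threads needed: 2000/s * 0.25 s = 500
So server.tomcat.threads.max=500 is roughly enough.
But what if the TPS reaches 4000?
We could raise the thread count, but threads consume operating system resources, so more is not always better, and many other factors come into play as well.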
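The estimate above is simply throughput multiplied by the time each request holds a thread. As a rough sketch (the class `ThreadSizing` and the method `requiredThreads` are hypothetical names introduced here, and the model ignores queueing, GC pauses, and every other real-world factor), the same arithmetic for both load levels looks like this:

```java
/** Back-of-the-envelope sizing: busy threads = requests per second x seconds each request holds a thread. */
final class ThreadSizing {

    private ThreadSizing() {
    }

    /**
     * @param requestsPerSecond target throughput (TPS)
     * @param latencyMillis     time one request occupies a worker thread, in milliseconds
     * @return number of worker threads that are busy at the same time, rounded up
     */
    static int requiredThreads(int requestsPerSecond, int latencyMillis) {
        // Integer arithmetic with ceiling division, so no floating-point rounding is involved
        long busyMillisPerSecond = (long) requestsPerSecond * latencyMillis;
        return (int) ((busyMillisPerSecond + 999) / 1000);
    }

    public static void main(String[] args) {
        System.out.println(requiredThreads(2000, 250)); // prints 500
        System.out.println(requiredThreads(4000, 250)); // prints 1000
    }
}
```

Doubling the TPS doubles the number of threads that must block at the same time, which is why growing the pool alone does not scale well.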
So what then?
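Servlet 3.0 Asynchronous Request Processing
Before producing a response, a Filter/Servlet may have to wait for some resource in order to finish processing the request, for example a JDBC query or an RPC call to a remote service.
Blocking inside the Servlet while waiting is inefficient, and it quickly strains limited system resources such as threads and connections.
Servlet 3.0 introduced the ability to process requests asynchronously: the container thread no longer has to block and wait, so it can return to the container early and serve more requests. The time-consuming task is handed off to another, asynchronous thread, which runs it and produces the response.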
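The hand-off idea can be sketched without a servlet container. The example below is only an analogy built on the JDK's `CompletableFuture`, not the Servlet API: the class `HandOffDemo`, the methods `accept` and `slowWork`, and the thread name `worker` are all assumptions made for illustration. The calling thread plays the container thread, and the `workers` pool plays the asynchronous thread that produces the response.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class HandOffDemo {

    private HandOffDemo() {
    }

    /** Simulates a slow call (JDBC query, RPC) and reports which thread did the work. */
    static String slowWork(String request) {
        try {
            TimeUnit.MILLISECONDS.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return request + " handled by " + Thread.currentThread().getName();
    }

    /** Accepts a request on the calling thread and returns at once; the work runs on the worker pool. */
    static CompletableFuture<String> accept(String request, ExecutorService workers) {
        return CompletableFuture.supplyAsync(() -> slowWork(request), workers);
    }

    public static void main(String[] args) {
        // Every pool thread is named "worker" so the output shows where the work ran
        ExecutorService workers = Executors.newFixedThreadPool(2, r -> new Thread(r, "worker"));
        try {
            // Both calls return immediately, so the calling thread is free to accept more requests
            CompletableFuture<String> first = accept("i=1", workers);
            CompletableFuture<String> second = accept("i=2", workers);
            System.out.println("accepted on " + Thread.currentThread().getName());
            System.out.println(first.join());  // prints "i=1 handled by worker"
            System.out.println(second.join()); // prints "i=2 handled by worker"
        } finally {
            workers.shutdown();
        }
    }
}
```

In a real servlet, `req.startAsync()` and `asyncContext.complete()` take the place of creating and completing the future.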
Code Demo
Project
pom
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.artisan</groupId>
<artifactId>servlet-asyn</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.7.4</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.24</version>
</dependency>
</dependencies>
</project>
Configuration file
server.tomcat.threads.max=1
Main class
package com.artisan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.servlet.ServletComponentScan;
/**
* @author 小工匠
* @version 1.0
* @description: application entry point
* @date 2022/10/6 12:03
* @mark: show me the code , change the world
*/
@SpringBootApplication
// With @ServletComponentScan, Servlets, Filters and Listeners annotated with @WebServlet, @WebFilter and @WebListener are registered automatically; no other code is needed
@ServletComponentScan
public class ServletAsyncApplication {
public static void main(String[] args) {
SpringApplication.run(ServletAsyncApplication.class,args);
}
}
Synchronous servlet
package com.artisan.servlet;
import lombok.extern.slf4j.Slf4j;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
/**
* @author 小工匠
* @version 1.0
* @description: synchronous Servlet request
* @date 2022/10/6 12:06
* @mark: show me the code , change the world
*/
@WebServlet(value = "/sync")
@Slf4j
public class SyncServlet extends HttpServlet {
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
log.info("request: {}, currentThread: {}", req.getQueryString(), Thread.currentThread().getName());
processFuture(req, resp);
}
private void processFuture(HttpServletRequest req, HttpServletResponse resp) {
try {
TimeUnit.SECONDS.sleep(10);
resp.getWriter().println("sync handler");
} catch (IOException | InterruptedException e) {
throw new RuntimeException(e);
}
}
}
Demo
Asynchronous servlet
package com.artisan.servlet;
import com.artisan.handler.AsyncRequestWrapper;
import com.artisan.handler.AsyncServletRejectedHandler;
import com.artisan.handler.AsyncThreadFactory;
import lombok.extern.slf4j.Slf4j;
import javax.servlet.AsyncContext;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author 小工匠
* @version 1.0
* @description: asynchronous Servlet request
* @date 2022/10/6 15:23
* @mark: show me the code , change the world
*/
@WebServlet(value = "/async", asyncSupported = true)
@Slf4j
public class AsyncServlet extends HttpServlet {
private ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.SECONDS,
new LinkedBlockingDeque<>(1),
AsyncThreadFactory.builder().threadName("async-thread-pool").build(),
new AsyncServletRejectedHandler());
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
log.info("request: {}, currentThread: {}", req.getQueryString(), Thread.currentThread().getName());
AsyncContext asyncContext = req.startAsync();
AsyncRequestWrapper wrapper = AsyncRequestWrapper.builder().asyncContext(asyncContext)
.servletRequest(req)
.servletResponse(resp)
.thread(Thread.currentThread())
.build();
executor.execute(wrapper);
}
}
Supporting code
【AsyncThreadFactory】
package com.artisan.handler;
import lombok.Builder;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author 小工匠
* @version 1.0
* @description: thread factory for the thread pool
* @date 2022/10/6 15:35
* @mark: show me the code , change the world
*/
@Builder
public class AsyncThreadFactory implements ThreadFactory {
private final ThreadFactory threadFactory = Executors.defaultThreadFactory();
private String threadName;
private final AtomicInteger atomicInteger = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
Thread thread = threadFactory.newThread(r);
thread.setName(this.threadName + "-" + atomicInteger.getAndIncrement());
return thread;
}
}
【AsyncRequestWrapper】
package com.artisan.handler;
import lombok.Builder;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import javax.servlet.AsyncContext;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
/**
* @author 小工匠
* @version 1.0
* @description: Runnable wrapper around the async request
* @date 2022/10/6 15:42
* @mark: show me the code , change the world
*/
@Slf4j
@Data
@Builder
public class AsyncRequestWrapper implements Runnable {
private AsyncContext asyncContext;
private ServletRequest servletRequest;
private ServletResponse servletResponse;
private Thread thread;
@Override
public void run() {
processFuture(asyncContext, servletRequest, servletResponse);
}
private void processFuture(AsyncContext asyncContext, ServletRequest servletRequest, ServletResponse servletResponse) {
HttpServletRequest httpServletRequest = (HttpServletRequest)servletRequest;
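try {
TimeUnit.SECONDS.sleep(10);
// Log message reads: "processFuture current processing thread {}"
log.info("processFuture 当前处理线程 {}", Thread.currentThread().getName());
servletResponse.getWriter().println("async handler -->" + httpServletRequest.getQueryString());
} catch (InterruptedException e) {
// Restore the interrupt flag before propagating the failure
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
// Always complete the async context, even on failure, so the request is not left hanging
asyncContext.complete();
}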
}
}
【AsyncServletRejectedHandler】
package com.artisan.handler;
import lombok.extern.slf4j.Slf4j;
import javax.servlet.http.HttpServletRequest;
import java.io.IOException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @author 小工匠
* @version 1.0
* @description: rejection policy
* @date 2022/10/6 15:24
* @mark: show me the code , change the world
*/
@Slf4j
public class AsyncServletRejectedHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
AsyncRequestWrapper wrapper = (AsyncRequestWrapper) r ;
HttpServletRequest httpServletRequest = (HttpServletRequest) wrapper.getServletRequest();
try {
String queryString = httpServletRequest.getQueryString();
String threadName = wrapper.getThread().getName();
// Log message reads: "current thread: {} , current thread's request param: {}"
log.info("当前线程:{} , 当前线程请求参数:{}", threadName, queryString);
wrapper.getServletResponse().getWriter().println("too many request , current thread:" + threadName + " , current param:" + queryString );
} catch (IOException e) {
throw new RuntimeException(e);
}
// Don't forget to call complete
wrapper.getAsyncContext().complete();
}
}
Demo
2022-10-06 21:30:09.700 INFO 7860 --- [nio-8080-exec-1] com.artisan.servlet.AsyncServlet : request: i=1, currentThread: http-nio-8080-exec-1
2022-10-06 21:30:11.277 INFO 7860 --- [nio-8080-exec-1] com.artisan.servlet.AsyncServlet : request: i=2, currentThread: http-nio-8080-exec-1
2022-10-06 21:30:12.813 INFO 7860 --- [nio-8080-exec-1] com.artisan.servlet.AsyncServlet : request: i=3, currentThread: http-nio-8080-exec-1
2022-10-06 21:30:12.813 INFO 7860 --- [nio-8080-exec-1] c.a.handler.AsyncServletRejectedHandler : 当前线程:http-nio-8080-exec-1 , 当前线程请求参数:i=3
2022-10-06 21:30:15.355 INFO 7860 --- [nio-8080-exec-1] com.artisan.servlet.AsyncServlet : request: i=4, currentThread: http-nio-8080-exec-1
2022-10-06 21:30:15.355 INFO 7860 --- [nio-8080-exec-1] c.a.handler.AsyncServletRejectedHandler : 当前线程:http-nio-8080-exec-1 , 当前线程请求参数:i=4
2022-10-06 21:30:19.712 INFO 7860 --- [c-thread-pool-1] com.artisan.handler.AsyncRequestWrapper : processFuture 当前处理线程 async-thread-pool-1
2022-10-06 21:30:29.719 INFO 7860 --- [c-thread-pool-1] com.artisan.handler.AsyncRequestWrapper : processFuture 当前处理线程 async-thread-pool-1
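The log above shows requests i=3 and i=4 being rejected while i=1 and i=2 are eventually processed. That matches how a `ThreadPoolExecutor` with one core thread, one maximum thread, and a queue of capacity 1 behaves: the first task runs, the second waits in the queue, and anything beyond that goes to the rejection handler on the submitting thread. The sketch below reproduces just that sizing with the JDK alone. The class `RejectionDemo`, its nested `Task`, and the latch that replaces the 10-second sleep are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class RejectionDemo {

    private RejectionDemo() {
    }

    /** Submits `requests` long-running tasks and returns the ids that the pool rejected. */
    static List<Integer> rejectedIds(int requests) {
        List<Integer> rejected = new ArrayList<>();
        CountDownLatch release = new CountDownLatch(1);
        // Same sizing as the AsyncServlet executor: 1 core thread, 1 max thread, queue capacity 1
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(1),
                // The handler runs on the submitting thread, so a plain ArrayList is safe here
                (task, pool) -> rejected.add(((Task) task).id));
        for (int i = 1; i <= requests; i++) {
            executor.execute(new Task(i, release));
        }
        release.countDown();  // let the accepted tasks finish
        executor.shutdown();  // previously accepted tasks still run to completion
        return rejected;
    }

    /** Holds its thread until released, standing in for the slow request processing. */
    static final class Task implements Runnable {
        final int id;
        private final CountDownLatch release;

        Task(int id, CountDownLatch release) {
            this.id = id;
            this.release = release;
        }

        @Override
        public void run() {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(rejectedIds(4)); // prints [3, 4]
    }
}
```

Because the rejection handler runs on the thread that called `execute`, the rejected requests in the log are reported from `http-nio-8080-exec-1` rather than from the async pool.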
Tomcat request processing flow and how asynchronous requests work