动态代理模式-在项目中的使用(基于feign源码写自己的rpc报表框架)
设计模式-总览https://mp.csdn.net/mp_blog/creation/editor/1222025071、项目背景报表服务导出服务经常会涉及jvm、io等异常情况,所以想要将报表的查询和导出在两个服务中进行,并且报表的查询是在k8s环境中的一个微服务,而报表的真正执行服务是在一个虚拟机。需要将报表的查询和导出两个阶段在两个...
设计模式-总览https://mp.csdn.net/mp_blog/creation/editor/122202507
目录
3、@EnableReportClient启动流程和@FeignClient代理执行流程分析
1、项目背景
报表导出服务经常会涉及jvm、io等异常情况,所以想要将报表的查询和导出在两个服务中进行,并且报表的查询是在k8s环境中的一个微服务,而报表的真正执行服务是在一个虚拟机(挂了不影响其他业务)。项目情况分析:
1)、需要将报表的查询和导出两个阶段在两个分别在两个环境中执行,所以不能使用注册中心查询当前列表。在spring boot yml配置中写死需要调用的服务列表。
2)、当当前只是开发报表的调用框架,具体开发人员时rpc参数可能是查询的参数,或者查询的结果,或者sql都是可能的参数不确定。所以需要开发类型Feign一样的框架,传递不确定个数和类型的参数。
3)、基于报表业务的深度开发,使用了动态代理在调用前直接将部分公用的报表业务耦合进流程中。并且微服务的报表查询服务,底层直接使用了 Feign装配的 Apache Httpclient线程池。
设计的流程图如上,基于这样的设计与自己的部分知识结构有关:动态代码方面的基础知识、自己也梳理过Spring Aop的源码(Aop的本质就是动态代理),Feign的源码实现。
2、实现大致流程和代码
1)、启动报表组件
基于该框架,首先需要在Spring boot的启动类上添加启用报表组件的注解,并使用参数basePackages配置需要扫描@ReportClient注解的包路径,如下:
@EnableReportExecute(basePackages = "com.kevin")
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(ReportExecuteRegistrar.class)
public @interface EnableReportExecute {
/**
* 指定基本扫描包
* @return 指定数组的 基准包
*/
String[] basePackages() default {};
}
2)、编写自己的ReportClient
开发人员只在需要执行异步报表的controller处理完业务逻辑后,调用自己的报表接口服务,并且接口可以允许同步或者异步io调用,可以在接口类层或者接口层 设置负载均衡策略,设置rpc调用的超时时间。如下:
@ReportExecute(path = "/provider/v1/demoSku")
public interface DemoSkuService {
@ReportMapping("executeData")
void executeData(@Nullable String title);
@ReportMapping("executeData2")
void executeData2(@Nullable String title, String sql);
}
对应的@ExportClient、@ReportMapping类信息如下:
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface ReportExecute {
/**
* 注入Spring Ioc容器的bean 名称
* @return bean名称
*/
String beanName() default "";
/**
* 执行策略,为每个的服务定制负载均衡策略
* @see LbsStrategyEnum#strategy
*/
String lbsStrategy() default "polling";
/**
* 指定的路径
* @return 统一前置路径,对所有方法生效
*/
String path() default "/";
}
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@SuppressWarnings("unused")
public @interface ReportMapping {
/**
* 访问子路径
* @return bean名称
*/
String value() default "";
/**
* 默认 POST请求
* @return 请求方式
*/
RequestMethod requestMethod() default RequestMethod.POST;
/**
* 同步或异步, 默认异步
* @return bean名称
*/
SyncTag syncTag() default SyncTag.ASYNC;
/**
* 超时时间
* @return 超时时间
*/
int timeOut() default 0;
/**
* 超时单位
* @return 超时单位
*/
TimeUnit timeUnit() default TimeUnit.MILLISECONDS;
/**
* 同步标识符
*/
enum SyncTag {
SYNC, ASYNC
}
/**
* Http 访问方式
*/
enum RequestMethod {
POST, GET
}
}
3)、ReportClient执行动态代理业务
@ReportClient接口执行的过程就是动态代理的过程,会调用报表服务添加一条数据,并且将报表的id使用http header的方式传递给报表执行服务。
public class ReportInvocationHandler implements InvocationHandler {
@Override
public Object invoke(Object o, Method method, Object[] objects) {
ReportContext reportContext = BeanTools.getBean(ReportContext.class);
ReportExecuteFactoryBean base = reportContext.getBase(method);
String path = base.getPath();
// 1、保存状态
Object titleObj = objects[0];
String title = titleObj == null ? "" : (String)titleObj;
String id = saveRecord(title);
// 2、调用报表执行服务
Object[] param = Arrays.copyOfRange(objects, 1, objects.length);
String httpPrefix = base.getLbsStrategy() == null ? ReportExecuteLbs.getHttp() : ReportExecuteLbs.getHttp(base.getLbsStrategy());
ReportMapping postMapping = reportContext.getRequestMapping(method);
String mapping = postMapping.value().startsWith("/") ? postMapping.value() : ("/" + postMapping.value());
String pathPrefix = path.startsWith("/") || "".equals(path) ? path : ("/" + path);
// return ReportHttpUtil.doHttp(id, title, postMapping, httpPrefix + pathPrefix + mapping, param);
return SemaphoreLimiter.getInstance().execute(() ->
ReportHttpUtil.doHttp(id, title, postMapping, httpPrefix + pathPrefix + mapping, param));
}
/**
* 保存记录信息
* @return 主键Id
*/
private String saveRecord(String title) {
String id = UUID.randomUUID().toString();
ReportRecord record = new ReportRecord();
record.setId(id);
record.setTitle(title);
record.setState(ReportStateEnum.INIT.state);
record.setUrl("");
record.setObjectVersionNumber(0L);
// 从 单点登录中获取用户信息
String createBy = "kevin";
record.setCreateBy(createBy);
record.setLastUpdateBy(createBy);
BeanTools.getBean(ReportRecordService.class).insert(record);
return id;
}
}
并且这里为了服务的健壮性,使用了jdk current包Semaphore是实现的本地限流器,SemaphoreLimiter代码如下:
@Slf4j
public class SemaphoreLimiter<V> {
/**
* 限流器
*/
private static final Semaphore SEMAPHORE = new Semaphore(15);
private static final SemaphoreLimiter INSTANCE = new SemaphoreLimiter();
/**
* 最大的执行等待时间,单位毫秒
*/
private static final int MAX_WAIT_TIME = 5000;
/**
* 单例构造私有化
*/
private SemaphoreLimiter() {
}
/**
* 共有的实例初始化
* @return 单例对象
*/
public static SemaphoreLimiter getInstance() {
return INSTANCE;
}
/**
* 在限流中执行服务
* @param callable 单纯的任务回调信息
* @return 远程方法调用结果
*/
public V execute(Callable<V> callable) {
try {
SEMAPHORE.tryAcquire(MAX_WAIT_TIME, TimeUnit.MILLISECONDS);
return callable.call();
} catch (InterruptedException e) {
log.info("使用限流器,发生中断异常!");
buildException(LIMITER_EXCEPTION);
} catch (Exception e) {
log.error("异常信息 = " + e);
buildException(LIMITER_EXCEPTION);
} finally {
SEMAPHORE.release();
}
return null;
}
}
4)、报表执行服务
首先报表执行服务会使用拦截器,将http header中的 reportId或者并且存放到 ThreadLocal中,如下:
public class ReportInterceptor implements HandlerInterceptor {
/**
* 存放当前请求的回调主键和url信息
*/
public static final ThreadLocal<ReportBO> THREAD_LOCAL = new ThreadLocal<>();
/**
* 保存导出的报表地址
* @param url 报表访问地址
*/
public static void saveUrl(String url) {
ReportBO reportBO = THREAD_LOCAL.get();
if (reportBO != null) {
reportBO.setUrl(url);
}
}
/**
* 获取当前的报表标题
* @return 报表标题
*/
public static String getTitle() {
ReportBO reportBO = THREAD_LOCAL.get();
if (reportBO == null) {
return "";
}
return reportBO.getTitle();
}
@Override
public boolean preHandle(HttpServletRequest request, @NonNull HttpServletResponse response, @NonNull Object handler) throws UnsupportedEncodingException {
String reportBatchId = request.getHeader("reportBatchId");
if (reportBatchId != null) {
String reportTitle = request.getHeader("reportTitle");
reportTitle = ObjectUtils.isEmpty(reportTitle) ? "" : URLDecoder.decode(reportTitle, "UTF-8");
THREAD_LOCAL.set(new ReportBO(reportBatchId, reportTitle));
}
return true;
}
@Override
public void postHandle(@NonNull HttpServletRequest request, @NonNull HttpServletResponse response, @NonNull Object handler, ModelAndView modelAndView) {
}
@Override
public void afterCompletion(@NonNull HttpServletRequest request, @NonNull HttpServletResponse response, @NonNull Object handler, Exception ex) {
try {
ReportBO reportBO = THREAD_LOCAL.get();
if (reportBO != null && !ObjectUtils.isEmpty(reportBO.getUrl())) {
// 自动回调报表服务,修改状态,添加访问地址
ReportHttpUtil.doHttp();
}
} finally {
THREAD_LOCAL.remove();
}
}
}
在调用到报表导出服务的controller,并且处理完业务逻辑后,直接调用工具类将文件存储到固定的位置(可以是共享磁盘、fastdfs等),此时工具类本身还会将存储完成的文件存放url放置到ThreadLocal中,为了方便就直接使用了上面的ReportInterceptor的ThreadLocal,并且提供了saveUrl方法。流程核心代码就是下面的:
ReportInterceptor.saveUrl(url);
@Slf4j
@Component
public class ReportConsistentUtil implements InitializingBean {
public static final ThreadLocal<SimpleDateFormat> DATETIME_FORMAT = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss"));
/** Windows系统的基本面路径 */
@Value("${report.base.dir.windows}")
private static String windowsBaseDirectory = "D:\\report\\execute\\";
/** Linux操作系统的基本路径 */
@Value("${report.base.dir.linux}")
private static String linuxBaseDirectory = "/usr/local/execute";
/** 当前操作系统对应的基本路径 */
private static String BASE_DIRECTORY;
/** 当前的操作系统 */
private static final String os = System.getProperty("os.name").toLowerCase();
public static void main(String[] args) throws IOException {
/*File kevin = new File("Z:/report/kevin.txt");
log.info("save file: file path = {}, file name = {}", kevin.getPath(), kevin.getName());
ReportConsistentUtil.saveByFile(Model.Order, kevin);*/
String path = ReportConsistentUtil.saveByFullPath(Model.ORDER, "Z:/report/kevin.txt");
log.info("得到的最终文件路径:{}", path);
}
@Override
public void afterPropertiesSet() throws Exception {
initBaseDir();
}
/**
* 根据路径保持
* @param model 所属模块
* @param fullPath 文件全路径
*/
public static String saveByFullPath(Model model, String fullPath) throws IOException {
if (ObjectUtils.isEmpty(fullPath)) {
throw new RuntimeException("执行的全路径为空!");
}
return saveByFile(model, new File(fullPath));
}
/**
* 保存文件
* @param model 所属模块
* @param file 文件
* @throws IOException io异常
*/
public static String saveByFile(Model model, File file) throws IOException {
FileInputStream fis = new FileInputStream(file);
String suffix = file.getName().substring(file.getName().lastIndexOf("."));
return saveByOutputStream(model, fis, suffix);
}
/**
* 将文件输入流写到指定位置
* @param model 所属模块
* @param fis 文件输入流
* @throws IOException io异常
*/
public static String saveByOutputStream(Model model, FileInputStream fis, String suffix) throws IOException {
/*第四步:将得到的文件名称的扩展名改为.jad*/
String sub = DATETIME_FORMAT.get().format(new Date());
String title = ReportInterceptor.getTitle();
String fileName = ObjectUtils.isEmpty(title) ? sub : title + "-" + sub;
String url = File.separator + model.directory + File.separator + fileName + suffix;
File file = new File(BASE_DIRECTORY + url);
FileOutputStream fos = new FileOutputStream(file);
copy(fis, fos);
fis.close();
fos.close();
// 回执请求的url
ReportInterceptor.saveUrl(url);
return file.getPath();
}
/**
* 用户获取报表
* @param url 报表记录中的url【Model + UUID】
* @return 文件输出流
*/
public static File getExport(String url) {
return new File(BASE_DIRECTORY + File.separator + url);
/*try {
return file;
} catch (FileNotFoundException e) {
return null;
}*/
}
static {
// initBaseDir();
}
/**
* 初始化跟目录,如果不存在
*/
private void initBaseDir() {
File file;
if (OperateSystem.LINUX.system.toLowerCase().equals(os)) {
file = new File(linuxBaseDirectory);
BASE_DIRECTORY = linuxBaseDirectory;
} else if (os.contains(WINDOWS.system.toLowerCase())) {
file = new File(windowsBaseDirectory);
BASE_DIRECTORY = windowsBaseDirectory;
} else {
throw new RuntimeException("不支持的操作系统");
}
if (!file.exists()) {
boolean mkdirs = file.mkdirs();
if (mkdirs) {
log.info("创建目录:{} 成功!", file.getPath());
}
}
for (Model model : Model.values()) {
File childFile = new File(file.getPath() + File.separator + model.directory);
if (!childFile.exists()) {
boolean mkdirs = childFile.mkdirs();
if (mkdirs) {
log.info("创建目录:{} 成功!", childFile.getPath());
}
}
}
}
/**
* 流拷贝
* @param in 输入流
* @param out 输出流
* @throws IOException io异常
*/
private static void copy(InputStream in, OutputStream out) throws IOException {
byte[] buf = new byte[1024];
int len;
/*读取文件内容并写入文件字节流中*/
while((len = in.read(buf))!=-1) {
out.write(buf, 0, len);
}
}
/**
* 报表类型
* @author kevin
* @date 2020/10/28 16:10
* @since 1.0.0
*/
public enum Model {
ORDER("order", "订单报表"),
;
/**
* 目录
*/
public String directory;
/**
* 描述
*/
public String description;
Model(String directory, String description) {
this.directory = directory;
this.description = description;
}
}
/**
* 操作系统
* @author kevin
* @date 2020/10/28 16:14
* @since 1.0.0
*/
public enum OperateSystem {
/** Linux系统 */
WINDOWS("Windows"),
/** Windows系统 */
LINUX("Linux"),
;
public String system;
OperateSystem(String system) {
this.system = system;
}
}
}
最后,ReportInterceptor拦截器的afterCompletion方法,会判断ThreadLocal中是否存在保存导出文件后的url地址,如果存在则根据 reportId和url 回调用报表服务的修改url接口,以添加下载地址并且将报表状态修改为可下载。
@Override
public void afterCompletion(@NonNull HttpServletRequest request, @NonNull HttpServletResponse response, @NonNull Object handler, Exception ex) {
try {
ReportBO reportBO = THREAD_LOCAL.get();
if (reportBO != null && !ObjectUtils.isEmpty(reportBO.getUrl())) {
// 自动回调报表服务,修改状态,添加访问地址
ReportHttpUtil.doHttp();
}
} finally {
THREAD_LOCAL.remove();
}
}
@Slf4j
@Component
public class ReportHttpUtil implements InitializingBean {
private static final String CHARSET = "UTF-8";
private static final String CONTENT_TYPE = "application/json;charset=utf8";
/**
* {@link CloseableHttpClient}
*/
public ReportHttpUtil() { }
@Override
public void afterPropertiesSet() {
// log.info("feign httpclient Class = {}", httpClient.getClass());
}
/**
* 发送同的Post请求
*/
public static void doHttp() {
ReportBO reportBO = ReportInterceptor.THREAD_LOCAL.get();
// 创建Post请求
HttpPost httpPost = new HttpPost(ReportExecuteLbs.getFullHttp());
ReportRecord reportRecord = new ReportRecord();
reportRecord.setId(reportBO.getBatchId());
reportRecord.setUrl(reportBO.getUrl());
// 我这里利用阿里的fastjson,将Object转换为json字符串;
// (需要导入com.alibaba.fastjson.JSON包)
String jsonString = JSON.toJSONString(reportRecord);
StringEntity entity = new StringEntity(jsonString, CHARSET);
// post请求是将参数放在请求体里面传过去的;这里将entity放入post请求体中
httpPost.setEntity(entity);
httpPost.setHeader("Content-Type", CONTENT_TYPE);
// 响应模型
CloseableHttpResponse response = null;
try {
// 由客户端执行(发送)Post请求
CloseableHttpClient httpClient = HttpClients.createDefault();
response = httpClient.execute(httpPost);
} catch (ParseException | ClientProtocolException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
// 释放资源
if (response != null) {
response.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
3、@EnableReportClient启动流程和@FeignClient代理执行流程分析
@EnableReportExecute类中使用import的方式注入@Import(ReportExecuteRegistrar.class),就在是ioc容器启动时将包的根目录下面的@ReportClient标注的接口类以ReportExecuteFactoryBean的形式注入成BeanDefinition,那么当IOC获取该类型FactoryBean时,即调用其getObject方法,并且此时返回的已经是jdk代理对象。此时会将反射的method和注解信息存放到ReportContext中(而其本身就是一个ConcurrentHashMap)。
public class ReportExecuteRegistrar implements ImportBeanDefinitionRegistrar, ResourceLoaderAware, EnvironmentAware {
private ResourceLoader resourceLoader;
private Environment environment;
@Override
public void setResourceLoader(@NonNull ResourceLoader resourceLoader) {
this.resourceLoader = resourceLoader;
}
@Override
public void setEnvironment(@NonNull Environment environment) {
this.environment = environment;
}
@Override
public void registerBeanDefinitions(AnnotationMetadata metadata, @NonNull BeanDefinitionRegistry registry) {
Map<String, Object> defaultAttrs = metadata.getAnnotationAttributes(EnableReportExecute.class.getName(), true);
if (defaultAttrs == null) {
return;
}
// 注册指定扫描的包
if (defaultAttrs.containsKey("basePackages")) {
String[] basePackages = (String[]) defaultAttrs.get("basePackages");
AnnotationTypeFilter includeFilter = new AnnotationTypeFilter(ReportExecute.class);
ClassPathScanningCandidateComponentProvider scanner = getScanner();
scanner.setResourceLoader(this.resourceLoader);
scanner.addIncludeFilter(includeFilter);
for (String basePackage : basePackages) {
for (BeanDefinition candidateComponent : scanner.findCandidateComponents(basePackage)) {
if (candidateComponent instanceof AnnotatedBeanDefinition) {
AnnotatedBeanDefinition beanDefinition = (AnnotatedBeanDefinition) candidateComponent;
AnnotationMetadata annotationMetadata = beanDefinition.getMetadata();
Assert.isTrue(annotationMetadata.isInterface(), "@ReportExecute can only be specified on an interface");
Map<String, Object> attributes = annotationMetadata.getAnnotationAttributes(ReportExecute.class.getCanonicalName());
assert attributes != null;
String lbsStrategy = getWithDefaultValue(attributes, "lbsStrategy");
String beanName = getWithDefaultValue(attributes, "beanName");
String path = getWithDefaultValue(attributes, "path");
if (ObjectUtils.isEmpty(beanName)) {
beanName = annotationMetadata.getClassName();
}
registerReportExecute(registry, annotationMetadata, beanName, lbsStrategy, path);
}
}
}
}
}
/**
* 获取默认值
* @param attributes 配置的属性
* @param key key
* @return 对应的配置值
*/
private String getWithDefaultValue(Map<String, Object> attributes, String key) {
return attributes.get(key) == null ? "" : (String)attributes.get(key);
}
/**
* 注册Bean 待{@link AbstractApplicationContext#refresh()} finishBeanFactoryInitialization完成该 FactoryBean的getObject调用
* @param registry 注册器,当前即为AbstractApplicationContext
* @param annotationMetadata 注解元信息
* @param beanName bean 名称
* @param lbsStrategy 指定的负载均衡策略
*/
private void registerReportExecute(BeanDefinitionRegistry registry, AnnotationMetadata annotationMetadata,
String beanName, String lbsStrategy, String path) {
String className = annotationMetadata.getClassName();
String finalBeanName = ObjectUtils.isEmpty(beanName) ? className : beanName;
BeanDefinitionBuilder definition = BeanDefinitionBuilder.genericBeanDefinition(ReportExecuteFactoryBean.class);
definition.addPropertyValue("type", className);
definition.addPropertyValue("beanName", finalBeanName);
definition.addPropertyValue("lbsStrategy", lbsStrategy);
definition.addPropertyValue("path", path);
definition.setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_TYPE);
registry.registerBeanDefinition(finalBeanName, definition.getBeanDefinition());
}
/**
* 初始化类路径的扫描器
* 最后会获取到磁盘包路径下的所有class文件,使用类加载器进行加载
* @return 路径扫描器
*/
protected ClassPathScanningCandidateComponentProvider getScanner() {
return new ClassPathScanningCandidateComponentProvider(false, this.environment) {
@Override
protected boolean isCandidateComponent(@NonNull AnnotatedBeanDefinition beanDefinition) {
boolean isCandidate = false;
if (beanDefinition.getMetadata().isIndependent()) {
if (!beanDefinition.getMetadata().isAnnotation()) {
isCandidate = true;
}
}
return isCandidate;
}
};
}
}
@Data
public final class ReportExecuteFactoryBean implements FactoryBean<Object>, BeanClassLoaderAware {
private ClassLoader classLoader;
/**
* Bean名称
*/
private String beanName;
/**
* 对应{@link ReportExecute} 的类
*/
private Class<?> type;
/**
* 负载均衡策略
*/
private String lbsStrategy;
/**
* 类上配置的前缀访问路径
*/
private String path;
@Override
public Object getObject() {
ReportContext reportContext = BeanTools.getBean(ReportContext.class);
Method[] methods = type.getMethods();
for (Method method : methods) {
ReportMapping annotation = method.getAnnotation(ReportMapping.class);
if (annotation == null) {
throw new RuntimeException("配置异常,当前只支持ReportMapping.class, Class = " + type);
}
reportContext.setRequestMapping(method, annotation);
reportContext.setBase(method, this);
}
// 返回代理放入IOC
return Proxy.newProxyInstance(classLoader, new Class[]{type}, new ReportInvocationHandler());
}
@Override
public void setBeanClassLoader(@NonNull ClassLoader classLoader) {
this.classLoader = classLoader;
}
@Override
public boolean isSingleton() {
return true;
}
@Override
public Class<?> getObjectType() {
return this.type;
}
}
@Component
public class ReportContext extends ConcurrentHashMap<Method, ReportMapping> {
/**
* 父类存放方法配置,该容器配置类配置
*/
private final Map<Method, ReportExecuteFactoryBean> CONFIG_BASE = new ConcurrentHashMap<>();
/**
* 获取{@link ReportMapping} 配置信息
* @param method 方法信息
* @return 配置信息
*/
public ReportMapping getRequestMapping(Method method) {
return super.get(method);
}
public void setRequestMapping(Method method, ReportMapping postMapping) {
super.put(method, postMapping);
}
/**
* 设置配置关系
* @param method 方法
* @param factoryBean 方法所在类的生产工厂
*/
public void setBase(Method method, ReportExecuteFactoryBean factoryBean) {
CONFIG_BASE.put(method, factoryBean);
}
/**
* 获取当前访问的前缀信息
* @param method 方法
* @return 方法所在类的生产工厂
*/
public ReportExecuteFactoryBean getBase(Method method) {
return CONFIG_BASE.get(method);
}
}
其他说明:使用的是Feign的线程池,这里使用了httpClient类型的FeignClient。已经一些负载均衡的类等,如下:
@Slf4j
@Component
@ConditionalOnProperty(value = "feign.httpclient.enabled", matchIfMissing = true)
@AutoConfigureBefore(FeignRibbonClientAutoConfiguration.class) // 在 {@code HttpClientFeignLoadBalancedConfiguration} 之前注入
public class FeignHttpClientConfig {
@Autowired(required = false)
private RegistryBuilder registryBuilder;
private final Timer connectionManagerTimer = new Timer("FeignApacheHttpClientConfiguration.connectionManagerTimer", true);
@Bean
@ConditionalOnMissingBean(HttpClientConnectionManager.class)
public HttpClientConnectionManager connectionManager(
ApacheHttpClientConnectionManagerFactory connectionManagerFactory,
FeignHttpClientProperties httpClientProperties) {
final HttpClientConnectionManager connectionManager = connectionManagerFactory
.newConnectionManager(httpClientProperties.isDisableSslValidation(),
httpClientProperties.getMaxConnections(),
httpClientProperties.getMaxConnectionsPerRoute(),
httpClientProperties.getTimeToLive(),
httpClientProperties.getTimeToLiveUnit(),
this.registryBuilder);
this.connectionManagerTimer.schedule(new TimerTask() {
@Override
public void run() {
connectionManager.closeExpiredConnections();
}
}, 30000, httpClientProperties.getConnectionTimerRepeat());
return connectionManager;
}
@Bean(name = "closeableHttpClient", destroyMethod = "close")
public CloseableHttpClient httpClient(HttpClientConnectionManager httpClientConnectionManager) {
PoolingHttpClientConnectionManager connectionManager = (PoolingHttpClientConnectionManager)httpClientConnectionManager;
connectionManager.setMaxTotal(400);
connectionManager.setDefaultMaxPerRoute(100);
RequestConfig requestConfig = RequestConfig.custom().setConnectionRequestTimeout(2000)//从连接池获取连接等待超时时间
.setConnectTimeout(2000)//请求超时时间
.setSocketTimeout(15000)//等待服务响应超时时间
.build();
HttpClientBuilder httpClientBuilder = HttpClientBuilder.create().setConnectionManager(connectionManager)
.setDefaultRequestConfig(requestConfig)
//自定义重试策略,针对502和503重试一次
// .setServiceUnavailableRetryStrategy(new CustomizedServiceUnavailableRetryStrategy())
.evictExpiredConnections();
CloseableHttpClient client = httpClientBuilder.build();
System.out.println("CloseableHttpClient = " + client);
log.info("CloseableHttpClient = {}", client);
return client;
}
/**
* 初始化异步客户端
* @return 异步客户端
*/
@Bean(name = "closeableHttpAsyncClient")
public CloseableHttpAsyncClient asyncHttpClient() {
RequestConfig requestConfig = RequestConfig.custom()
.setConnectTimeout(50000)
.setSocketTimeout(50000)
.setConnectionRequestTimeout(1000)
.build();
// 配置io线程
IOReactorConfig ioReactorConfig = IOReactorConfig.custom().
setIoThreadCount(Runtime.getRuntime().availableProcessors())
.setSoKeepAlive(true)
.build();
// 设置连接池大小
ConnectingIOReactor ioReactor = null;
try {
ioReactor = new DefaultConnectingIOReactor(ioReactorConfig);
} catch (IOReactorException e) {
e.printStackTrace();
}
PoolingNHttpClientConnectionManager connManager = new PoolingNHttpClientConnectionManager(ioReactor);
connManager.setMaxTotal(100);
connManager.setDefaultMaxPerRoute(100);
final CloseableHttpAsyncClient client = HttpAsyncClients.custom().
setConnectionManager(connManager)
.setDefaultRequestConfig(requestConfig)
.build();
log.info("CloseableHttpAsyncClient = {}", client);
client.start();
return client;
}
}
@Data
@Configuration
@ConfigurationProperties(prefix = "report-execute")
public class ReportExecuteConfigurationProperties {
/**
* Http://、Https://
*/
private String httpScheme = "Http://";
/**
* 负载均衡策略: 只支持随机和轮询,默认随机
* @see LbsStrategyEnum
*/
private String loadBalancingStrategy;
/**
* 服务列表
* 域名 或 ip + port
*/
private List<String> list;
}
@Slf4j
@Configuration
@AutoConfigureAfter(ReportExecuteConfigurationProperties.class)
public class ReportExecuteLbs implements ApplicationContextAware, InitializingBean {
public static ApplicationContext applicationContext;
public static LbsStrategyEnum strategy = RANDOM;
/**
* 多线程使用同一叠加计数器, volatile + CAS处理
*/
private static final AtomicInteger ATOMIC_ADDER = new AtomicInteger(0);
/**
* 组装好请求的uri 列表,允许修改,特别是后续使用 Zookeeper/Nacos等带钩子的配置中心
*/
private final static CopyOnWriteArrayList<String> uriList = new CopyOnWriteArrayList<>();
@Override
public void setApplicationContext(@NonNull ApplicationContext applicationContext) throws BeansException {
ReportExecuteLbs.applicationContext = applicationContext;
}
@Override
public void afterPropertiesSet() {
ReportExecuteConfigurationProperties properties = applicationContext.getBean(ReportExecuteConfigurationProperties.class);
List<String> list = properties.getList();
if (list.isEmpty()) {
// log.error("获取到的 报表导出执行服务列表为空 !");
throw new RuntimeException("获取到的 报表导出执行服务列表为空 !");
}
if (ObjectUtils.isEmpty(properties.getLoadBalancingStrategy())) {
// log.info("配置的执行策略为空,使用默认的 随机策略 !");
strategy = RANDOM;
} else {
String loadBalancingStrategy = properties.getLoadBalancingStrategy();
strategy = LbsStrategyEnum.strategyValue(loadBalancingStrategy);
}
list.forEach(info -> uriList.addIfAbsent(properties.getHttpScheme() + info));
}
/**
* 根据配置的策略获取请求的url
* @return url地址
*/
public static String getHttp() {
return getHttp(strategy);
}
/**
* 根据配置的策略获取请求的url
* @param lbsStrategy 策略字符串
* @return url地址
*/
public static String getHttp(String lbsStrategy) {
LbsStrategyEnum lbsStrategyEnum = LbsStrategyEnum.strategyValue(lbsStrategy);
if (lbsStrategyEnum == null) {
return getHttp();
}
return getHttp(lbsStrategyEnum);
}
/**
* 根据配置的策略获取请求的url
* @return url地址
*/
public static String getHttp(LbsStrategyEnum lbsStrategyEnum) {
if (uriList.size() == 1) {
return uriList.get(0);
}
if (lbsStrategyEnum == RANDOM) {
int index = ThreadLocalRandom.current().nextInt(uriList.size());
return uriList.get(index - 1);
} else {
return uriList.get(ATOMIC_ADDER.getAndIncrement() % uriList.size());
}
}
}
public enum LbsStrategyEnum {
RANDOM("random"),
POLLING("polling")
;
public String strategy;
LbsStrategyEnum(String strategy) {
this.strategy = strategy;
}
/**
* 根据策略字符串获取枚举
* @param strategy 策略字符串
* @return 策略枚举
*/
public static LbsStrategyEnum strategyValue(String strategy) {
for (LbsStrategyEnum value : LbsStrategyEnum.values()) {
if (value.strategy.equals(strategy)) {
return value;
}
}
return null;
}
}
更多推荐
所有评论(0)