设计模式-总览https://mp.csdn.net/mp_blog/creation/editor/122202507

目录

1、项目背景

2、实现大致流程和代码

1)、启动报表组件

2)、编写自己的ReportClient

3)、ReportClient执行动态代理业务

4)、报表执行服务

3、@EnableReportClient启动流程和@FeignClient代理执行流程分析


1、项目背景

报表导出服务经常会涉及jvm、io等异常情况,所以想要将报表的查询和导出在两个服务中进行,并且报表的查询是在k8s环境中的一个微服务,而报表的真正执行服务是在一个虚拟机(挂了不影响其他业务)。项目情况分析:

1)、需要将报表的查询和导出两个阶段在两个分别在两个环境中执行,所以不能使用注册中心查询当前列表。在spring boot yml配置中写死需要调用的服务列表。

2)、当当前只是开发报表的调用框架,具体开发人员时rpc参数可能是查询的参数,或者查询的结果,或者sql都是可能的参数不确定。所以需要开发类型Feign一样的框架,传递不确定个数和类型的参数。

3)、基于报表业务的深度开发,使用了动态代理在调用前直接将部分公用的报表业务耦合进流程中。并且微服务的报表查询服务,底层直接使用了 Feign装配的 Apache Httpclient线程池。

    设计的流程图如上,基于这样的设计与自己的部分知识结构有关:动态代码方面的基础知识、自己也梳理过Spring Aop的源码(Aop的本质就是动态代理)Feign的源码实现

2、实现大致流程和代码

1)、启动报表组件

    基于该框架,首先需要在Spring boot的启动类上添加启用报表组件的注解,并使用参数basePackages配置需要扫描@ReportClient注解的包路径,如下:

@EnableReportExecute(basePackages = "com.kevin")
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(ReportExecuteRegistrar.class)
public @interface EnableReportExecute {

    /**
     * 指定基本扫描包
     * @return 指定数组的 基准包
     */
    String[] basePackages() default {};
}

2)、编写自己的ReportClient

开发人员只在需要执行异步报表的controller处理完业务逻辑后,调用自己的报表接口服务,并且接口可以允许同步或者异步io调用,可以在接口类层或者接口层 设置负载均衡策略,设置rpc调用的超时时间。如下:

@ReportExecute(path = "/provider/v1/demoSku")
public interface DemoSkuService {
    @ReportMapping("executeData")
    void executeData(@Nullable String title);

    @ReportMapping("executeData2")
    void executeData2(@Nullable String title, String sql);
}

对应的@ExportClient、@ReportMapping类信息如下:

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface ReportExecute {
	/**
	 * 注入Spring Ioc容器的bean 名称
	 * @return bean名称
	 */
	String beanName() default "";

	/**
	 * 执行策略,为每个的服务定制负载均衡策略
	 * @see LbsStrategyEnum#strategy
	 */
	String lbsStrategy() default "polling";

	/**
	 * 指定的路径
	 * @return 统一前置路径,对所有方法生效
	 */
	String path() default "/";
}
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@SuppressWarnings("unused")
public @interface ReportMapping {

	/**
	 * 访问子路径
	 * @return bean名称
	 */
	String value() default "";

	/**
	 * 默认 POST请求
	 * @return 请求方式
	 */
	RequestMethod requestMethod() default RequestMethod.POST;

	/**
	 * 同步或异步, 默认异步
	 * @return bean名称
	 */
	SyncTag syncTag() default SyncTag.ASYNC;

	/**
	 * 超时时间
	 * @return 超时时间
	 */
	int timeOut() default 0;

	/**
	 * 超时单位
	 * @return 超时单位
	 */
	TimeUnit timeUnit() default TimeUnit.MILLISECONDS;

	/**
	 * 同步标识符
	 */
	enum SyncTag {
		SYNC, ASYNC
	}

	/**
	 * Http 访问方式
	 */
	enum RequestMethod {
		POST, GET
	}

}

3)、ReportClient执行动态代理业务

    @ReportClient接口执行的过程就是动态代理的过程,会调用报表服务添加一条数据,并且将报表的id使用http header的方式传递给报表执行服务。

public class ReportInvocationHandler implements InvocationHandler {

    @Override
    public Object invoke(Object o, Method method, Object[] objects) {
        ReportContext reportContext = BeanTools.getBean(ReportContext.class);
        ReportExecuteFactoryBean base = reportContext.getBase(method);
        String path = base.getPath();
        // 1、保存状态
        Object titleObj = objects[0];
        String title = titleObj == null ? "" : (String)titleObj;
        String id = saveRecord(title);

        // 2、调用报表执行服务
        Object[] param = Arrays.copyOfRange(objects, 1, objects.length);
        String httpPrefix = base.getLbsStrategy() == null ? ReportExecuteLbs.getHttp() : ReportExecuteLbs.getHttp(base.getLbsStrategy());
        ReportMapping postMapping = reportContext.getRequestMapping(method);
        String mapping = postMapping.value().startsWith("/") ? postMapping.value() : ("/" + postMapping.value());
        String pathPrefix = path.startsWith("/") || "".equals(path) ? path : ("/" + path);
//        return ReportHttpUtil.doHttp(id, title, postMapping, httpPrefix + pathPrefix + mapping, param);
        return SemaphoreLimiter.getInstance().execute(() -> 
    ReportHttpUtil.doHttp(id, title, postMapping, httpPrefix + pathPrefix + mapping, param));
    }

    /**
     * 保存记录信息
     * @return 主键Id
     */
    private String saveRecord(String title) {
        String id = UUID.randomUUID().toString();
        ReportRecord record = new ReportRecord();
        record.setId(id);
        record.setTitle(title);
        record.setState(ReportStateEnum.INIT.state);
        record.setUrl("");
        record.setObjectVersionNumber(0L);

        // 从 单点登录中获取用户信息
        String createBy = "kevin";
        record.setCreateBy(createBy);
        record.setLastUpdateBy(createBy);
        BeanTools.getBean(ReportRecordService.class).insert(record);

        return id;
    }
}

    并且这里为了服务的健壮性,使用了jdk current包Semaphore是实现的本地限流器,SemaphoreLimiter代码如下:

@Slf4j
public class SemaphoreLimiter<V> {

    /**
     * 限流器
     */
    private static final Semaphore SEMAPHORE = new Semaphore(15);

    private static final SemaphoreLimiter INSTANCE = new SemaphoreLimiter();

    /**
     * 最大的执行等待时间,单位毫秒
     */
    private static final int MAX_WAIT_TIME = 5000;

    /**
     * 单例构造私有化
     */
    private SemaphoreLimiter() {
    }

    /**
     * 共有的实例初始化
     * @return 单例对象
     */
    public static SemaphoreLimiter getInstance() {
        return INSTANCE;
    }

    /**
     * 在限流中执行服务
     * @param callable 单纯的任务回调信息
     * @return 远程方法调用结果
     */
    public V execute(Callable<V> callable) {
        try {
            SEMAPHORE.tryAcquire(MAX_WAIT_TIME, TimeUnit.MILLISECONDS);
            return callable.call();
        } catch (InterruptedException e) {
            log.info("使用限流器,发生中断异常!");
            buildException(LIMITER_EXCEPTION);
        } catch (Exception e) {
            log.error("异常信息 = " + e);
            buildException(LIMITER_EXCEPTION);
        } finally {
            SEMAPHORE.release();
        }
        return null;
    }

}

4)、报表执行服务

   首先报表执行服务会使用拦截器,将http header中的 reportId或者并且存放到 ThreadLocal中,如下:

public class ReportInterceptor implements HandlerInterceptor {

    /**
     * 存放当前请求的回调主键和url信息
     */
    public static final ThreadLocal<ReportBO> THREAD_LOCAL = new ThreadLocal<>();

    /**
     * 保存导出的报表地址
     * @param url 报表访问地址
     */
    public static void saveUrl(String url) {
        ReportBO reportBO = THREAD_LOCAL.get();
        if (reportBO != null) {
            reportBO.setUrl(url);
        }
    }

    /**
     * 获取当前的报表标题
     * @return 报表标题
     */
    public static String getTitle() {
        ReportBO reportBO = THREAD_LOCAL.get();
        if (reportBO == null) {
            return "";
        }
        return reportBO.getTitle();
    }

    @Override
    public boolean preHandle(HttpServletRequest request, @NonNull HttpServletResponse response, @NonNull Object handler) throws UnsupportedEncodingException {
        String reportBatchId = request.getHeader("reportBatchId");
        if (reportBatchId != null) {
            String reportTitle = request.getHeader("reportTitle");
            reportTitle = ObjectUtils.isEmpty(reportTitle) ? "" : URLDecoder.decode(reportTitle, "UTF-8");
            THREAD_LOCAL.set(new ReportBO(reportBatchId, reportTitle));
        }
        return true;
    }

    @Override
    public void postHandle(@NonNull HttpServletRequest request, @NonNull HttpServletResponse response, @NonNull Object handler, ModelAndView modelAndView) {

    }

    @Override
    public void afterCompletion(@NonNull HttpServletRequest request, @NonNull HttpServletResponse response, @NonNull Object handler, Exception ex) {
        try {
            ReportBO reportBO = THREAD_LOCAL.get();
            if (reportBO != null && !ObjectUtils.isEmpty(reportBO.getUrl())) {
                // 自动回调报表服务,修改状态,添加访问地址
                ReportHttpUtil.doHttp();
            }
        } finally {
            THREAD_LOCAL.remove();
        }
    }
}

在调用到报表导出服务的controller,并且处理完业务逻辑后,直接调用工具类将文件存储到固定的位置(可以是共享磁盘、fastdfs等),此时工具类本身还会将存储完成的文件存放url放置到ThreadLocal中,为了方便就直接使用了上面的ReportInterceptor的ThreadLocal,并且提供了saveUrl方法。流程核心代码就是下面的:

ReportInterceptor.saveUrl(url);
@Slf4j
@Component
public class ReportConsistentUtil implements InitializingBean {

    public static final ThreadLocal<SimpleDateFormat> DATETIME_FORMAT = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss"));

    /** Windows系统的基本面路径 */
    @Value("${report.base.dir.windows}")
    private static String windowsBaseDirectory = "D:\\report\\execute\\";
    /** Linux操作系统的基本路径 */
    @Value("${report.base.dir.linux}")
    private static String linuxBaseDirectory = "/usr/local/execute";
    /** 当前操作系统对应的基本路径 */
    private static String BASE_DIRECTORY;
    /** 当前的操作系统 */
    private static final String os = System.getProperty("os.name").toLowerCase();

    public static void main(String[] args) throws IOException {
        /*File kevin = new File("Z:/report/kevin.txt");
        log.info("save file: file path = {}, file name = {}", kevin.getPath(), kevin.getName());
        ReportConsistentUtil.saveByFile(Model.Order, kevin);*/

        String path = ReportConsistentUtil.saveByFullPath(Model.ORDER, "Z:/report/kevin.txt");

        log.info("得到的最终文件路径:{}", path);
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        initBaseDir();
    }

    /**
     * 根据路径保持
     * @param model 所属模块
     * @param fullPath 文件全路径
     */
    public static String saveByFullPath(Model model, String fullPath) throws IOException {
        if (ObjectUtils.isEmpty(fullPath)) {
            throw new RuntimeException("执行的全路径为空!");
        }
        return saveByFile(model, new File(fullPath));
    }

    /**
     * 保存文件
     * @param model 所属模块
     * @param file 文件
     * @throws IOException io异常
     */
    public static String saveByFile(Model model, File file) throws IOException {
        FileInputStream fis = new FileInputStream(file);
        String suffix = file.getName().substring(file.getName().lastIndexOf("."));
        return saveByOutputStream(model, fis, suffix);
    }

    /**
     * 将文件输入流写到指定位置
     * @param model 所属模块
     * @param fis 文件输入流
     * @throws IOException io异常
     */
    public static String saveByOutputStream(Model model, FileInputStream fis, String suffix) throws IOException {
        /*第四步:将得到的文件名称的扩展名改为.jad*/
        String sub = DATETIME_FORMAT.get().format(new Date());
        String title = ReportInterceptor.getTitle();
        String fileName = ObjectUtils.isEmpty(title) ? sub : title + "-" + sub;
        String url = File.separator + model.directory + File.separator + fileName + suffix;
        File file = new File(BASE_DIRECTORY + url);
        FileOutputStream fos = new FileOutputStream(file);
        copy(fis, fos);
        fis.close();
        fos.close();
        // 回执请求的url
        ReportInterceptor.saveUrl(url);
        return file.getPath();
    }

    /**
     * 用户获取报表
     * @param url 报表记录中的url【Model + UUID】
     * @return 文件输出流
     */
    public static File getExport(String url) {
        return new File(BASE_DIRECTORY + File.separator + url);
        /*try {
            return file;
        } catch (FileNotFoundException e) {
            return null;
        }*/
    }

    static {
//        initBaseDir();
    }

    /**
     * 初始化跟目录,如果不存在
     */
    private void initBaseDir() {
        File file;

        if (OperateSystem.LINUX.system.toLowerCase().equals(os)) {
            file = new File(linuxBaseDirectory);
            BASE_DIRECTORY = linuxBaseDirectory;
        } else if (os.contains(WINDOWS.system.toLowerCase())) {
            file = new File(windowsBaseDirectory);
            BASE_DIRECTORY = windowsBaseDirectory;
        } else {
            throw new RuntimeException("不支持的操作系统");
        }
        if (!file.exists()) {
            boolean mkdirs = file.mkdirs();
            if (mkdirs) {
                log.info("创建目录:{} 成功!", file.getPath());
            }
        }

        for (Model model : Model.values()) {
            File childFile = new File(file.getPath() + File.separator + model.directory);
            if (!childFile.exists()) {
                boolean mkdirs = childFile.mkdirs();
                if (mkdirs) {
                    log.info("创建目录:{} 成功!", childFile.getPath());
                }
            }
        }
    }

    /**
     * 流拷贝
     * @param in 输入流
     * @param out 输出流
     * @throws IOException io异常
     */
    private static void copy(InputStream in, OutputStream out) throws IOException {
        byte[] buf = new byte[1024];
        int len;
        /*读取文件内容并写入文件字节流中*/
        while((len = in.read(buf))!=-1) {
            out.write(buf, 0, len);
        }
    }

    /**
     *  报表类型
     * @author kevin
     * @date 2020/10/28 16:10
     * @since 1.0.0
     */
    public enum Model {

        ORDER("order", "订单报表"),
        ;
        /**
         * 目录
         */
        public String directory;

        /**
         * 描述
         */
        public String description;

        Model(String directory, String description) {
            this.directory = directory;
            this.description = description;
        }
    }

    /**
     * 操作系统
     * @author kevin
     * @date 2020/10/28 16:14
     * @since 1.0.0
     */
    public enum OperateSystem {
        /** Linux系统  */
        WINDOWS("Windows"),
        /** Windows系统  */
        LINUX("Linux"),
        ;

        public String system;

        OperateSystem(String system) {
            this.system = system;
        }
    }
}

最后,ReportInterceptor拦截器的afterCompletion方法,会判断ThreadLocal中是否存在保存导出文件后的url地址,如果存在则根据 reportId和url 回调用报表服务的修改url接口,以添加下载地址并且将报表状态修改为可下载。

@Override
public void afterCompletion(@NonNull HttpServletRequest request, @NonNull HttpServletResponse response, @NonNull Object handler, Exception ex) {
    try {
        ReportBO reportBO = THREAD_LOCAL.get();
        if (reportBO != null && !ObjectUtils.isEmpty(reportBO.getUrl())) {
            // 自动回调报表服务,修改状态,添加访问地址
            ReportHttpUtil.doHttp();
        }
    } finally {
        THREAD_LOCAL.remove();
    }
}
@Slf4j
@Component
public class ReportHttpUtil implements InitializingBean {

    private static final String CHARSET = "UTF-8";

    private static final String CONTENT_TYPE = "application/json;charset=utf8";

    /**
     * {@link CloseableHttpClient}
     */
    public ReportHttpUtil() { }

    @Override
    public void afterPropertiesSet() {
//        log.info("feign httpclient Class = {}", httpClient.getClass());
    }

    /**
     * 发送同的Post请求
     */
    public static void doHttp() {
        ReportBO reportBO = ReportInterceptor.THREAD_LOCAL.get();
        // 创建Post请求
        HttpPost httpPost = new HttpPost(ReportExecuteLbs.getFullHttp());
        ReportRecord reportRecord = new ReportRecord();
        reportRecord.setId(reportBO.getBatchId());
        reportRecord.setUrl(reportBO.getUrl());
        // 我这里利用阿里的fastjson,将Object转换为json字符串;
        // (需要导入com.alibaba.fastjson.JSON包)
        String jsonString = JSON.toJSONString(reportRecord);

        StringEntity entity = new StringEntity(jsonString, CHARSET);

        // post请求是将参数放在请求体里面传过去的;这里将entity放入post请求体中
        httpPost.setEntity(entity);
        httpPost.setHeader("Content-Type", CONTENT_TYPE);
        // 响应模型
        CloseableHttpResponse response = null;
        try {
            // 由客户端执行(发送)Post请求
            CloseableHttpClient httpClient = HttpClients.createDefault();
            response = httpClient.execute(httpPost);
        } catch (ParseException | ClientProtocolException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                // 释放资源
                if (response != null) {
                    response.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

3、@EnableReportClient启动流程和@FeignClient代理执行流程分析

@EnableReportExecute类中使用import的方式注入@Import(ReportExecuteRegistrar.class),就在是ioc容器启动时将包的根目录下面的@ReportClient标注的接口类以ReportExecuteFactoryBean的形式注入成BeanDefinition,那么当IOC获取该类型FactoryBean时,即调用其getObject方法,并且此时返回的已经是jdk代理对象。此时会将反射的method和注解信息存放到ReportContext中(而其本身就是一个ConcurrentHashMap)。
public class ReportExecuteRegistrar implements ImportBeanDefinitionRegistrar, ResourceLoaderAware, EnvironmentAware {

    private ResourceLoader resourceLoader;
    private Environment environment;

    @Override
    public void setResourceLoader(@NonNull ResourceLoader resourceLoader) {
        this.resourceLoader = resourceLoader;
    }

    @Override
    public void setEnvironment(@NonNull Environment environment) {
        this.environment = environment;
    }

    @Override
    public void registerBeanDefinitions(AnnotationMetadata metadata, @NonNull BeanDefinitionRegistry registry) {
        Map<String, Object> defaultAttrs = metadata.getAnnotationAttributes(EnableReportExecute.class.getName(), true);
        if (defaultAttrs == null) {
            return;
        }
        // 注册指定扫描的包
        if (defaultAttrs.containsKey("basePackages")) {
            String[] basePackages = (String[]) defaultAttrs.get("basePackages");

            AnnotationTypeFilter includeFilter = new AnnotationTypeFilter(ReportExecute.class);
            ClassPathScanningCandidateComponentProvider scanner = getScanner();
            scanner.setResourceLoader(this.resourceLoader);
            scanner.addIncludeFilter(includeFilter);

            for (String basePackage : basePackages) {
                for (BeanDefinition candidateComponent : scanner.findCandidateComponents(basePackage)) {
                    if (candidateComponent instanceof AnnotatedBeanDefinition) {
                        AnnotatedBeanDefinition beanDefinition = (AnnotatedBeanDefinition) candidateComponent;
                        AnnotationMetadata annotationMetadata = beanDefinition.getMetadata();
                        Assert.isTrue(annotationMetadata.isInterface(), "@ReportExecute can only be specified on an interface");

                        Map<String, Object> attributes = annotationMetadata.getAnnotationAttributes(ReportExecute.class.getCanonicalName());

                        assert attributes != null;
                        String lbsStrategy = getWithDefaultValue(attributes, "lbsStrategy");
                        String beanName = getWithDefaultValue(attributes, "beanName");
                        String path = getWithDefaultValue(attributes, "path");
                        if (ObjectUtils.isEmpty(beanName)) {
                            beanName = annotationMetadata.getClassName();
                        }
                        registerReportExecute(registry, annotationMetadata, beanName, lbsStrategy, path);
                    }
                }
            }
        }
    }

    /**
     * 获取默认值
     * @param attributes 配置的属性
     * @param key key
     * @return 对应的配置值
     */
    private String getWithDefaultValue(Map<String, Object> attributes, String key) {
        return attributes.get(key) == null ? "" : (String)attributes.get(key);
    }

    /**
     * 注册Bean 待{@link AbstractApplicationContext#refresh()} finishBeanFactoryInitialization完成该 FactoryBean的getObject调用
     * @param registry 注册器,当前即为AbstractApplicationContext
     * @param annotationMetadata 注解元信息
     * @param beanName bean 名称
     * @param lbsStrategy 指定的负载均衡策略
     */
    private void registerReportExecute(BeanDefinitionRegistry registry, AnnotationMetadata annotationMetadata,
                                       String beanName, String lbsStrategy, String path) {

        String className = annotationMetadata.getClassName();
        String finalBeanName = ObjectUtils.isEmpty(beanName) ? className : beanName;

        BeanDefinitionBuilder definition = BeanDefinitionBuilder.genericBeanDefinition(ReportExecuteFactoryBean.class);
        definition.addPropertyValue("type", className);
        definition.addPropertyValue("beanName", finalBeanName);
        definition.addPropertyValue("lbsStrategy", lbsStrategy);
        definition.addPropertyValue("path", path);
        definition.setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_TYPE);
        registry.registerBeanDefinition(finalBeanName, definition.getBeanDefinition());
    }

    /**
     * 初始化类路径的扫描器
     * 最后会获取到磁盘包路径下的所有class文件,使用类加载器进行加载
     * @return 路径扫描器
     */
    protected ClassPathScanningCandidateComponentProvider getScanner() {
        return new ClassPathScanningCandidateComponentProvider(false, this.environment) {
            @Override
            protected boolean isCandidateComponent(@NonNull AnnotatedBeanDefinition beanDefinition) {
                boolean isCandidate = false;
                if (beanDefinition.getMetadata().isIndependent()) {
                    if (!beanDefinition.getMetadata().isAnnotation()) {
                        isCandidate = true;
                    }
                }
                return isCandidate;
            }
        };
    }

}
@Data
public final class ReportExecuteFactoryBean implements FactoryBean<Object>, BeanClassLoaderAware {

    private ClassLoader classLoader;

    /**
     * Bean名称
     */
    private String beanName;

    /**
     * 对应{@link ReportExecute} 的类
     */
    private Class<?> type;

    /**
     * 负载均衡策略
     */
    private String lbsStrategy;

    /**
     * 类上配置的前缀访问路径
     */
    private String path;

    @Override
    public Object getObject() {
        ReportContext reportContext = BeanTools.getBean(ReportContext.class);
        Method[] methods = type.getMethods();
        for (Method method : methods) {
            ReportMapping annotation = method.getAnnotation(ReportMapping.class);
            if (annotation == null) {
                throw new RuntimeException("配置异常,当前只支持ReportMapping.class, Class = " + type);
            }
            reportContext.setRequestMapping(method, annotation);
            reportContext.setBase(method, this);
        }
        // 返回代理放入IOC
        return Proxy.newProxyInstance(classLoader, new Class[]{type}, new ReportInvocationHandler());
    }

    @Override
    public void setBeanClassLoader(@NonNull ClassLoader classLoader) {
        this.classLoader = classLoader;
    }

    @Override
    public boolean isSingleton() {
        return true;
    }

    @Override
    public Class<?> getObjectType() {
        return this.type;
    }
}
@Component
public class ReportContext extends ConcurrentHashMap<Method, ReportMapping> {

    /**
     * 父类存放方法配置,该容器配置类配置
     */
    private final Map<Method, ReportExecuteFactoryBean> CONFIG_BASE = new ConcurrentHashMap<>();

    /**
     * 获取{@link ReportMapping} 配置信息
     * @param method 方法信息
     * @return 配置信息
     */
    public ReportMapping getRequestMapping(Method method) {
        return super.get(method);
    }

    public void setRequestMapping(Method method, ReportMapping postMapping) {
        super.put(method, postMapping);
    }

    /**
     * 设置配置关系
     * @param method 方法
     * @param factoryBean 方法所在类的生产工厂
     */
    public void setBase(Method method, ReportExecuteFactoryBean factoryBean) {
        CONFIG_BASE.put(method, factoryBean);
    }

    /**
     * 获取当前访问的前缀信息
     * @param method 方法
     * @return 方法所在类的生产工厂
     */
    public ReportExecuteFactoryBean getBase(Method method) {
        return CONFIG_BASE.get(method);
    }

}

其他说明:使用的是Feign的线程池,这里使用了httpClient类型的FeignClient。已经一些负载均衡的类等,如下:

@Slf4j
@Component
@ConditionalOnProperty(value = "feign.httpclient.enabled", matchIfMissing = true)
@AutoConfigureBefore(FeignRibbonClientAutoConfiguration.class) //  在 {@code HttpClientFeignLoadBalancedConfiguration} 之前注入
public class FeignHttpClientConfig {

    @Autowired(required = false)
    private RegistryBuilder registryBuilder;

    private final Timer connectionManagerTimer = new Timer("FeignApacheHttpClientConfiguration.connectionManagerTimer", true);

    @Bean
    @ConditionalOnMissingBean(HttpClientConnectionManager.class)
    public HttpClientConnectionManager connectionManager(
            ApacheHttpClientConnectionManagerFactory connectionManagerFactory,
            FeignHttpClientProperties httpClientProperties) {
        final HttpClientConnectionManager connectionManager = connectionManagerFactory
                .newConnectionManager(httpClientProperties.isDisableSslValidation(),
                        httpClientProperties.getMaxConnections(),
                        httpClientProperties.getMaxConnectionsPerRoute(),
                        httpClientProperties.getTimeToLive(),
                        httpClientProperties.getTimeToLiveUnit(),
                        this.registryBuilder);
        this.connectionManagerTimer.schedule(new TimerTask() {
            @Override
            public void run() {
                connectionManager.closeExpiredConnections();
            }
        }, 30000, httpClientProperties.getConnectionTimerRepeat());
        return connectionManager;
    }

    @Bean(name = "closeableHttpClient", destroyMethod = "close")
    public CloseableHttpClient httpClient(HttpClientConnectionManager httpClientConnectionManager) {
        PoolingHttpClientConnectionManager connectionManager = (PoolingHttpClientConnectionManager)httpClientConnectionManager;
        connectionManager.setMaxTotal(400);
        connectionManager.setDefaultMaxPerRoute(100);

        RequestConfig requestConfig = RequestConfig.custom().setConnectionRequestTimeout(2000)//从连接池获取连接等待超时时间
                .setConnectTimeout(2000)//请求超时时间
                .setSocketTimeout(15000)//等待服务响应超时时间
                .build();
        HttpClientBuilder httpClientBuilder = HttpClientBuilder.create().setConnectionManager(connectionManager)
                .setDefaultRequestConfig(requestConfig)
                //自定义重试策略,针对502和503重试一次
//                .setServiceUnavailableRetryStrategy(new CustomizedServiceUnavailableRetryStrategy())
                .evictExpiredConnections();
        CloseableHttpClient client = httpClientBuilder.build();
        System.out.println("CloseableHttpClient = " + client);
        log.info("CloseableHttpClient = {}", client);
        return client;
    }

    /**
     * 初始化异步客户端
     * @return 异步客户端
     */
    @Bean(name = "closeableHttpAsyncClient")
    public CloseableHttpAsyncClient asyncHttpClient() {
        RequestConfig requestConfig = RequestConfig.custom()
                .setConnectTimeout(50000)
                .setSocketTimeout(50000)
                .setConnectionRequestTimeout(1000)
                .build();

        // 配置io线程
        IOReactorConfig ioReactorConfig = IOReactorConfig.custom().
                setIoThreadCount(Runtime.getRuntime().availableProcessors())
                .setSoKeepAlive(true)
                .build();
        // 设置连接池大小
        ConnectingIOReactor ioReactor = null;
        try {
            ioReactor = new DefaultConnectingIOReactor(ioReactorConfig);
        } catch (IOReactorException e) {
            e.printStackTrace();
        }
        PoolingNHttpClientConnectionManager connManager = new PoolingNHttpClientConnectionManager(ioReactor);
        connManager.setMaxTotal(100);
        connManager.setDefaultMaxPerRoute(100);


        final CloseableHttpAsyncClient client = HttpAsyncClients.custom().
                setConnectionManager(connManager)
                .setDefaultRequestConfig(requestConfig)
                .build();
        log.info("CloseableHttpAsyncClient = {}", client);
        client.start();
        return client;
    }

}
@Data
@Configuration
@ConfigurationProperties(prefix = "report-execute")
public class ReportExecuteConfigurationProperties {

    /**
     * Http://、Https://
     */
    private String httpScheme = "Http://";

    /**
     *  负载均衡策略: 只支持随机和轮询,默认随机
     * @see LbsStrategyEnum
     */
    private String loadBalancingStrategy;

    /**
     * 服务列表
     * 域名 或 ip + port
     */
    private List<String> list;

}
@Slf4j
@Configuration
@AutoConfigureAfter(ReportExecuteConfigurationProperties.class)
public class ReportExecuteLbs implements ApplicationContextAware, InitializingBean {

    public static ApplicationContext applicationContext;

    public static LbsStrategyEnum strategy = RANDOM;

    /**
     * 多线程使用同一叠加计数器, volatile + CAS处理
     */
    private static final AtomicInteger ATOMIC_ADDER = new AtomicInteger(0);

    /**
     * 组装好请求的uri 列表,允许修改,特别是后续使用 Zookeeper/Nacos等带钩子的配置中心
     */
    private final static CopyOnWriteArrayList<String> uriList = new CopyOnWriteArrayList<>();

    @Override
    public void setApplicationContext(@NonNull ApplicationContext applicationContext) throws BeansException {
        ReportExecuteLbs.applicationContext = applicationContext;
    }

    @Override
    public void afterPropertiesSet() {
        ReportExecuteConfigurationProperties properties = applicationContext.getBean(ReportExecuteConfigurationProperties.class);
        List<String> list = properties.getList();
        if (list.isEmpty()) {
//            log.error("获取到的 报表导出执行服务列表为空 !");
            throw new RuntimeException("获取到的 报表导出执行服务列表为空 !");
        }
        if (ObjectUtils.isEmpty(properties.getLoadBalancingStrategy())) {
//            log.info("配置的执行策略为空,使用默认的 随机策略 !");
            strategy = RANDOM;
        } else {
            String loadBalancingStrategy = properties.getLoadBalancingStrategy();
            strategy = LbsStrategyEnum.strategyValue(loadBalancingStrategy);
        }
        list.forEach(info -> uriList.addIfAbsent(properties.getHttpScheme() + info));
    }

    /**
     * 根据配置的策略获取请求的url
     * @return url地址
     */
    public static String getHttp() {
        return getHttp(strategy);
    }

    /**
     *  根据配置的策略获取请求的url
     * @param lbsStrategy 策略字符串
     * @return url地址
     */
    public static String getHttp(String lbsStrategy) {
        LbsStrategyEnum lbsStrategyEnum = LbsStrategyEnum.strategyValue(lbsStrategy);
        if (lbsStrategyEnum == null) {
            return getHttp();
        }
        return getHttp(lbsStrategyEnum);
    }

    /**
     * 根据配置的策略获取请求的url
     * @return url地址
     */
    public static String getHttp(LbsStrategyEnum lbsStrategyEnum) {
        if (uriList.size() == 1) {
            return uriList.get(0);
        }
        if (lbsStrategyEnum == RANDOM) {
            int index = ThreadLocalRandom.current().nextInt(uriList.size());
            return uriList.get(index - 1);
        } else {
            return uriList.get(ATOMIC_ADDER.getAndIncrement() % uriList.size());
        }
    }

}
public enum LbsStrategyEnum {

    RANDOM("random"),

    POLLING("polling")
    ;

    public String strategy;

    LbsStrategyEnum(String strategy) {
        this.strategy = strategy;
    }

    /**
     * 根据策略字符串获取枚举
     * @param strategy 策略字符串
     * @return 策略枚举
     */
    public static LbsStrategyEnum strategyValue(String strategy) {
        for (LbsStrategyEnum value : LbsStrategyEnum.values()) {
            if (value.strategy.equals(strategy)) {
                return value;
            }
        }
        return null;
    }
}

Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐