springboot整合flink(一)
flink大数据流式处理框架,提高开发效率
springboot整合flink(一)
欢迎加入扣扣组织,783092701
一、场景
需要使用mongodb,由于数据量很小且单机版,考虑使用flink CDC不太合适;
然后看到使用springboot框架集成mongodb操作起来很方便,所以就有了此篇文章。
二、遇到的问题
现象1:springboot依赖注入与flink冲突
现象2:flink该放在springboot哪一层?
现象3:springboot结合flink如何处理启动类?
现象4:flink作用域,与springboot作用域
现象5:如何在springboot启动类里使用bean
三、springboot结合flink如何处理启动类?
1、springboot启动过程
2、springboot启动立即执行flink
1) 有10种方法立即执行,我们从中筛选
方法1:静态方法/@PostConstruct(场景不符)
方法2:实现InitializingBean接口(执行在容器前)
方法3:提供initMethod(执行在方法2前)
方法4:实现ApplicationRunner接口(执行在容器前)
方法5:实现CommandLineRunner接口(CommandLineRunner和ApplicationRunner的作用是相同,参数不同)
方法6:在启动类的main里边进行
方法7:@EnableScheduling(场景不符)
方法8:实现ApplicationListener接口(场景不符)
方法9:实现ServletContextAware接口(web项目用)
方法10:实现ServletContextListener接口(web项目用)
2) 我们测试ApplicationRunner执行顺序
是一个接口,常用于项目启动后,(也就是ApringApplication.run()执行结束),立马执行某些逻辑。
可用于项目的准备工作,比如加载配置文件,加载执行流,定时任务等等。
测试与run()的执行先后顺序:
从图中看出,ApplicationRunner在容器实例化之前执行,flink消费kafka写在此处不合适,因为我们要获取yml里属性配置
所以,我们就把flink执行流就写在main函数的run()后:
因此上网搜了一下,找到了解决的办法,就是把get,set方法的static 修饰符去掉,然后在set方法上面加@Resource即可,但是不适合解决我们的问题,我们用下面方法解决
3) 通过上下文接口获取属性文件配置
import javax.annotation.Resource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import com.yhzq.config.SystemConfig;
import com.yhzq.util.BeanUtil;
/**
* <p>Title: Application.java</p>
* <p>Description: </p>
* @author wangmoumo
* @version 1.0
* @date 2022年10月18日
* @url www.681vip.com
*/
@SpringBootApplication
public class Application{
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
/* run方法的返回值ConfigurableApplicationContext继承了ApplicationContext上下文接口 */
ConfigurableApplicationContext applicationContext = SpringApplication.run( Application.class, args );
/* 将run方法的返回值赋值给工具类中的静态变量 */
BeanUtil.applicationContext = applicationContext;
getSystemConfig();
}
/* 调用 注意Application是我们SpringBoot的启动类 */
public static void getSystemConfig(){
/* 测试获取已经实例化的接口bean,执行bean中方法 */
SystemConfig systemConfig = BeanUtil.getBean(SystemConfig.class );
System.out.println(systemConfig);
}
}
3、sysconfig设置为flink全局参数
env.getConfig().setGlobalJobParameters(systemConfig);
更多内容:vip资料分享网
更多推荐
所有评论(0)