Running Our Spring Boot Services on k8s: Syncing Data to ClickHouse with Flume

A brief introduction to ClickHouse; for a full introduction, see the official documentation:

1) ClickHouse is not part of the Hadoop ecosystem.

2) It is queried with SQL, so getting started is relatively easy for anyone familiar with relational databases.

3) ClickHouse is best used for reads, not for updates; writes should be done in batches (see the sketch after this list).

4) All kinds of log data can be synced into ClickHouse with Flume for unified management and user-behavior analysis.

5) Incremental sync from MySQL to ClickHouse. A thought here: immutable data such as system logs, transaction logs, user-behavior logs, and already-generated orders seems well suited to being synced into ClickHouse for reporting, statistics, and analysis.

Since users mostly query and modify recent or latest data, the mutable data with transactional requirements can stay in MySQL. By syncing MySQL data to ClickHouse, recent and changing data is handled in MySQL while the bulk of the data is handled in ClickHouse, which relieves the relational database's performance bottleneck.

6) Faced with a tangle of heterogeneous data sources, we could use Flink to consolidate everything into ClickHouse.

All of the above needs to be tested against your actual situation; after all, practice is the sole criterion for testing truth.
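To make point 3 concrete, here is a minimal sketch of a batched write using the clickhouse-jdbc driver that appears later in this article. The URL, table (the sys_log table created in section 1 below), and values are placeholders for your own environment:

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.Timestamp;
import ru.yandex.clickhouse.ClickHouseDataSource;

public class BatchInsertDemo {
    public static void main(String[] args) throws Exception {
        // Hypothetical connection details; adjust host/port/database to your setup.
        ClickHouseDataSource ds = new ClickHouseDataSource("jdbc:clickhouse://localhost:8123/default");
        try (Connection conn = ds.getConnection();
             PreparedStatement ps = conn.prepareStatement(
                     "INSERT INTO sys_log (id, sys_name, level, msg, thread, create_date, exe_date) "
                             + "VALUES (?, ?, ?, ?, ?, ?, ?)")) {
            Timestamp now = new Timestamp(System.currentTimeMillis());
            for (int i = 0; i < 10_000; i++) {
                ps.setString(1, String.valueOf(i));
                ps.setString(2, "frameSimple");
                ps.setString(3, "INFO");
                ps.setString(4, "demo message " + i);
                ps.setString(5, "main");
                ps.setTimestamp(6, now);
                ps.setTimestamp(7, now);
                ps.addBatch();
            }
            // One round trip for 10,000 rows instead of 10,000 single-row inserts.
            ps.executeBatch();
        }
    }
}
```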

A brief introduction to Flume; for a full introduction, see the official documentation:

1) A highly available, highly reliable, distributed system for collecting, aggregating, and transporting massive amounts of log data.

2) It can watch many kinds of files or directories in many ways, pick up the changed data, and push it to different downstream data-receiving middleware.

3) Flume provides a range of plugins for the needs in 2), for example listening on a TCP port as a data source, or watching a directory for log-file changes.

4) Flume can push changed data into a queue, which then fans it out to various data-warehouse middleware, or it can write directly into the warehouse without a queue.

5) We can also implement a custom Flume NG component to meet our own data-sync requirements.

This article mainly covers:

1、Implementing a custom Flume NG sink

2、Testing that the logs generated by our system are synced to ClickHouse through the sink implemented in 1

The core code follows:

1) Create the table in ClickHouse:

```sql
CREATE TABLE default.sys_log (
    `id` String,
    `sys_name` String,
    `level` String,
    `msg` String,
    `thread` String,
    `create_date` DateTime,
    `exe_date` DateTime
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(create_date)
ORDER BY exe_date;
```
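The custom sink in section 3 streams each batch using ClickHouse's JSONEachRow input format: one JSON object per line, with keys matching the column names. For reference, an equivalent hand-written insert against this table looks like this (values are illustrative):

```sql
INSERT INTO default.sys_log FORMAT JSONEachRow
{"id":"1","sys_name":"frameSimple","level":"INFO","msg":"demo","thread":"main","create_date":"2020-06-01 12:00:00","exe_date":"2020-06-01 12:00:00"}
```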

2) pom.xml for the custom Flume sink:

```xml
<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-core -->
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
    </dependency>
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-configuration -->
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-configuration</artifactId>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-sdk -->
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-sdk</artifactId>
    </dependency>
    <dependency>
        <groupId>com.opencsv</groupId>
        <artifactId>opencsv</artifactId>
        <version>4.2</version>
    </dependency>
    <dependency>
        <groupId>ru.yandex.clickhouse</groupId>
        <artifactId>clickhouse-jdbc</artifactId>
        <version>0.2.4</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
</dependencies>
```
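A packaging note: the Flume artifacts above (flume-ng-core, flume-ng-configuration, flume-ng-sdk) are already on the agent's classpath at runtime, so it is reasonable to mark them as provided in Maven and bundle only what Flume does not already ship (for example clickhouse-jdbc and opencsv) into the sink jar, e.g. with the Maven shade plugin. Several dependencies omit versions, so they are presumably managed by a parent POM that is not shown here.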
3) Core class of the custom Flume sink:
```java
// The original post shows only the class body; the imports, class declaration,
// and configuration constants below are reconstructed (names and values assumed).
import java.io.ByteArrayInputStream;
import java.util.ArrayList;
import java.util.List;

import com.google.common.base.Preconditions;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.yandex.clickhouse.BalancedClickhouseDataSource;
import ru.yandex.clickhouse.ClickHouseConnectionImpl;
import ru.yandex.clickhouse.ClickHouseStatement;
import ru.yandex.clickhouse.domain.ClickHouseFormat;
import ru.yandex.clickhouse.settings.ClickHouseProperties;
import ru.yandex.clickhouse.settings.ClickHouseQueryParam;

public class ClickHouseSink extends AbstractSink implements Configurable {

    private static final Logger LOGGER = LoggerFactory.getLogger(ClickHouseSink.class);

    // Config keys and defaults; in the original these live in a constants class that is not shown.
    private static final String HOST = "host";
    private static final String PORT = "port";
    private static final String USER = "user";
    private static final String PASSWORD = "password";
    private static final String DATABASE = "database";
    private static final String TABLE = "table";
    private static final String BATCH_SIZE = "batchSize";
    private static final String DEFAULT_PORT = "8123";
    private static final String DEFAULT_USER = "default";
    private static final String DEFAULT_PASSWORD = "";
    private static final int DEFAULT_BATCH_SIZE = 10000;
    private static final String CLICK_HOUSE_PREFIX = "jdbc:clickhouse://";
    private static final String MAX_PARALLEL_REPLICAS_VALUE = "2";

    private BalancedClickhouseDataSource dataSource = null;
    private SinkCounter sinkCounter = null;
    private String host = null;
    private String port = null;
    private String user = null;
    private String password = null;
    private String database = null;
    private String table = null;
    private int batchSize;

    @Override
    public Status process() throws EventDeliveryException {
        Status status = null;
        Channel ch = getChannel();
        Transaction txn = ch.getTransaction();
        txn.begin();
        List<LogLogbackVo> insertData = new ArrayList<>();
        try {
            ClickHouseConnectionImpl conn = (ClickHouseConnectionImpl) dataSource.getConnection();
            // Drain up to batchSize events from the channel.
            int count;
            for (count = 0; count < batchSize; ++count) {
                Event event = ch.take();
                if (event == null) {
                    break;
                }
                // StringUtil.buildLog parses one JSON log line into a LogLogbackVo (sketched below).
                insertData.add(StringUtil.buildLog(new String(event.getBody())));
            }
            if (count <= 0) {
                // Nothing to do; back off so Flume does not spin on an empty channel.
                sinkCounter.incrementBatchEmptyCount();
                txn.commit();
                return Status.BACKOFF;
            } else if (count < batchSize) {
                sinkCounter.incrementBatchUnderflowCount();
            } else {
                sinkCounter.incrementBatchCompleteCount();
            }
            sinkCounter.addToEventDrainAttemptCount(count);
            // Stream the whole batch to ClickHouse in JSONEachRow format: one JSON object per row.
            ClickHouseStatement sth = conn.createStatement();
            sth.write()
               .table(String.format(" %s.%s", database, table))
               .data(new ByteArrayInputStream(JsonUtil.t2JsonString(insertData).getBytes()),
                       ClickHouseFormat.JSONEachRow)
               .addDbParam(ClickHouseQueryParam.MAX_PARALLEL_REPLICAS, MAX_PARALLEL_REPLICAS_VALUE)
               .send();
            sinkCounter.incrementEventDrainSuccessCount();
            status = Status.READY;
            txn.commit();
        } catch (Throwable t) {
            // Roll back so the events stay in the channel and are retried.
            txn.rollback();
            LOGGER.error(t.getMessage(), t);
            status = Status.BACKOFF;
            // Re-throw all Errors.
            if (t instanceof Error) {
                throw (Error) t;
            }
        } finally {
            txn.close();
        }
        return status;
    }

    @Override
    public void configure(Context context) {
        if (sinkCounter == null) {
            sinkCounter = new SinkCounter(getName());
        }
        Preconditions.checkArgument(context.getString(HOST) != null && context.getString(HOST).length() > 0,
                "ClickHouse host must be specified!");
        this.host = context.getString(HOST);
        if (!this.host.startsWith(CLICK_HOUSE_PREFIX)) {
            this.host = CLICK_HOUSE_PREFIX + this.host;
        }
        Preconditions.checkArgument(context.getString(DATABASE) != null && context.getString(DATABASE).length() > 0,
                "ClickHouse database must be specified!");
        this.database = context.getString(DATABASE);
        Preconditions.checkArgument(context.getString(TABLE) != null && context.getString(TABLE).length() > 0,
                "ClickHouse table must be specified!");
        this.table = context.getString(TABLE);
        this.port = context.getString(PORT, DEFAULT_PORT);
        this.user = context.getString(USER, DEFAULT_USER);
        this.password = context.getString(PASSWORD, DEFAULT_PASSWORD);
        this.batchSize = context.getInteger(BATCH_SIZE, DEFAULT_BATCH_SIZE);
    }

    @Override
    public void start() {
        LOGGER.info("clickHouse sink {} starting", getName());
        String jdbcUrl = String.format("%s:%s/%s", this.host, this.port, this.database);
        ClickHouseProperties properties = new ClickHouseProperties().withCredentials(this.user, this.password);
        this.dataSource = new BalancedClickhouseDataSource(jdbcUrl, properties);
        sinkCounter.start();
        super.start();
        LOGGER.info("clickHouse sink {} started", getName());
    }

    @Override
    public void stop() {
        LOGGER.info("clickHouse sink {} stopping", getName());
        sinkCounter.incrementConnectionClosedCount();
        sinkCounter.stop();
        super.stop();
        LOGGER.info("clickHouse sink {} stopped", getName());
    }
}
```

4) The corresponding Flume configuration:

```properties
# Name the agent's components
a1.sources = r1
a1.sinks = sink1
a1.channels = c1

# Source: watch a spooling directory for new log files
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/spark/flume/data/log
a1.sources.r1.channels = c1
a1.sources.r1.fileHeader = false
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp

# Sink: the custom ClickHouse sink
a1.sinks.sink1.type = com.zhy.frame.newsql.clickhouse.sink.sink.ClickHouseSink
a1.sinks.sink1.host = localhost
a1.sinks.sink1.port = 8123
a1.sinks.sink1.database = default
a1.sinks.sink1.table = sys_log
a1.sinks.sink1.batchSize = 10000
a1.sinks.sink1.user = default
a1.sinks.sink1.password =

# Channel: in-memory. capacity and transactionCapacity must be at least the
# sink's batchSize (10000 here), otherwise take() fails once a full batch accumulates.
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000

# Bind source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.sink1.channel = c1
```
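To run it, build the sink jar (bundling its non-Flume dependencies) and place it in Flume's lib/ directory or under plugins.d/, then start the agent. A typical invocation, assuming the configuration above is saved as clickhouse-agent.conf:

```
flume-ng agent --conf conf --conf-file clickhouse-agent.conf --name a1 -Dflume.root.logger=INFO,console
```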

5) The system's logback configuration:

```xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Log levels from low to high: TRACE < DEBUG < INFO < WARN < ERROR < FATAL. If set to WARN, anything below WARN is not output. -->
<!-- scan: when true, the configuration is reloaded if this file changes; defaults to true -->
<!-- scanPeriod: how often to check the file for changes; the unit defaults to milliseconds if none is given; only effective when scan is true; defaults to 1 minute -->
<!-- debug: when true, logback prints its internal status information; defaults to false -->
<configuration scan="true" scanPeriod="7 seconds">
    <contextName>logback</contextName>
    <!-- System name -->
    <property name="sysName" value="frameSimple"/>
    <!-- Max log file size (KB/MB); 1KB forces very frequent rollover, which only makes sense for testing -->
    <property name="logMaxFileSize" value="1KB"/>
    <!-- Days of log history to keep -->
    <property name="logMaxHistory" value="7"/>
    <property name="logging.path" value="sysLog"/>
    <!-- Colored console logs -->
    <!-- Converters the colored output depends on -->
    <conversionRule conversionWord="clr" converterClass="org.springframework.boot.logging.logback.ColorConverter"/>
    <conversionRule conversionWord="wex"
                    converterClass="org.springframework.boot.logging.logback.WhitespaceThrowableProxyConverter"/>
    <conversionRule conversionWord="wEx"
                    converterClass="org.springframework.boot.logging.logback.ExtendedWhitespaceThrowableProxyConverter"/>
    <!-- Colored log pattern -->
    <property name="CONSOLE_LOG_PATTERN"
              value="${CONSOLE_LOG_PATTERN:-%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}}"/>

    <!-- Console output -->
    <appender name="CONSOLE_APPENDER" class="ch.qos.logback.core.ConsoleAppender">
        <!-- For development: the console prints everything at or above this level -->
        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
            <level>info</level>
        </filter>
        <encoder>
            <Pattern>${CONSOLE_LOG_PATTERN}</Pattern>
            <!-- Character set -->
            <charset>UTF-8</charset>
        </encoder>
    </appender>

    <!-- File output -->
    <!-- Time-rolling DEBUG log -->
    <appender name="DEBUG_APPENDER" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <!-- Path and name of the active log file -->
        <file>${logging.path}/debug_${sysName}.log</file>
        <!-- Output format: one JSON object per line -->
        <encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
            <jsonGeneratorDecorator
                    class="net.logstash.logback.decorate.FeatureJsonGeneratorDecorator"/>
            <providers>
                <pattern>
                    <pattern>
                        {
                        "sysName":"${sysName}",
                        "thread":"%thread",
                        "exeDate": "%date{yyyy-MM-dd HH:mm:ss.SSS}",
                        "level":"%level",
                        "msg": "%msg"
                        }
                    </pattern>
                </pattern>
            </providers>
        </encoder>
        <!-- Rolling policy: by date and by size -->
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!-- Archive file naming -->
            <fileNamePattern>${logging.path}/debug/debug_${sysName}_%d{yyyy-MM-dd}.%i.log</fileNamePattern>
            <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
                <maxFileSize>${logMaxFileSize}</maxFileSize>
            </timeBasedFileNamingAndTriggeringPolicy>
            <!-- Days of history to keep -->
            <maxHistory>${logMaxHistory}</maxHistory>
        </rollingPolicy>
        <!-- This file records the DEBUG level only -->
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>DEBUG</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>

    <!-- Time-rolling INFO log -->
    <appender name="INFO_APPENDER" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <!-- Path and name of the active log file -->
        <file>${logging.path}/info_${sysName}.log</file>
        <encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
            <jsonGeneratorDecorator
                    class="net.logstash.logback.decorate.FeatureJsonGeneratorDecorator"/>
            <providers>
                <pattern>
                    <pattern>
                        {
                        "sysName":"${sysName}",
                        "thread":"%thread",
                        "exeDate": "%date{yyyy-MM-dd HH:mm:ss.SSS}",
                        "level":"%level",
                        "msg": "%msg"
                        }
                    </pattern>
                </pattern>
            </providers>
        </encoder>
        <!-- Rolling policy: by date and by size -->
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!-- Daily archive path and naming -->
            <fileNamePattern>${logging.path}/info/info_${sysName}_%d{yyyy-MM-dd}.%i.log</fileNamePattern>
            <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
                <maxFileSize>${logMaxFileSize}</maxFileSize>
            </timeBasedFileNamingAndTriggeringPolicy>
            <!-- Days of history to keep -->
            <maxHistory>${logMaxHistory}</maxHistory>
        </rollingPolicy>
        <!-- This file records the INFO level only -->
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>INFO</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>

    <!-- Time-rolling WARN log -->
    <appender name="WARN_APPENDER" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <!-- Path and name of the active log file -->
        <file>${logging.path}/warn_${sysName}.log</file>
        <encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
            <jsonGeneratorDecorator
                    class="net.logstash.logback.decorate.FeatureJsonGeneratorDecorator"/>
            <providers>
                <pattern>
                    <pattern>
                        {
                        "sysName":"${sysName}",
                        "thread":"%thread",
                        "exeDate": "%date{yyyy-MM-dd HH:mm:ss.SSS}",
                        "level":"%level",
                        "msg": "%msg"
                        }
                    </pattern>
                </pattern>
            </providers>
        </encoder>
        <!-- Rolling policy: by date and by size -->
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${logging.path}/warn/warn_${sysName}_%d{yyyy-MM-dd}.%i.log</fileNamePattern>
            <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
                <maxFileSize>${logMaxFileSize}</maxFileSize>
            </timeBasedFileNamingAndTriggeringPolicy>
            <!-- Days of history to keep -->
            <maxHistory>${logMaxHistory}</maxHistory>
        </rollingPolicy>
        <!-- This file records the WARN level only -->
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>WARN</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>

    <!-- Time-rolling ERROR log -->
    <appender name="ERROR_APPENDER" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <!-- Path and name of the active log file -->
        <file>${logging.path}/error_${sysName}.log</file>
        <encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
            <jsonGeneratorDecorator
                    class="net.logstash.logback.decorate.FeatureJsonGeneratorDecorator"/>
            <providers>
                <pattern>
                    <pattern>
                        {
                        "sysName":"${sysName}",
                        "thread":"%thread",
                        "exeDate": "%date{yyyy-MM-dd HH:mm:ss.SSS}",
                        "level":"%level",
                        "msg": "%msg"
                        }
                    </pattern>
                </pattern>
            </providers>
        </encoder>
        <!-- Rolling policy: by date and by size -->
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${logging.path}/error/error_${sysName}_%d{yyyy-MM-dd}.%i.log</fileNamePattern>
            <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
                <maxFileSize>${logMaxFileSize}</maxFileSize>
            </timeBasedFileNamingAndTriggeringPolicy>
            <!-- Days of history to keep -->
            <maxHistory>${logMaxHistory}</maxHistory>
        </rollingPolicy>
        <!-- This file records the ERROR level only -->
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>ERROR</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>

    <!-- Note: with the root level at INFO, DEBUG events never reach DEBUG_APPENDER; lower the level if debug files are needed -->
    <root level="INFO">
        <appender-ref ref="CONSOLE_APPENDER"/>
        <appender-ref ref="DEBUG_APPENDER"/>
        <appender-ref ref="INFO_APPENDER"/>
        <appender-ref ref="WARN_APPENDER"/>
        <appender-ref ref="ERROR_APPENDER"/>
    </root>
</configuration>
```
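With this configuration, every log event is written as a single JSON line, which the spooldir source picks up and `StringUtil.buildLog` maps onto the sys_log columns. An emitted line looks roughly like this (illustrative values):

```
{"sysName":"frameSimple","thread":"http-nio-8080-exec-1","exeDate":"2020-06-01 12:00:00.123","level":"INFO","msg":"user login ok"}
```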