logo
publist
写文章

简介

该用户还未填写简介

擅长的技术栈

可提供的服务

暂无可提供的服务

flink 1.12 SQL Demo

Flink 版本 1.12.3source是kafka 维表是MySQL source left join 维表public class FlinkTableDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironmen

#flink#sql#java
Flink kafkaSink 二

这个是接着第一篇写的:主要是实现 直接上代码1.实现KeyedSerializationSchema接口public class DefinedSerializationTest implements KeyedSerializationSchema<Tuple2<String, String>> {private static final

kafka 笔记三 kafka消费者 心跳

消费者心跳机制Kafka 的心跳是 Kafka Consumer 和 Broker 之间的健康检查,只有当 Broker Coordinator 正常时,Consumer 才会发送心跳。Consumer 和 Rebalance 相关的 2 个配置参数:broker 端,sessionTimeoutMs 参数broker 处理心跳的逻辑在 GroupCoordinator 类中:如果心跳超期, br

Kafka 二 kafka的瓶颈

这一章主要是说一下,kafka的瓶颈。1.磁盘的吞吐量生产者客户端的性能直接受到服务器端磁盘吞吐量的影响。生产者生成的消息必须被提交到服务器保存,大多数客户端在发送消息之后会一直等待,直到至少有一个服务器确认悄息已经成功提交为止。也就是说,磁盘写入速度越快,生成消息的延迟就越低。2.磁盘的容量磁盘容量是另一个值得讨论的话题。需要多大的磁盘容量取决于需要保留的消息数量。如果服务器每天...

Flink kafkaSink

因为是用sql写的,写KafkaSink的时候,是准备用KafkaTableSink有两个问题,第一个KafkaTableSink只能接受appendstream但是table流是包含了删除的(流回撤),所以使用不了。使用的还是必须经过Filter将Table流中的false流剔除。第二个问题,进入KafkaTableSink的源码:public KafkaTableSink(...

kafka 笔记十 kafka 延时队列

延时队列两个follower副本都已经拉取到了leader副本的最新位置,此时又向leader副本发送拉取请求,而leader副本并没有新的消息写入,那么此时leader副本该如何处理呢?可以直接返回空的拉取结果给follower副本,不过在leader副本一直没有新消息写入的情况下,follower副本会一直发送拉取请求,并且总收到空的拉取结果,消耗资源。Kafka在处理拉取请求时,会先读取一次

Flink Kafka TumblingEventTimeWindows

好久没写博客了,更新一下。这篇博客主要将的是Flink中双流join的操作。在说双流join之前先了解一下window的定义,推荐博客。然后了解一下watermark的定义,推荐博客。我是在看完第二篇博客后,将数据源改为了kafka。但是一直没有触发window,打印window信息。然后在Flink的开发者邮箱中看见了问题解答。从邮箱的回答中可以很明显的看出为什么没有触发window。...

Flink restful API demo

主要解决用flink的restful API 来启动和停止yarn上的flink任务github地址:https://github.com/wenbaoup/flink-restful-demopackage com.wenbao.flink.restful.flink;import com.alibaba.fastjson.JSONObject;import org.apache.h...

spark kafka Topic not present in metadata after 60000 ms

structstured streaming 结合kafka问题出在kafka加了kerberos认证在将数据写入kafka时需要加.option("kafka.security.protocol", "SASL_PLAINTEXT")

到底了