这里用的是用官网提供的maven命令构建的flink1.4.0的flink-quick-start工程,具体构建工程命令如下

mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java
-DarchetypeVersion=1.4.0

然后会获取到一个java的初始工程。

下面是一个flink连接kafka的demo

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import java.util.Properties;


public class connectkafka {
    private static String ZOOKEEPER_HOST="your-zookeeper-cluster";
    private static String KAFKA_BROKER="your-kafka-cluster";
    private static String GROUP="your-comsumer-group-name";

    public static void main(String[] args) throws Exception {


        StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();

        connectkafka conn=new connectkafka();
        //test message:String. Real message :Tuple3<uid,itemid,rating>
        Properties kafkaprops=new Properties();
        kafkaprops.setProperty("zookeeper.connect",ZOOKEEPER_HOST);
        kafkaprops.setProperty("bootstrap.servers",KAFKA_BROKER);
        kafkaprops.setProperty("group.id",GROUP);

        DataStream<String> messagestream=env.addSource(new FlinkKafkaConsumer010<String>(
                "test",//这里是你的topic name
                new SimpleStringSchema(),
                kafkaprops));

        //show the message
        messagestream.print();
        env.execute();

这样一来简单的demo就完成了,但是跑起来的时候会报错,具体如下:

这里写图片描述

这个错误怎么解决呢?具体做法就是修改你的pom.xml文件,将这一段的provide给注释掉。这样就不会报错了,并且能完美运行。

这里写图片描述

运行结果图如下:

这里写图片描述

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐