I. Overview of the process
Run Flink on Kubernetes with flink-kubernetes-operator:
1. Deploy flink-kubernetes-operator on k8s
2. Write and package the code
3. Build the image and push it to Harbor
4. Submit the job
5. Flink web UI

II. Step by step
1. Deploy flink-kubernetes-operator on k8s

Follow the official quick-start guide:
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/docs/try-flink-kubernetes-operator/quick-start/
and pick compatible versions of flink-kubernetes-operator and Flink
(here: flink-kubernetes-operator-release-1.6.1.tar.gz with Flink 1.17.2)
 helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.6.1/
 helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator --namespace flink --create-namespace

A Deployment for the operator appears in that namespace.
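
To confirm the operator is running (standard helm/kubectl checks):

 helm list -n flink
 kubectl get deployment,pods -n flink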
2. Write and package the code

package XXX.XXXX;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Main {
    public static void main(String[] args) throws Exception {
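        // REST port for Flink's embedded web UI (mainly useful for local runs;
        // on k8s the operator supplies the cluster configuration)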
        Configuration configuration = new Configuration();
        configuration.setInteger(RestOptions.PORT, 8081);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        env.enableCheckpointing(3000);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
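        // Source tables: capture change streams from MySQL via the mysql-cdc connector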
        String sql = "CREATE TABLE products (\n" +
                "    id INT,\n" +
                "    name STRING,\n" +
                "    description STRING,\n" +
                "    PRIMARY KEY (id) NOT ENFORCED\n" +
                "  ) WITH (\n" +
                "   'server-time-zone' = 'Asia/Shanghai'," +
                "    'connector' = 'mysql-cdc',\n" +
                "    'hostname' = 'ip',\n" +
                "    'port' = '3306',\n" +
                "    'username' = 'xxx',\n" +
                "    'password' = 'xxx',\n" +
                "    'database-name' = 'mydb',\n" +
                "    'table-name' = 'products'\n" +
                "  )";
        tableEnv.executeSql(sql);
        String sql2 = "CREATE TABLE orders (\n" +
                "   order_id INT,\n" +
                "   order_date TIMESTAMP(0),\n" +
                "   customer_name STRING,\n" +
                "   price DECIMAL(10, 5),\n" +
                "   product_id INT,\n" +
                "   order_status BOOLEAN,\n" +
                "   PRIMARY KEY (order_id) NOT ENFORCED\n" +
                " ) WITH (\n" +
                "   'server-time-zone' = 'Asia/Shanghai'," +
                "   'connector' = 'mysql-cdc',\n" +
                "   'hostname' = 'ip',\n" +
                "   'port' = '3306',\n" +
                "   'username' = 'xxx',\n" +
                "   'password' = 'xxx',\n" +
                "   'database-name' = 'mydb',\n" +
                "   'table-name' = 'orders'\n" +
                " )";
        tableEnv.executeSql(sql2);
        String sql3 = "CREATE TABLE shipments (\n" +
                "   shipment_id INT,\n" +
                "   order_id INT,\n" +
                "   origin STRING,\n" +
                "   destination STRING,\n" +
                "   is_arrived BOOLEAN,\n" +
                "   PRIMARY KEY (shipment_id) NOT ENFORCED\n" +
                " ) WITH (\n" +
                "   'server-time-zone' = 'Asia/Shanghai'," +
                "   'connector' = 'mysql-cdc',\n" +
                "   'hostname' = 'ip',\n" +
                "   'port' = '3306',\n" +
                "   'username' = 'xxx',\n" +
                "   'password' = 'xxx',\n" +
                "   'database-name' = 'mydb',\n" +
                "   'table-name' = 'shipments'\n" +
                " )";
        tableEnv.executeSql(sql3);
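        // Sink table: writes the joined result back to MySQL through the JDBC connector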
        String sqlES = " CREATE TABLE enriched_orders (\n" +
                "   order_id INT,\n" +
                "   order_date TIMESTAMP(0),\n" +
                "   customer_name STRING,\n" +
                "   price DECIMAL(10, 5),\n" +
                "   product_id INT,\n" +
                "   order_status BOOLEAN,\n" +
                "   product_name STRING,\n" +
                "   product_description STRING,\n" +
                "   shipment_id INT,\n" +
                "   origin STRING,\n" +
                "   destination STRING,\n" +
                "   is_arrived BOOLEAN,\n" +
                "   PRIMARY KEY (order_id) NOT ENFORCED\n" +
                " ) WITH (\n" +
                "  'connector' = 'jdbc', " +
                "   'username' = 'xxx',\n" +
                "   'password' = 'xxx',\n" +
                "   'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
                "   'url' = 'jdbc:mysql://ip:3306/mydb',\n" +
                "   'table-name' = 'enriched_orders'  " +
                " )";

        tableEnv.executeSql(sqlES);


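        // Streaming enrichment join; executing the INSERT is what submits the job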
        tableEnv.executeSql("INSERT INTO enriched_orders\n" +
                " SELECT o.*, p.name, p.description, s.shipment_id, s.origin, s.destination, s.is_arrived\n" +
                " FROM orders AS o\n" +
                " LEFT JOIN products AS p ON o.product_id = p.id\n" +
                " LEFT JOIN shipments AS s ON o.order_id = s.order_id");

    }
}
pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flinkdemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.17.2</flink.version>
        <scala.version>2.12</scala.version>
        <jdk.version>11</jdk.version>
    </properties>
    <dependencies>
        <!--mysql cdc -->
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.4.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-loader</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>3.1.0-1.17</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba.fastjson2</groupId>
            <artifactId>fastjson2</artifactId>
            <version>2.0.26</version>
        </dependency>

    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.3.0</version>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <classpathPrefix>lib/</classpathPrefix>
                            <mainClass>org.example.Main</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.11.0</version>
                <configuration>
                    <source>11</source>
                    <target>11</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
             <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <executions>
                    <execution>
                        <id>copy-dependencies</id>
                        <phase>package</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>
                                ${project.build.directory}/lib
                            </outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

3. Build the image and push it to Harbor
1) Package

Run the Maven package goal (e.g. from IntelliJ IDEA).
Jars needed for the image:
target/xxxxx-with-dependencies.jar
and, from the lib/ directory:
mysql-connector-java-8.0.28.jar
flink-connector-mysql-cdc-2.4.1.jar
flink-connector-jdbc-3.1.0-1.17.jar
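
The command-line equivalent is a plain Maven invocation; the copy-dependencies execution in the pom above populates target/lib during the package phase:

mvn clean package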

2) Write a Dockerfile and build the image

# Base image matches the Flink version chosen earlier
FROM flink:1.17.2
WORKDIR /opt/flink
# Job jar, referenced later by jarURI: local:///opt/flink/...
COPY flinkdemo-1.0-SNAPSHOT-jar-with-dependencies.jar /opt/flink/flinkdemo-1.0-SNAPSHOT-jar-with-dependencies.jar
# Connector and driver jars (see the lib folder below) go onto Flink's classpath
COPY lib/*.jar /opt/flink/lib/
ENTRYPOINT ["/docker-entrypoint.sh"]
EXPOSE 6123 8081
CMD ["help"]

3) Copy these to a server with Docker; the lib folder contains:
mysql-connector-java-8.0.28.jar
flink-connector-mysql-cdc-2.4.1.jar
flink-connector-jdbc-3.1.0-1.17.jar

4) Push the image to Harbor

docker build -f Dockerfile -t imageName:TAG .
docker tag imageName:TAG harborIP/project/imageName:TAG
docker push harborIP/project/imageName:TAG

4. Submit the job
1) On a node where kubectl works, create the FlinkDeployment manifest flink-cdc-demo.yaml

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  namespace: flink
  name: application-deployment
spec:  
  image: harborIP/project/imageName:TAG
  flinkVersion: v1_17
  imagePullPolicy: IfNotPresent  
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  serviceAccount: flink
  jobManager:
    replicas: 1
    resource:
      memory: "1024m"
      cpu: 1
  taskManager:
    replicas: 1
    resource:
      memory: "1024m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/xxxxx.jar  # location of the job jar inside the image; apparently only the local:// scheme is supported
    entryClass: XXX.XXXX.Main  # fully qualified name of the job's main class
    args:
    parallelism: 1
    upgradeMode: stateless

2) Apply the YAML with kubectl to create the job

 kubectl apply -f flink-cdc-demo.yaml
 
 To delete the job:
 kubectl delete FlinkDeployment application-deployment -n flink
 or
 kubectl delete -f flink-cdc-demo.yaml


3) The namespace flink and serviceaccount flink referenced in the YAML are created accordingly
4) Once the job is created, a Deployment and two Services appear
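
To check the result (standard kubectl; resource names follow metadata.name in the YAML):

 kubectl get flinkdeployment -n flink
 kubectl get deployment,svc,pods -n flink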

5. Flink web UI
1) Create a NodePort Service whose selector matches the application-deployment-rest Service and maps port 8081 (a sketch follows below)
2) Open that Service's NodePort in a browser to reach the Flink web UI
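
A minimal sketch of such a Service. The selector labels below are assumptions; copy the real ones from the operator-generated rest Service (kubectl get svc application-deployment-rest -n flink -o yaml) before applying:

kubectl apply -n flink -f - <<EOF
apiVersion: v1
kind: Service
metadata:
  name: flink-web-ui
spec:
  type: NodePort
  selector:                      # assumed; must match application-deployment-rest
    app: application-deployment
    component: jobmanager
  ports:
  - port: 8081
    targetPort: 8081
EOF

For a quick look without creating a Service, kubectl port-forward svc/application-deployment-rest 8081:8081 -n flink also works.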

6. Possible problems and fixes
1) The image field in the YAML has no place for Harbor credentials

Create a Secret:
kubectl create secret docker-registry SECRET_NAME \
 --docker-server=REGISTRY_ADDRESS \
 --docker-username=USERNAME \
 --docker-password=PASSWORD \
 -n NAMESPACE

Example:
kubectl create secret docker-registry harbor \
 --docker-server=harborIP \
 --docker-username=xxx \
 --docker-password=xxx \
 -n flink

Attach the Harbor credentials to the flink service account:
kubectl patch serviceaccount flink --namespace=flink -p '{"imagePullSecrets": [{"name": "harbor"}]}'
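
To confirm the secret is attached to the service account:

kubectl get serviceaccount flink -n flink -o jsonpath='{.imagePullSecrets}'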

2) Flink jar conflicts and missing jars

Use the Maven Helper plugin in IDEA to resolve dependency conflicts first, then use the job startup logs to track down any remaining conflicts or missing classes.
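
Without the IDE plugin, Maven's own dependency report shows the same conflict information:

mvn dependency:tree -Dverbose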

3) Other problems


ValidationException: The MySQL server has a timezone offset (28800 seconds ahead of UTC) which does 

This error appears when Flink CDC syncs data from MySQL into a Flink table: the session time zone assumed when the Flink table was created does not match the MySQL server's.

Adding 'server-time-zone' = 'Asia/Shanghai' to the CDC table's WITH clause fixes it (see https://blog.csdn.net/zhglhy/article/details/133931964).
Example:
CREATE TABLE products (
    id INT,
    name STRING,
    description STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'server-time-zone' = 'Asia/Shanghai',
    'connector' = 'mysql-cdc',
    'hostname' = 'xxx.xxx.xxx.xxx',
    'port' = '3306',
    'username' = 'xxx',
    'password' = 'xxx',
    'database-name' = 'cdc',
    'table-name' = 'products'
  );
Caused by: org.apache.flink.table.api.ValidationException: Unsupported options found for 'jdbc'.
Unsupported options:
server-time-zone
Supported options:
connection.max-retry-timeout
connector
driver
lookup.cache
lookup.cache.caching-missing-key
lookup.cache.max-rows
lookup.cache.ttl
lookup.max-retries
lookup.partial-cache.cache-missing-key
lookup.partial-cache.expire-after-access
lookup.partial-cache.expire-after-write
lookup.partial-cache.max-rows
password
property-version
scan.auto-commit
scan.fetch-size
scan.partition.column
scan.partition.lower-bound
scan.partition.num
scan.partition.upper-bound
sink.buffer-flush.interval
sink.buffer-flush.max-rows
sink.max-retries
sink.parallelism
table-name
url
username

A WITH-clause option in the Flink SQL is invalid: the jdbc connector does not support the server-time-zone option (it only applies to mysql-cdc tables), so remove it from the JDBC sink table definition.

Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option: 'connector'='jdbc'

Missing dependency jars. Add the following to <flink home>/lib:
flink-connector-jdbc-3.1.0-1.17.jar
mysql-connector-java-8.0.28.jar
flink-connector-mysql-cdc-2.4.1.jar
NoClassDefFoundError: com/ververica/cdc/debezium/utils/ResolvedSchemaUtils

See https://github.com/apache/flink-cdc/issues/1832:
replace flink-connector-mysql-cdc-2.4.1.jar with flink-sql-connector-mysql-cdc-2.4.1.jar (the shaded jar that bundles the Debezium classes)