A typical Flink job-processing flow is shown below:

Flink download page: http://flink.apache.org/downloads.html . Choose the Flink build that matches your Hadoop version.

Standalone mode

Quickstart tutorial: https://ci.apache.org/projects/flink/flink-docs-release-1.6/quickstart/setup_quickstart.html

1. Software requirements

  • Java 1.8.x or higher
  • ssh (sshd must be running so that the Flink scripts that manage remote components can work)
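
The two requirements above can be checked before installing. The following is a minimal sketch, not part of the original setup; the function names (java_major_version, java_ok, installed_java_version, sshd_running) are hypothetical helpers introduced here for illustration.

```shell
#!/usr/bin/env bash
# Hypothetical helpers for checking the prerequisites listed above.

# Extract the major Java version from a version string
# such as "1.8.0_191" (legacy scheme) or "11.0.2" (modern scheme).
java_major_version() {
  local v="$1"
  case "$v" in
    1.*) v="${v#1.}"; echo "${v%%.*}" ;;   # 1.8.0_191 -> 8
    *)   echo "${v%%[.+-]*}" ;;            # 11.0.2 -> 11
  esac
}

# Succeed if the given version string satisfies "Java 1.8.x or higher".
java_ok() {
  [ "$(java_major_version "$1")" -ge 8 ]
}

# Read the installed version string from `java -version` (printed on stderr).
installed_java_version() {
  java -version 2>&1 | sed -n 's/.* version "\([^"]*\)".*/\1/p' | head -n 1
}

# Succeed if a process named sshd is running on this host.
sshd_running() {
  pgrep -x sshd >/dev/null 2>&1
}
```

On each node, something like `java_ok "$(installed_java_version)" && sshd_running` could then be run before continuing.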

Cluster deployment plan

Node     master   worker   zookeeper
master   master            zookeeper
slave1            worker   zookeeper
slave2            worker   zookeeper
docker run  -p 50070:50070 -p 19888:19888 -p 8088:8088 -p 2181:2181 -p 16010:16010  -p 9092:9092 -p 9000:9000 --name  master  -ti -h master  linux:hadoop
docker run  -it -h slave1 --name slave1  linux:hadoop  /bin/bash
docker run  -it -h slave2 --name slave2  linux:hadoop  /bin/bash

2. Extract the archive

tar zxvf flink-1.7.1-bin-hadoop27-scala_2.11.tgz

3. Edit the configuration files

[root@master conf]$ ls
flink-conf.yaml       log4j-console.properties  log4j-yarn-session.properties  logback.xml       masters  sql-client-defaults.yaml
log4j-cli.properties  log4j.properties          logback-console.xml            logback-yarn.xml  slaves   zoo.cfg

Edit flink-conf.yaml


taskmanager.numberOfTaskSlots: 2
jobmanager.rpc.address: master
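
Instead of editing the file by hand, the two settings can be applied with a small script. This is a sketch under assumptions: set_conf is a hypothetical helper (not a Flink tool), it only touches uncommented `key: value` lines, and it assumes the value contains no `|` or `&` characters.

```shell
#!/usr/bin/env bash
# Hypothetical helper: set "key: value" in a flat key/value config file.
# An existing uncommented entry is replaced; otherwise the entry is appended.
set_conf() {
  local key="$1" value="$2" file="$3" pat
  # Escape the dots in the key so they match literally in the regex.
  pat=$(printf '%s' "$key" | sed 's/\./\\./g')
  if grep -qE "^${pat}:" "$file"; then
    sed -i.bak -E "s|^${pat}:.*|${key}: ${value}|" "$file" && rm -f "${file}.bak"
  else
    printf '%s: %s\n' "$key" "$value" >> "$file"
  fi
}
```

For the values used in this walkthrough, the calls would be `set_conf taskmanager.numberOfTaskSlots 2 conf/flink-conf.yaml` and `set_conf jobmanager.rpc.address master conf/flink-conf.yaml`.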

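Optional settings:

  • the amount of memory available to each JobManager (jobmanager.heap.mb),
  • the amount of memory available to each TaskManager (taskmanager.heap.mb),
  • the number of CPUs available on each machine (taskmanager.numberOfTaskSlots),
  • the total number of CPUs in the cluster (parallelism.default), and
  • the temporary directories (taskmanager.tmp.dirs)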

3.1. HA configuration

#jobmanager.rpc.address: master   # set in the masters file instead; ZooKeeper elects the leader and the standby
high-availability: zookeeper                             # high-availability mode (required)
high-availability.zookeeper.quorum: master:2181,slave1:2181,slave2:2181  # the ZooKeeper quorum, a replicated group of ZooKeeper servers that provides the distributed coordination service (required)
high-availability.storageDir: hdfs:///flink/ha/       # JobManager metadata is persisted in this file system directory; only a pointer to that state is stored in ZooKeeper (required)
high-availability.zookeeper.path.root: /flink         # root ZooKeeper node under which all cluster nodes are placed (recommended)
high-availability.cluster-id: /flinkCluster           # custom cluster ID (recommended)
state.backend: filesystem
state.checkpoints.dir: hdfs:///flink/checkpoints
state.savepoints.dir: hdfs:///flink/checkpoints
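
Because a missing required key, or a missing space after the colon, is easy to overlook, a quick check of the three keys marked as required can help. This is a rough sketch; check_ha_conf is a hypothetical helper, and it assumes each entry starts at the beginning of a line in `key: value` form.

```shell
#!/usr/bin/env bash
# Hypothetical helper: report required HA keys that have no "key: value" line.
# Returns non-zero if at least one key is missing.
check_ha_conf() {
  local file="$1" missing=0 key
  for key in high-availability \
             high-availability.zookeeper.quorum \
             high-availability.storageDir; do
    # A key counts as present only if a line starts with "key: "
    # (commented-out lines and "key:value" without a space do not count).
    if ! awk -v k="$key" 'index($0, k ": ") == 1 { found = 1 } END { exit !found }' "$file"; then
      echo "missing: $key"
      missing=1
    fi
  done
  return "$missing"
}
```

Running `check_ha_conf conf/flink-conf.yaml` before starting the cluster prints one line per missing key and prints nothing when all three are set.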

Edit conf/zoo.cfg

server.1=master:2888:3888
server.2=slave1:2888:3888
server.3=slave2:2888:3888
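
The server entries follow a regular pattern (a sequential ID, the host name, then ports 2888 and 3888), so for a longer host list they could be generated. A small sketch, where zk_server_lines is a hypothetical helper and the ports are the ones used above:

```shell
#!/usr/bin/env bash
# Hypothetical helper: print one "server.N=host:2888:3888" line per host,
# numbering the hosts from 1 in the order given.
zk_server_lines() {
  local id=1 host
  for host in "$@"; do
    printf 'server.%d=%s:2888:3888\n' "$id" "$host"
    id=$((id + 1))
  done
}
```

`zk_server_lines master slave1 slave2` reproduces the three lines shown above.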

Edit conf/masters

master:8081

Edit conf/slaves

slave1
slave2

4. Start Hadoop

Setting up the Hadoop cluster itself is not covered here; only the startup is shown.

[root@master /]# start-dfs.sh
19/01/11 06:35:51 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Starting namenodes on [master]
master: starting namenode, logging to /usr/local/hadoop/hadoop-2.8.3/logs/hadoop-root-namenode-master.out
slave1: starting datanode, logging to /usr/local/hadoop/hadoop-2.8.3/logs/hadoop-root-datanode-slave1.out
slave2: starting datanode, logging to /usr/local/hadoop/hadoop-2.8.3/logs/hadoop-root-datanode-slave2.out
master: starting datanode, logging to /usr/local/hadoop/hadoop-2.8.3/logs/hadoop-root-datanode-master.out
Starting secondary namenodes [0.0.0.0]
0.0.0.0: starting secondarynamenode, logging to /usr/local/hadoop/hadoop-2.8.3/logs/hadoop-root-secondarynamenode-master.out
19/01/11 06:36:14 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

5. Start ZooKeeper

Setting up the ZooKeeper cluster itself is not covered here; only the startup is shown.

[root@master /]# start-zookeeper-quorum.sh 
Starting zookeeper daemon on host master.
Starting zookeeper daemon on host slave1.
Starting zookeeper daemon on host slave2.

6. Start Flink

[root@master /]# start-cluster.sh 
Starting HA cluster with 1 masters.
Starting standalonesession daemon on host master.
Starting taskexecutor daemon on host slave1.
Starting taskexecutor daemon on host slave2.
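
The daemons listed in this output correspond to the entries in conf/masters and conf/slaves. As a rough cross-check, the expected list can be derived from those two files and compared with what the script printed. This is only a sketch; list_hosts and expected_daemons are hypothetical helpers, and the daemon names are taken from the output above.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print the host names in a masters/slaves file,
# dropping comments, ports (":8081"), whitespace and empty lines.
list_hosts() {
  sed -e 's/#.*//' -e 's/:.*//' -e 's/[[:space:]]//g' -e '/^$/d' "$1"
}

# Hypothetical helper: print "<daemon> <host>" for every daemon that
# start-cluster.sh is expected to start, given the Flink conf directory.
expected_daemons() {
  local conf_dir="$1" host
  for host in $(list_hosts "$conf_dir/masters"); do
    echo "standalonesession $host"
  done
  for host in $(list_hosts "$conf_dir/slaves"); do
    echo "taskexecutor $host"
  done
}
```

With the files from step 3, `expected_daemons conf` lists one standalonesession on master and one taskexecutor each on slave1 and slave2, which matches the startup output.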

Web UI (port 8081 on master, as set in conf/masters)
