I. Download Flink 1.14.4

wget https://dlcdn.apache.org/flink/flink-1.14.4/flink-1.14.4-bin-scala_2.11.tgz

Note: if you only need Flink on YARN, the official binary package is sufficient; since version 1.11, Flink can access HDFS and YARN in a CDH cluster via the hadoop classpath. If you want to use Flink SQL on Hive, however, the official flink-sql-connector-hive-2.2.0_2.11-1.14.4.jar will throw errors, so it has to be recompiled.
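
For the Flink on YARN path, the standard approach from the Flink docs is to export the hadoop classpath in the shell that launches flink, for example:

# make the CDH hadoop jars visible to flink (run before any flink command)
export HADOOP_CLASSPATH=`hadoop classpath`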

Strictly speaking, only flink-sql-connector-hive-2.2.0 needs to be recompiled, but the full Flink build process is covered here as well.

1. Download the Flink 1.14.4 source package

wget https://dlcdn.apache.org/flink/flink-1.14.4/flink-1.14.4-src.tgz

2. Set up the build environment

# set up maven
tar zxf apache-maven-3.5.4-bin.tar.gz -C /usr/local
ln -s  /usr/local/apache-maven-3.5.4/bin/mvn /usr/local/bin
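
Before building, it is worth sanity-checking the toolchain; Flink 1.14 builds on JDK 8, and Flink's build guide recommends Maven 3.2.5 for correct shading (3.5.4 generally works with the flags used below):

# verify the build toolchain
java -version    # expect 1.8.x
mvn -version     # expect maven 3.x running on JDK 8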

3. Modify Flink's pom.xml

tar zxf flink-1.14.4-src.tgz
cd flink-1.14.4/
vim pom.xml
    # change the hadoop version
    <hadoop.version>3.0.0-cdh6.3.2</hadoop.version>

    # change the hive version
    <hive.version>2.1.1-cdh6.3.2</hive.version>
    <hivemetastore.hadoop.version>3.0.0-cdh6.3.2</hivemetastore.hadoop.version>

    # add the cloudera repository
    <repositories>
      <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
      </repository>
    </repositories>
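
A quick check that the edits took effect before kicking off a long build:

# both the hadoop and hive versions should now carry the cdh suffix
grep -n 'cdh6.3.2' pom.xml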

4. Build Flink (for reference only; this step is optional)

# after the build, the distribution is under flink-dist/target/flink-1.14.4-bin/flink-1.14.4
mvn clean install -DskipTests -Dfast -Drat.skip=true -Dhadoop.version=3.0.0-cdh6.3.2 -Dinclude-hadoop -Dscala-2.11 -T2C

Note: during the Flink build you may hit errors on io.confluent artifacts. Reports suggest the downloaded artifacts are incomplete; delete the cached copies, download replacements from https://packages.confluent.io/maven/io/confluent, and place them in your local maven repository. A sketch follows.
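
The cleanup, assuming the default local repository location; the artifact and version below are placeholders, so substitute whatever coordinates your build error names:

# drop the incomplete io.confluent artifacts from the local maven repository
rm -rf ~/.m2/repository/io/confluent
# re-download the jar the build complains about (placeholder coordinates)
wget -P ~/.m2/repository/io/confluent/<artifact>/<version>/ \
  https://packages.confluent.io/maven/io/confluent/<artifact>/<version>/<artifact>-<version>.jar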

5. Modify flink-sql-connector-hive-2.2.0's pom.xml

vim flink-connectors/flink-sql-connector-hive-2.2.0/pom.xml
  # change the hive-exec version
  <artifactId>hive-exec</artifactId>
  <version>2.1.1-cdh6.3.2</version>

6. Build flink-sql-connector-hive-2.2.0 (required for SQL on Hive)

# after the build, the jar is under flink-connectors/flink-sql-connector-hive-2.2.0/target/
mvn clean install -DskipTests -Dscala-2.11 -T2C
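
An optional sanity check that the shaded jar really bundles the hive classes:

# the shaded connector should contain hive's classes, e.g. HiveConf
jar tf flink-connectors/flink-sql-connector-hive-2.2.0/target/flink-sql-connector-hive-2.2.0_2.11-1.14.4.jar | grep -m1 HiveConf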

7. Copy the required jars to flink/lib

# copy flink-sql-connector-hive into flink's lib directory
cp flink-connectors/flink-sql-connector-hive-2.2.0/target/flink-sql-connector-hive-2.2.0_2.11-1.14.4.jar /data/flink-1.14.4/lib/

# copy hive-exec-2.1.1-cdh6.3.2.jar and libfb303-0.9.3.jar
cp /opt/cloudera/parcels/CDH/jars/hive-exec-2.1.1-cdh6.3.2.jar /data/flink-1.14.4/lib/
cp /opt/cloudera/parcels/CDH/jars/libfb303-0.9.3.jar /data/flink-1.14.4/lib/

# copy the required hadoop jars
cp /opt/cloudera/parcels/CDH/jars/hadoop-common-3.0.0-cdh6.3.2.jar /data/flink-1.14.4/lib/
cp /opt/cloudera/parcels/CDH/jars/hadoop-mapreduce-client-common-3.0.0-cdh6.3.2.jar /data/flink-1.14.4/lib/
cp /opt/cloudera/parcels/CDH/jars/hadoop-mapreduce-client-core-3.0.0-cdh6.3.2.jar /data/flink-1.14.4/lib/
cp /opt/cloudera/parcels/CDH/jars/hadoop-mapreduce-client-hs-3.0.0-cdh6.3.2.jar /data/flink-1.14.4/lib/
cp /opt/cloudera/parcels/CDH/jars/hadoop-mapreduce-client-jobclient-3.0.0-cdh6.3.2.jar /data/flink-1.14.4/lib/

Note: instead of copying the individual hadoop jars above, you can use flink-shaded-hadoop-2-uber. Since version 1.11, Flink no longer recommends bundling hadoop in flink-shaded, so to go this route you have to download flink-shaded 10.0 and build it yourself; a build sketch follows the listing below.

The final contents of flink/lib:

[root@node01 flink-1.14.4]# ll lib/
total 338876
-rw-r--r-- 1 root  root   7685388 Apr 21 10:26 flink-connector-hive_2.11-1.14.4.jar
-rw-r--r-- 1 mysql 1001     85584 Feb 25 21:03 flink-csv-1.14.4.jar
-rw-r--r-- 1 mysql 1001 143667725 Feb 25 21:06 flink-dist_2.11-1.14.4.jar
-rw-r--r-- 1 mysql 1001    153145 Feb 25 21:03 flink-json-1.14.4.jar
-rw-r--r-- 1 root  root  59427306 Apr 21 10:56 flink-shaded-hadoop-2-uber-3.0.0-cdh6.3.2-10.0.jar
-rw-r--r-- 1 mysql 1001   7709731 Sep 10  2021 flink-shaded-zookeeper-3.4.14.jar
-rw-r--r-- 1 root  root  43784570 Apr 19 19:34 flink-sql-connector-hive-2.2.0_2.11-1.14.4.jar
-rw-r--r-- 1 root  root   3704559 Apr 19 19:34 flink-sql-connector-kafka_2.11-1.14.4.jar
-rw-r--r-- 1 mysql 1001  42314450 Feb 25 21:05 flink-table_2.11-1.14.4.jar
-rw-r--r-- 1 root  root  35803898 Apr 19 19:34 hive-exec-2.1.1-cdh6.3.2.jar
-rw-r--r-- 1 root  root    313702 Apr 19 19:35 libfb303-0.9.3.jar
-rw-r--r-- 1 mysql 1001    208006 Jan 13 19:06 log4j-1.2-api-2.17.1.jar
-rw-r--r-- 1 mysql 1001    301872 Jan  7 18:07 log4j-api-2.17.1.jar
-rw-r--r-- 1 mysql 1001   1790452 Jan  7 18:07 log4j-core-2.17.1.jar
-rw-r--r-- 1 mysql 1001     24279 Jan  7 18:07 log4j-slf4j-impl-2.17.1.jar
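
If you take the flink-shaded-hadoop-2-uber route instead, here is a build sketch (an assumption about the process: you still need to add the cloudera repository to its pom.xml, as in step 3; the resulting jar name matches the listing above):

wget https://archive.apache.org/dist/flink/flink-shaded-10.0/flink-shaded-10.0-src.tgz
tar zxf flink-shaded-10.0-src.tgz && cd flink-shaded-10.0
mvn clean install -DskipTests -Dhadoop.version=3.0.0-cdh6.3.2
cp flink-shaded-hadoop-2-uber/target/flink-shaded-hadoop-2-uber-3.0.0-cdh6.3.2-10.0.jar /data/flink-1.14.4/lib/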

II. Build the Flink parcel and CSD file

1. Download the build scripts

git clone https://github.com/YUjichang/flink-parcel.git

2. Rename the Flink binary package

cm_ext validates the parcel name, so the package must be renamed to the flink-${FLINK_VERSION}-${EXTENS_VERSION} format:

mv flink-1.14.4-bin-scala_2.11.tgz flink-1.14.4-cdh6.3.2.tgz

3. Modify flink-parcel.properties

vim flink-parcel.properties
# path to the flink tarball
FLINK_URL=/root/flink-1.14.4-cdh6.3.2.tgz

# flink version
FLINK_VERSION=1.14.4

# extension version
EXTENS_VERSION=CDH6.3.2

# OS version (CentOS in this example)
OS_VERSION=7

# CDH full version range
CDH_MIN_FULL=6.0
CDH_MAX_FULL=6.4

# CDH major version range
CDH_MIN=5
CDH_MAX=6

4. Run the build.sh script

sh build.sh parcel
sh build.sh csd

Once the build finishes, the flink parcel, manifest, and CSD files are generated:

[root@node01 flink-parcel]# ll FLINK-1.14.4-CDH6.3.2_build/ FLINK_ON_YARN-1.14.4.jar
-rw-r--r-- 1 root root 20972 Apr 20 18:25 FLINK_ON_YARN-1.14.4.jar

FLINK-1.14.4-CDH6.3.2_build/:
total 432716
-rw-r--r-- 1 root root 443091157 Apr 20 18:29 FLINK-1.14.4-CDH6.3.2-el7.parcel
-rw-r--r-- 1 root root        40 Apr 20 18:29 FLINK-1.14.4-CDH6.3.2-el7.parcel.sha
-rw-r--r-- 1 root root       840 Apr 20 18:29 manifest.json

III. Add the Flink service in CM

1. Copy the CSD file

Copy the generated CSD file into cloudera's csd directory, then restart cloudera-scm-server:

cp FLINK_ON_YARN-1.14.4.jar /opt/cloudera/csd/
systemctl restart cloudera-scm-server

2. Distribute and activate the parcel
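
One common way to do this (an assumption about your setup; hosting the _build directory over HTTP also works) is to drop the parcel into the CM server's local repo and drive the rest from the Parcels page:

# on the cloudera-scm-server host
cp FLINK-1.14.4-CDH6.3.2_build/FLINK-1.14.4-CDH6.3.2-el7.parcel* /opt/cloudera/parcel-repo/
# then in CM: Parcels -> Check for New Parcels -> Distribute -> Activate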

3. Add the Flink service

Add the Flink service to the cluster in CM.

IV. Verify the Flink service

1. Run a WordCount test

sudo -u flink flink run -t yarn-per-job /opt/cloudera/parcels/FLINK/lib/flink/examples/batch/WordCount.jar

2. Run a yarn-session

sudo -u flink flink-yarn-session -jm 1024 -tm 1024 --detached

# check the yarn application list
[root@node01 opt]# yarn application -list
WARNING: YARN_OPTS has been replaced by HADOOP_OPTS. Using value of YARN_OPTS.
22/04/21 17:39:01 INFO client.RMProxy: Connecting to ResourceManager at node01/192.168.100.200:8032
Total number of applications (application-types: [], states: [SUBMITTED, ACCEPTED, RUNNING] and tags: []):1
                Application-Id      Application-Name        Application-Type          User       Queue               State         Final-State         Progress                        Tracking-URL
application_1650533003237_0005  Flink session cluster           Apache Flink         flink  root.users.flink               RUNNING           UNDEFINED             100%                  http://node03:8080
[root@node01 opt]# 

3. Run a Flink SQL query against Hive

flink-sql-client

  -- create a hive catalog
  CREATE CATALOG myhive WITH (
      'type' = 'hive',
      'default-database' = 'default',
      'hive-conf-dir' = '/etc/hive/conf'
  );
  USE CATALOG myhive;
  SET execution.runtime-mode = batch;
  SET sql-client.execution.result-mode = tableau;
  SET table.sql-dialect = hive;

  -- query hive
  select count(*) from dm_ods_db_dev.o_01_member_personal_info;