I. Download Flink 1.14.4
wget https://dlcdn.apache.org/flink/flink-1.14.4/flink-1.14.4-bin-scala_2.11.tgz
Note: if you only need Flink on YARN, the official binary package is enough; since Flink 1.11, Flink can access HDFS and YARN in a CDH cluster through the hadoop classpath, as sketched below. If you want Flink SQL on Hive, however, the official flink-sql-connector-hive-2.2.0_2.11-1.14.4.jar throws errors against CDH and has to be rebuilt.
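For Flink on YARN, the documented way to expose the CDH Hadoop jars to the Flink client is through HADOOP_CLASSPATH; a minimal sketch (persisting it under /etc/profile.d is just one common choice):
# make the CDH Hadoop jars visible to the Flink client
export HADOOP_CLASSPATH=`hadoop classpath`
# optionally persist it for all sessions
echo 'export HADOOP_CLASSPATH=`hadoop classpath`' > /etc/profile.d/flink.sh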
Strictly speaking, only flink-sql-connector-hive-2.2.0 needs to be rebuilt, but the full Flink build procedure is covered here as well.
1. Download the Flink 1.14.4 source package
wget https://dlcdn.apache.org/flink/flink-1.14.4/flink-1.14.4-src.tgz
2. Set up the build environment
# Install and configure Maven
tar zxf apache-maven-3.5.4-bin.tar.gz -C /usr/local
ln -s /usr/local/apache-maven-3.5.4/bin/mvn /usr/local/bin
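Before building, it is worth a quick sanity check of the toolchain (Flink 1.14 is normally built with JDK 8; the JAVA_HOME below is the CDH-bundled JDK path and is only an assumption, adjust to your install):
# verify Maven and the JDK it resolves
export JAVA_HOME=/usr/java/jdk1.8.0_181-cloudera   # assumed path, adjust as needed
mvn -version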
3. Modify Flink's pom.xml
tar zxf flink-1.14.4-src.tgz
cd flink-1.14.4/
vim pom.xml
# Change the Hadoop version
<hadoop.version>3.0.0-cdh6.3.2</hadoop.version>
# Change the Hive version
<hive.version>2.1.1-cdh6.3.2</hive.version>
<hivemetastore.hadoop.version>3.0.0-cdh6.3.2</hivemetastore.hadoop.version>
# Add the Cloudera repository
<repositories>
  <repository>
    <id>cloudera</id>
    <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
  </repository>
</repositories>
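To double-check that the build will pick up the edited versions, the properties can be printed with the Maven help plugin (the -DforceStdout flag assumes maven-help-plugin 3.1.0 or newer):
# print the effective property values from the modified pom.xml
mvn -q help:evaluate -Dexpression=hadoop.version -DforceStdout
mvn -q help:evaluate -Dexpression=hive.version -DforceStdout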
4. Build Flink (for reference only; this step can be skipped)
# After the build, the distribution is under flink-dist/target/flink-1.14.4-bin/flink-1.14.4
mvn clean install -DskipTests -Dfast -Drat.skip=true -Dhadoop.version=3.0.0-cdh6.3.2 -Dinclude-hadoop -Dscala-2.11 -T2C
Note: during the build, errors may be reported for io.confluent artifacts. The usual explanation is that the artifacts Maven downloads by default are incomplete; delete the downloaded files, fetch the matching artifacts from https://packages.confluent.io/maven/io/confluent, and place them in the local Maven repository, as sketched below.
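A sketch of that cleanup-and-refetch workaround; the artifact and version below are placeholders, take the exact coordinates from the build error:
# drop the incomplete io.confluent artifacts so Maven re-resolves them
rm -rf ~/.m2/repository/io/confluent
# manually fetch the artifact named in the error (example coordinates only)
wget -P ~/.m2/repository/io/confluent/kafka-schema-registry-client/5.5.2/ https://packages.confluent.io/maven/io/confluent/kafka-schema-registry-client/5.5.2/kafka-schema-registry-client-5.5.2.jar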
5. Modify flink-sql-connector-hive-2.2.0's pom.xml
vim flink-connectors/flink-sql-connector-hive-2.2.0/pom.xml
# Change the hive-exec version
<artifactId>hive-exec</artifactId>
<version>2.1.1-cdh6.3.2</version>
6. Build flink-sql-connector-hive-2.2.0 (required for SQL on Hive)
# After the build, the jar is under flink-connectors/flink-sql-connector-hive-2.2.0/target/
mvn clean install -DskipTests -Dscala-2.11 -T2C
7. Copy the required jars into flink/lib
# Copy the flink-sql-connector-hive jar into Flink's lib directory
cp flink-connectors/flink-sql-connector-hive-2.2.0/target/flink-sql-connector-hive-2.2.0_2.11-1.14.4.jar /data/flink-1.14.4/lib/
# Copy hive-exec-2.1.1-cdh6.3.2.jar and libfb303-0.9.3.jar
cp /opt/cloudera/parcels/CDH/jars/hive-exec-2.1.1-cdh6.3.2.jar /data/flink-1.14.4/lib/
cp /opt/cloudera/parcels/CDH/jars/libfb303-0.9.3.jar /data/flink-1.14.4/lib/
# Copy the required Hadoop jars
cp /opt/cloudera/parcels/CDH/jars/hadoop-common-3.0.0-cdh6.3.2.jar /data/flink-1.14.4/lib/
cp /opt/cloudera/parcels/CDH/jars/hadoop-mapreduce-client-common-3.0.0-cdh6.3.2.jar /data/flink-1.14.4/lib/
cp /opt/cloudera/parcels/CDH/jars/hadoop-mapreduce-client-core-3.0.0-cdh6.3.2.jar /data/flink-1.14.4/lib/
cp /opt/cloudera/parcels/CDH/jars/hadoop-mapreduce-client-hs-3.0.0-cdh6.3.2.jar /data/flink-1.14.4/lib/
cp /opt/cloudera/parcels/CDH/jars/hadoop-mapreduce-client-jobclient-3.0.0-cdh6.3.2.jar /data/flink-1.14.4/lib/
Note: you can either copy the Hadoop jars listed above or use flink-shaded-hadoop-2-uber instead. Since Flink 1.11 the flink-shaded project no longer bundles Hadoop, so to go the flink-shaded-hadoop-2-uber route you have to download flink-shaded 10.0 and build it yourself (see the sketch after the listing). The final flink/lib looks like this:
[root@node01 flink-1.14.4]# ll lib/
total 338876
-rw-r--r-- 1 root root 7685388 Apr 21 10:26 flink-connector-hive_2.11-1.14.4.jar
-rw-r--r-- 1 mysql 1001 85584 Feb 25 21:03 flink-csv-1.14.4.jar
-rw-r--r-- 1 mysql 1001 143667725 Feb 25 21:06 flink-dist_2.11-1.14.4.jar
-rw-r--r-- 1 mysql 1001 153145 Feb 25 21:03 flink-json-1.14.4.jar
-rw-r--r-- 1 root root 59427306 Apr 21 10:56 flink-shaded-hadoop-2-uber-3.0.0-cdh6.3.2-10.0.jar
-rw-r--r-- 1 mysql 1001 7709731 Sep 10 2021 flink-shaded-zookeeper-3.4.14.jar
-rw-r--r-- 1 root root 43784570 Apr 19 19:34 flink-sql-connector-hive-2.2.0_2.11-1.14.4.jar
-rw-r--r-- 1 root root 3704559 Apr 19 19:34 flink-sql-connector-kafka_2.11-1.14.4.jar
-rw-r--r-- 1 mysql 1001 42314450 Feb 25 21:05 flink-table_2.11-1.14.4.jar
-rw-r--r-- 1 root root 35803898 Apr 19 19:34 hive-exec-2.1.1-cdh6.3.2.jar
-rw-r--r-- 1 root root 313702 Apr 19 19:35 libfb303-0.9.3.jar
-rw-r--r-- 1 mysql 1001 208006 Jan 13 19:06 log4j-1.2-api-2.17.1.jar
-rw-r--r-- 1 mysql 1001 301872 Jan 7 18:07 log4j-api-2.17.1.jar
-rw-r--r-- 1 mysql 1001 1790452 Jan 7 18:07 log4j-core-2.17.1.jar
-rw-r--r-- 1 mysql 1001 24279 Jan 7 18:07 log4j-slf4j-impl-2.17.1.jar
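As mentioned above, a rough sketch of building flink-shaded-hadoop-2-uber from flink-shaded 10.0 (it assumes the Cloudera repository is added to flink-shaded's pom.xml the same way as for Flink):
wget https://archive.apache.org/dist/flink/flink-shaded-10.0/flink-shaded-10.0-src.tgz
tar zxf flink-shaded-10.0-src.tgz && cd flink-shaded-10.0
# add the cloudera repository to pom.xml as was done for Flink, then:
mvn clean install -DskipTests -Dhadoop.version=3.0.0-cdh6.3.2
# the uber jar is produced under flink-shaded-hadoop-2-uber/target/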
II. Build the Flink parcel and CSD file
1. Download the build script
git clone https://github.com/YUjichang/flink-parcel.git
2. Rename the Flink binary package
cm_ext validates the parcel name, so the tarball must be renamed to the flink-${FLINK_VERSION}-${EXTENS_VERSION} format:
mv flink-1.14.4-bin-scala_2.11.tgz flink-1.14.4-cdh6.3.2.tgz
3. Modify flink-parcel.properties
vim flink-parcel.properties
# Path to the Flink tarball
FLINK_URL=/root/flink-1.14.4-cdh6.3.2.tgz
# Flink version
FLINK_VERSION=1.14.4
# Extension version
EXTENS_VERSION=CDH6.3.2
# OS version (CentOS 7 here)
OS_VERSION=7
# Supported CDH full version range
CDH_MIN_FULL=6.0
CDH_MAX_FULL=6.4
# Supported CDH major version range
CDH_MIN=5
CDH_MAX=6
4. Run the build.sh script
sh build.sh parcel
sh build.sh csd
When the build finishes, the Flink parcel, manifest, and CSD files are generated:
[root@node01 flink-parcel]# ll FLINK-1.14.4-CDH6.3.2_build/ FLINK_ON_YARN-1.14.4.jar
-rw-r--r-- 1 root root 20972 Apr 20 18:25 FLINK_ON_YARN-1.14.4.jar
FLINK-1.14.4-CDH6.3.2_build/:
total 432716
-rw-r--r-- 1 root root 443091157 Apr 20 18:29 FLINK-1.14.4-CDH6.3.2-el7.parcel
-rw-r--r-- 1 root root 40 Apr 20 18:29 FLINK-1.14.4-CDH6.3.2-el7.parcel.sha
-rw-r--r-- 1 root root 840 Apr 20 18:29 manifest.json
III. Add the Flink service in CM
1. Copy the CSD file
Copy the generated CSD file into Cloudera's CSD directory, then restart cloudera-scm-server:
cp FLINK_ON_YARN-1.14.4.jar /opt/cloudera/csd/
systemctl restart cloudera-scm-server
2. Distribute and activate the parcel
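One common way is to publish the parcel into CM's local parcel repository (the default path below) and then distribute and activate it from the Parcels page:
# copy the parcel and its checksum into the local parcel repo
cp FLINK-1.14.4-CDH6.3.2_build/FLINK-1.14.4-CDH6.3.2-el7.parcel* /opt/cloudera/parcel-repo/
# in the CM UI: Parcels -> Check for New Parcels -> Distribute -> Activate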
3. Add the Flink service
Add the Flink service to the cluster through the CM web UI.
IV. Verify the Flink service
1. Run a WordCount test
sudo -u flink flink run -t yarn-per-job /opt/cloudera/parcels/FLINK/lib/flink/examples/batch/WordCount.jar
2. Start a YARN session
sudo -u flink flink-yarn-session -jm 1024 -tm 1024 --detached
# Check the YARN application list
[root@node01 opt]# yarn application -list
WARNING: YARN_OPTS has been replaced by HADOOP_OPTS. Using value of YARN_OPTS.
22/04/21 17:39:01 INFO client.RMProxy: Connecting to ResourceManager at node01/192.168.100.200:8032
Total number of applications (application-types: [], states: [SUBMITTED, ACCEPTED, RUNNING] and tags: []):1
Application-Id Application-Name Application-Type User Queue State Final-State Progress Tracking-URL
application_1650533003237_0005 Flink session cluster Apache Flink flink root.users.flink RUNNING UNDEFINED 100% http://node03:8080
[root@node01 opt]#
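When done, the session can be stopped by killing its YARN application (the id comes from the listing above):
yarn application -kill application_1650533003237_0005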
3. Run a Flink SQL query against Hive
flink-sql-client
# Create a Hive catalog
CREATE CATALOG myhive WITH (
'type' = 'hive',
'default-database' = 'default',
'hive-conf-dir' = '/etc/hive/conf'
);
USE CATALOG myhive;
SET execution.runtime-mode = batch;
SET sql-client.execution.result-mode = tableau;
SET table.sql-dialect = hive;
# Query a Hive table
select count(*) from dm_ods_db_dev.o_01_member_personal_info;
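The catalog setup and query can also be kept in script files; a sketch assuming the parcel's flink-sql-client wrapper forwards sql-client.sh's -i/-f options (the file names here are just for illustration):
# init.sql holds the CREATE CATALOG / USE / SET statements, query.sql the query
flink-sql-client -i init.sql -f query.sql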