开发者学堂课程【NiFi 知识精讲与项目实战(第三阶段):实时同步 Mysql 到 Hive-3】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/707/detail/12551
实时同步 Mysql 到 Hive-3
内容介绍:
一、实际操作过程演示
一、实际操作过程演示
首先创建一个处理器组,名称为 MysqlToHive. Timely。
处理器组创建完毕后接入进来
1.监听 Mysql
通过 CaptureChangeMysql
创建完成以后进行配置,配置监听的时候,一定要有一个缓存服务器,所以先来创建缓存一共有两种方式,第一种是在 distribute map cache client 进行创建,第二种方式是在面板当中单击右键,点击 configure 的Controller serves,点击加号进行添加。
添加完成以后来进行设置,三个配置项都有默认值,这几个默认值直接使用就可以,不需要再进行变更。
再来添加客户端 Distributedmapcacheclient,添加完成以后,在 state 下面显示 invalid,表明状态有问题,进行设置配置项,设置缓存服务端 Server 的 IP 地址,直接写 nifi 服务器 IP 地址即可。端口号和 server 的默认值是保持一致的,点击保存。保存后感叹号就没有了。
接下来进行启动,先启动 server,再启动 client,启动成功以后,配置到 capture mysql 组件当中,Distributed map cache client,选择 distribute mapcacheclientservice,在进行其它的配置。Mysql host 是数据库链接,直接复制 IP 地址和端口号即可,驱动的配置不需要更改,Mysql driver location 是夹包所在的目录,复制目录/export/download/jars/mysql -connector-java-5. 1. 40. jar,如果设置的目录和上面路径不一致,需要进行更改。随后设置 MySQL 的账号和密码,配置完成以后,点击 apply,如果直接启动的话是会报错的。现在还不能启动因为还没有链接。没有链接可以去创造一个链接,接下来的组件是 route,选择 routeonattribute,把capture changemysql和routeonattribute 连接起来,尝试去启动查看配置是否有效。
启动以后发现它报错,停止一下查看服务器负载,可以看到 CPU 突然就出现了飙满状态,CPU 不仅飙满还出现了报错,飙满的原因是因为处在 json 模式下,第一个节点必须要设置调度,如果不设置调度的话,就会不断的循环去执行,可以设置时间不那么快,可以把 run schedule 设置为3 sec,设置为每三秒去查询一次可以减轻服务器压力,接下来看一下报错信息。
通过时间来查看是否报错,第一个出发的时间是凌晨4点22分,但是现在的时间是4点24分,可以看到是没有新的错误信息一直在4点22分。目前看来是处于正常状态,不会出现报错了。
接下来配置 remote 即路由,根据抓取到的信息进行路由,需要通过 jasonpath 表达式进行路由,使用 jasonpath 表达式就必须要使用数据和属性进行路由,现在发现面板上没有数据进来,可以通过触发产生数据,可以在数据库中对数据库表插入一条数据,向 nifi_hive 表中插入一条数据,现在 nifi_hive 表中是没有数据的。接下来通过 SQL 语句来插入数据,刷新一下就可以查看到监听到的数据,已经有消息进入到队列。
鼠标右键点击 list quete 查看数据,点击 view,可以看到已经出现了数据。type 是 insert,如果想要进行路由,是根据属性去获取,在 flow file 里面的 attributes 查看到 cdc.event.type,这属性就标识了是 insert 还是 update 或者是一些其它的操作。鼠标右键点击复制,如果删除的话不需要管,只负责新增以及更新操作,在 configure processor 里面的,Properties 进行配置,通过添加一个自定义属性进行配置,属性名称可以定义为 insert,属性通过 nifi 表达式获取值,值等于 insert 字符串才可以定义为 insert,通过 equals 函数传入入参,入参是一个字符串,需要使用引号括起来,s[cdc. event. type:equals(' insert')如果表达式成立为 TRUE,那么认为就是 insert 类型。再定义一个update,s[edc. event. type:equa1s(' update’)]这些操作都是更新的操作,删除的操作不需要捕获,直接打印错误消息里面就可以。
设计完成以后可以来验证一下路由转发是否可以。可以通过提供下游处理器队列处理来进行验证,下游应该使用evaluateJasonpath处理器,用这处理器来提取 flow file 当中的属性,用来做拼接 SQL 去使用,首先,测试routeonattribute 能否成功路由给 evaluateJasonpath,for Relationships 下面选择 insert 和 update,这两个关系就是刚才所添加的属性,它们是相同的,如果符合 insert 自定义类型的话,都会发送到关联关系当中,只要这两个里面任何一个满足,就都让它们发送到 evaluateJasonPath 中去执行更新。剩下还有一个没有匹配成功的,可以让它发送到 log 日志里面。
接下来进行 evaluateJasonPath 的配置,它主要的目的就是把拿到的 jason 数据里面内容提取到属性当中,通过测试数据来查看数据的形式。运行 route,队列正确的发送到了提取处理业务队列当中,错误队列没有任何数据为零。停止一下来查看里面的内容,里面有一个数据来查看一下信息,insert 类型已经进来了,里面的内容也没有产生任何的变化,仍然是原始数据,这里面只是进行了简单的路由转发工作,没有更改任何信息。如果需要更加的实时,可以把更新的时间缩短,缩短为30毫秒等,但是这对服务器有一定的要求,如果只是在自己电脑上使用虚拟机那注意不要配置的时间太频繁。通过设置 evaluateJasonPath 来提取 Jason 属性到自定义 flow file 中,这里面就可以使用到jason path 表达式,Jasonpath 表达式去提取 jason 的内容,首先要拿到 jason 的信息,把 jasonz 的信息复制出来,点击 view,复制数据进行格式化,这样看起来更加清晰。打开 jasonviewr 进行格式化和粘贴。
"type" : "insert" ,
"timestamp" : 1582748801000,
"binlog_filename" :
"mysql-bin.000006",
"binlog_position" : 596,
"database" : "nifi_test" ,
"table_name" : "nifi_hive_streaming" ,
"table_id" : 109,
" columns" : [
{
"id": 1,
"name" : "id",
"column_type" : 4,
"value": 1
}
id": 2,
"name" : "name" ,
" column_type" : 12,
"value":"testName5"
},
{ "id" : 3,
"name" : "day_time" ,
"column_type" : 91,
"value" :“2020-02-27"
}
}
需要获取值和字段的信息数据,id、name、时间的信息,这些数据获取到以后,才可以进行数据的替换,把数据替换到 hive 之中,接下来去做准备工作,创建hive表执行之前一定要进行准备工作,如果忘记做准备工作,后面的流程就会走不通。
输入代码SELECT * from myhive.nifi_hive;现在表里面是没有数据的,如果是 insert 它的 SQL 语句和 MySQL是类似的,可以进行复制insert into nifi_test.nifi_hive_streaming (name,day_time) values ( 'testName5 ", now());表名改为 hive 中的表名,即为insert into myhive.nifi_hive(id,name,day_time) values ( );这表里面有三个字段分别为 ID、name和 day_time,view 数据替换为数据流里面的数据,需要在组件中去获取数值,进行设置。如果想要获取 database,需要用 jason path 根节点$,分解点下一级的属性通过.或者[]进行获取,首先设置 property name 为 database,值设置为$.database,这样就获取到了 jason 中的属性值。接下来,如果想要获取表名,Property name 设置为table_name,值为$.table_name,关键点在于 ID、name 和 daytime 需要拿到,否则无法插入,或许这三个值就涉及到了数组的问题,因为 columns 是一个数组,首先来获取 columns,比如说要获取 ID 值,输入$.columns,id 是在第一项当中,值为 value 也就是1,数据库当中显示也为一
Name 和 daytime 都是匹配的,如果需要获取 ID 值,就要拿数组中的第一个对象 coLumns:
"id" : 1,
name" : "id",
"column_type": 4,
"value": 1,
下标为0,设置值为$.columns[0],拿到0后获取的是整个的对象,但是想要获取的是它的值,它的值是 value,所以最终设置值为$.columns[0].value,通过这样的方式,就可以拿到 ID 的值。 继续添加 name 字段的值,输入 value 值为$.columns[1].value,Value 值为 testname5,接下来是 daytime,同样输入$.columns[2].value,这样就获取到了 ID、name 和 day_time 的值 ,点击保存。
接下来测试一下数据是否正确,试验也需要有一个队列进行发送,把 evaluatejasonpath 的队列指向 log attribute 进行查看,For relationships 选择 matched、Future 和 unmatched,Evaluate jason path 有一个报错
报错信息提示的是 flowflie-content,现在要把它提取到属性当中,所以需要把 destination 修改为 flowflie-attribute,这样就没有报错信息了。接下来进行 run start 和 refresh,然后查看队列内容 list quote,在 view 进行查看,代码显示为00binlog titnane' 'msal-ino.0s "bialgupoutin ".1% atioun ' if i tat ts e nas ' f five tran thueir :1 " sumf'!f d ans d urmntys hes : t as a,Content并没有变化。接下来看一下attributes,里面已经有了 database,MySQL 数据库就是 nifi_test,表名 table_name_streaming,Name 字段为test name 5,ID的值为1,还有时间字段day_time为2020-02-27, 这些数据都正确的写入到了 flow file 里面。
接下来就是如何使用提取到的属性转换成 SQL 语句,使用 replacetext 进行转换,对处理器进行链接,For relationship 选择 matched,失败会发送到 log 日志里面,发送到 replace 以后,Replace 会使用 flow file 当中数据,拼接成一个 SQL 语句。
$1是原始的 Content 内容,要把它进行修改,使用 nifi 表达式,数值不能直接写,要上传 SQL 语句,直接复制 SQL语句,然后把数值填写进来,最后填写应该为hive.nifi_hive (id, name,day_time) values ($(id), $ (name),$day_time)),点击 apply。这时候 replace text 也配置好了。配置好以后要去看一下它执行出来的 SQL 语句是否正确,让它也去运行,For relationships 选择 failture 和 success
接下来如果想继续运转,发现 evaluatejasonpath 里面有报错,选择自连接,
有两种造数据的方式,第一种方式是通过启动前面的处理器来造数据,第二种方法是通过,Data provenance
点击 content 中的 replay,点击以后发现它提示成功,发现数据已经有了队列,重新复现流程,这也是使用的技巧,接下来运行 replace,停止队列处理器,查看 list quete,查看数据的内容 view,可以看到,已经变成了 insert 语句,代码如下insert into mythive.nifi_hive (id,name,day_time) values (l, testName5,2020-02-27 ),
再来看一下 insert 语句是否正确,数据是没有错的,在表当中有三个字段,其中 ID 是 int,name 和 daytime 都是string类型,在 MySQL 里面,ID 是 int,name 为 string,datetime 是 date,通过把 date 类型转换为 string 进行存储,Int 类型是可以这样写,但是 SQL 语句的 string 类型是不可以这样进行写的,String 类型这样写是会报错的,需要加引号,接下来进行配置,在 property 里面,在最终的输出数据里面给 name 增加单引号。
然后再测试一下,重新复现数据,点击 view 查看数据没有发生变化,然后点击 content 里面的 replay
Replay 以后数据就进来了,然后再进行启动,写入成功后停止,再查看队列,来看最新的队列信息,点击 view,代码显示为insert into myhive.nifi_hive (id,,name,day_time) values (1,ltestName5' , '2020-02-27’),这样数据是正确的,在 hive 中测试 SQL 语句是否能够正常运行,过程会比较缓慢,因为 hive 的硬盘操作比较多,写数据和读数据的操作会比较慢。SQL 语句执行完毕以后,查看数据是否写入成功,编写代码SELECT * from myhive.nifi_hive;
可以看到数据已经成功写入。这样就证明 SQL 语句是没有问题,到目前为止 SQL 语句已经成功转化,然后清空队列。SQL 语句写入 hive 需要通过 Puthive,
进来以后把 Replacetext 和 PutHiveQL 做一个链接,如果失败就需要做一个自连接
接下来点击 configure,之前提到 PuthiveQL 需要数据库连接池才能工作。通过 create,创建一个 hive 连接池
如果设置了两个连接池,需要删除一个。接下来设置配置,Database connection URL 设置地址为jdbc:hive2:/ /192.168.52.120:10000],如果 hive 的端口号和 ID 不一致要改成自己的地址。hive configation resources 设置为 Hive configuration Resources = /export/download/config/core-site.xml , / export/download/config/hdfs-site.xml, /export/download/config/hive-site.xml,由于 hive site 没有上传过,需要去上传一下,输入以下代码
configjarsnifi-1.9.2nifi-1.9.2-bin.tar.gz[ root@localhost
download]# cd jars/
[root@localhost jars]# ls
mysql-connector-java-5.i.40.jar
[root@localhost jars]# pwd
export/download/jars
[root@localhost jars]#_cd ../config/
[ root@localhost config]n ls
core-site. xmlhdfs-site.xml
[root@localhost config]# ls
core-site.xmlhdfs-site.xmlhive-site. Xml
[root@localhost config]#
[root@localhost config]#
iroot@localhost config]# chmod +wrx h
chmod:无法访问"h" :没有那个文件或目录
[ root@localhost config]n chmod +wrx hive-site.
xml[root@localhost configln ls.
core-site.xmlhdfs-site.xmlhive-site.xml[root@localhost config]#
三个配置文件都有了以后,账号名和密码之前没有设置,所以就不需要再设置。点击 enable 进行启动,重新回到put hive ql,进行关联。如果想验证 put hive ql,需要把它启动起来,查看数据库是否报错,或者有没有写入到 hive表中去,Hive 的 SQL 语句,已经执行过了,里面只有一条数据。配置一下 put hive ql 的自连接
然后重新启动程序,启动之前要检查一下整个程序。Capture change mysql 是单点启动,每隔三秒钟去监听一次变化,第一个程序是单点启动,后面的是负载均衡,
想要达到负载均衡,需要在节点进行设置,在 settings 下面的 load balance strategy 选择 round robin,三台机群都可以进行处理。数据运行以后发现没有数据,因为没有执行新的 my sql 变更,这时需要再插入数据,然后回到 nifi中,发现在 puthiveql 程序上出现了报错
停止查看具体的报错信息,CPU 处理又飙的很高,也是因为是集群模式,而是因为配置的很低,第三是查询数据库,每三秒查一次,压力比较大。报错信息出现的主要原因还是准备工作的问题,准备工作当中提到,最后需要替换 nar包,新版本的 hive 不再支持老版本的 nar 包,所以需要进行替换,在 nifi 安装包里面有新的 nar 包,需要进行上传到夹包目录下
[ root@localhost config]# chmod +wrx hive-site. Xml
[ root@localhost config]# ls
core-site. xmlhdfs-site.xmlhive-site.xml
[ root@localhost config]#_cd ..l
[ root@localhost download]# ls. .config
jarsnifi-1.9.2nifi-1.9.2-bin.tar.gz
[ root@localhost download]# cd jars/
[ root@localhost jars]# ls
mysql-connector-java-5.1.40.jar[ root@localhost jars]# ls
mysql-connector-java-5.1.40.jarnifi-hive-nar-1.9.2.nar
有的以后要把它替换到 nifi 服务下的lib目录,并且重启 nifi 集群。首先,先停止 nifi 集群
[ root@localhost soft]# cat stop .
shexport/soft/nifi-1.9.2-18001/bin/nifi.sh
stopfexport/soft/nifi-1.9.2-18002/bin/nifi.sh
stop/export/soft/nifi-1.9.2-18003/bin/nifi.sh stop
[ root@localhost soft]#
[ root@localhost soft]# ./stop.sh
Java home: /export/soft/jdk1.8.0_221NiFi home:
/export/soft/nifi-1.9.2-18001
Bootstrap Config File:
/export/soft/nifi-1.9.2-18001/conf/bootstrap.conf
然后进行转移,在停止的过程中,可以把新上传的 nar 包,复制到三个节点的 lib 进行覆盖。覆盖成功以后进行重新启动,没有进城以后,就代表 nifi 停止成功,然后复制 jar 包到集群中
[ root@localhost soft]# jps35777 Jps
[ root@localhost soft]#[ root@localhost soft]#
[ root@localhost soft]#
[ root@localhost soft]# cd /export/download/
jars/nifi-hive-nar-1.9.2.nar ./nifi-1.9.2-18001/lib/-bash: cd:
/export/download/jars/nifi-hive-nar-1.9.2.nar:不是目录
[ root(localhost soft]# cp
/export/download/jars/nifi-hive-nar-1.9.2.nar ./nifi-1.9.2-18001/libr/cp:是否覆盖
"./nifi-1.9.2-18001/lib/nifi-hive-nar-1.9.2.nar"? yo 。 aor nifi.1 o 2.18002门13t[ root@localhost soft]# cp
/export/download/jars/nifi-hive-nar-1.9.2.nar ./nifi-1.9.2-18002/ib/cp:是否覆盖"./nifi-1.9.2-18002/lib/nifi-hive-nar-1.9.2.nar"? y
[ root(localhost soft]# cp /export/download/ jars/nifi-hive-nar-1.9.2.nar ./nifi-1.9.2-18003/lib/cp:是否覆盖"./nifi-1.9.2-18003/lib/nifi-hive-nar-1.9.2.nar"
提示是否覆盖输入 yes,分别复制到三个节点,复制成功以后,进行 start 重新启动,启动的时间是比较长的。这时候可以看到 CPU 利用率已经降低了,证明 nifi 集群启动完毕。通过 GPS 进行确认
[root@localhost soft]#jps
36272 RunNiFi
36292 NiFi
38228 jps
36197 RunNiFi
36218 NiFi 3
6346 RunNiFi
36366 NiFi
可以看到进程都已经存在。重新刷新 nifi 页面,进入以后重新进行运行,来查看是否能够成功写入 nifi 中。CPU 稳定以后来启动集群,启动以后发现没有数据,查询一下是否有多余数据,输入代码SELECT * from myhive.nifi_hive;发现确实多了一条数据
刷新一下 MYSQL 发现数据都是一致的。再插入三条数据,mysql 里面出现新增数据
再查看 hive,hive 相对来说处理比较慢,如果想提高并发度,着重还是在于优化 hive,执行完以后进行查询,可以看到 hive 中的数据和 MySQL 里面的数据是一致的。
通过这样的方式就成功实现了 mysql 数据同步到 hive 数据库。



























