Flink on zeppelin 结合kafka实时计算pv uv写入mysql

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
实时计算 Flink 版,5000CU*H 3个月
简介: 上一篇文章主要介绍了Flink on zeppelin的安装和使用,配置了yarn的模式跑通了一个streaming wordcount的例子,本文主要介绍结合kafka的使用,实时计算一个简单的pv,uv把结果写入到mysql的例子.添加依赖包首先需要添加kafka以及mysql的jar包,有两种方式,第一种是直接把jar包添加到Flink的lib下面,如下所示:

上一篇文章主要介绍了Flink on zeppelin的安装和使用,配置了yarn的模式跑通了一个streaming wordcount的例子,本文主要介绍结合kafka的使用,实时计算一个简单的pv,uv把结果写入到mysql的例子.


添加依赖包


首先需要添加kafka以及mysql的jar包,有两种方式,第一种是直接把jar包添加到Flink的lib下面,如下所示:




只需要添加 flink-sql-connector-kafka_2.11-1.11.0.jar , flink-json-1.11.0.jar , flink-jdbc_2.11-1.10.1.jar , mysql-connector-java-5.1.47.jar 这4个jar包就可以了,我加的比较多是别的地方用到了,用不到的可以不用加防止出现jar包冲突的问题.


第二种是在zeppelin的UI上运行添加依赖包的命令,添加的格式如下所示,


flink.execution.packages  groupId:artifactId:version 然后点击运行就可以了,执行完后需要重启一下 interpreter.


%flink.conf
flink.execution.packages org.apache.flink:flink-jdbc_2.11:1.10.1


flink.execution.packages  这个配置也类似flink.execution.jars,但它不是用来指定jar包,而是用来指定package的。Zeppelin会下载这个package以及这个package的依赖,并且放到flink interpreter的classpath上。如果需要添加多个依赖的话,中间用逗号隔开就可以了.


创建表


先来创建一个kafka的流表,SQL语句如下所示.


%flink.ssql
DROP TABLE IF EXISTS kafka_table;
CREATE TABLE kafka_table (
    name VARCHAR COMMENT '姓名',
    age int COMMENT '年龄',
  city VARCHAR,
    borth VARCHAR,
    ts BIGINT  COMMENT '时间戳',
    t as TO_TIMESTAMP(FROM_UNIXTIME(ts/1000,'yyyy-MM-dd HH:mm:ss')),
    proctime as PROCTIME(),
    WATERMARK FOR t AS t - INTERVAL '5' SECOND
)
WITH (
    'connector' = 'kafka', -- 使用 kafka connector
    'topic' = 'jason_flink',  -- kafka topic
    'scan.startup.mode' = 'latest-offset', -- 从起始 offset 开始读取
    'properties.bootstrap.servers' = 'master:9092,storm1:9092,storm2:9092',  -- broker连接信息
    'properties.group.id' = 'jason_flink_test',
    'scan.startup.mode' = 'latest-offset',  -- 读取数据的位置
    'format' = 'json'  -- 数据源格式为 json
)


这里使用的是Flink1.11.0的版本,所以Connector 的参数个数已经变了,虽然现在也兼容老的写法,不过还是建议使用新版本的写法,这样更加的简洁,然后可以先执行一下查询kafka表的SQL看一下是否可以获取到数据.



数据正常的打印出来了,说明是可以接收到数据的.然后继续创建一个mysql的结果表.


%flink.ssql
drop table if EXISTS  a;
CREATE TABLE a (
  name STRING,
  pv INT not null,
  uv INT not null,
  t_start TIMESTAMP(3),
  t_end TIMESTAMP(3),
  PRIMARY KEY (name) NOT ENFORCED
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://master:3306/test',
   'table-name' = 'a',
   'username' = 'mysql',
   'password' = '12345678'
)


这里先执行一下show tables也可以看到刚才创建的2个表



执行SQL


然后就可以做一个简单的基于滚动窗口的pv,uv的统计了,SQL语句非常的简单,这里要注意的是query的字段类型要和sink的字段类型保持一致,否则会报字段类型不匹配的错.


%flink.ssql(type=update,parallelism=4)
insert into a 
select name, 
 cast(count(name) as INT) as pv,
 cast(count(distinct name) as INT) as uv,
 TUMBLE_START(t, INTERVAL '5' second) as t_start,
 TUMBLE_END(t, INTERVAL '5' second) as t_end
 from kafka_table 
 group by name,TUMBLE(t, INTERVAL '5' second);


点击右上角的Flink Job,就可以调到Flink的UI页面看到Job运行的情况了.


网络异常,图片无法展示
|


从上面的records received和records send能看到数据进来了.最后再来看一下mysql里面是否有数据.


网络异常,图片无法展示
|


后面会介绍使用Flink on zeppelin实现更多的场景,大家可以持续关注一下.

相关文章
|
1月前
|
SQL 消息中间件 Kafka
实时计算 Flink版产品使用问题之如何在EMR-Flink的Flink SOL中针对source表单独设置并行度
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
26天前
|
消息中间件 监控 Kafka
联通实时计算平台问题之Flink状态后端数据量较大时,问题排查要如何进行
联通实时计算平台问题之Flink状态后端数据量较大时,问题排查要如何进行
|
22天前
|
SQL 存储 关系型数据库
实时计算 Flink版产品使用问题之同步MySQL多张表的过程中,内存释放依赖于什么
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
22天前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版产品使用问题之使用CTAS同步MySQL到Hologres时出现的时区差异,该如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
SQL Oracle 关系型数据库
实时计算 Flink版产品使用问题之连接到MySQL的从库时遇到其他服务也连接到了从库,该如何处理
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3天前
|
NoSQL 关系型数据库 MySQL
微服务架构下的数据库选择:MySQL、PostgreSQL 还是 NoSQL?
在微服务架构中,数据库的选择至关重要。不同类型的数据库适用于不同的需求和场景。在本文章中,我们将深入探讨传统的关系型数据库(如 MySQL 和 PostgreSQL)与现代 NoSQL 数据库的优劣势,并分析在微服务架构下的最佳实践。
|
5天前
|
存储 SQL 关系型数据库
使用MySQL Workbench进行数据库备份
【9月更文挑战第13天】以下是使用MySQL Workbench进行数据库备份的步骤:启动软件后,通过“Database”菜单中的“管理连接”选项配置并选择要备份的数据库。随后,选择“数据导出”,确认导出的数据库及格式(推荐SQL格式),设置存储路径,点击“开始导出”。完成后,可在指定路径找到备份文件,建议定期备份并存储于安全位置。
65 11
|
30天前
|
SQL 关系型数据库 MySQL
【揭秘】MySQL binlog日志与GTID:如何让数据库备份恢复变得轻松简单?
【8月更文挑战第22天】MySQL的binlog日志记录数据变更,用于恢复、复制和点恢复;GTID为每笔事务分配唯一ID,简化复制和恢复流程。开启binlog和GTID后,可通过`mysqldump`进行逻辑备份,包含binlog位置信息,或用`xtrabackup`做物理备份。恢复时,使用`mysql`命令执行备份文件,或通过`innobackupex`恢复物理备份。GTID模式下的主从复制配置更简便。
124 2
|
25天前
|
弹性计算 关系型数据库 数据库
手把手带你从自建 MySQL 迁移到云数据库,一步就能脱胎换骨
阿里云瑶池数据库来开课啦!自建数据库迁移至云数据库 RDS原来只要一步操作就能搞定!点击阅读原文完成实验就可获得一本日历哦~
|
28天前
|
关系型数据库 MySQL 数据库
RDS MySQL灾备服务协同解决方案构建问题之数据库备份数据的云上云下迁移如何解决
RDS MySQL灾备服务协同解决方案构建问题之数据库备份数据的云上云下迁移如何解决