Flink on zeppelin 结合kafka实时计算pv uv写入mysql

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
实时计算 Flink 版,5000CU*H 3个月
简介: 上一篇文章主要介绍了Flink on zeppelin的安装和使用,配置了yarn的模式跑通了一个streaming wordcount的例子,本文主要介绍结合kafka的使用,实时计算一个简单的pv,uv把结果写入到mysql的例子.添加依赖包首先需要添加kafka以及mysql的jar包,有两种方式,第一种是直接把jar包添加到Flink的lib下面,如下所示:

上一篇文章主要介绍了Flink on zeppelin的安装和使用,配置了yarn的模式跑通了一个streaming wordcount的例子,本文主要介绍结合kafka的使用,实时计算一个简单的pv,uv把结果写入到mysql的例子.


添加依赖包


首先需要添加kafka以及mysql的jar包,有两种方式,第一种是直接把jar包添加到Flink的lib下面,如下所示:




只需要添加 flink-sql-connector-kafka_2.11-1.11.0.jar , flink-json-1.11.0.jar , flink-jdbc_2.11-1.10.1.jar , mysql-connector-java-5.1.47.jar 这4个jar包就可以了,我加的比较多是别的地方用到了,用不到的可以不用加防止出现jar包冲突的问题.


第二种是在zeppelin的UI上运行添加依赖包的命令,添加的格式如下所示,


flink.execution.packages  groupId:artifactId:version 然后点击运行就可以了,执行完后需要重启一下 interpreter.


%flink.conf
flink.execution.packages org.apache.flink:flink-jdbc_2.11:1.10.1


flink.execution.packages  这个配置也类似flink.execution.jars,但它不是用来指定jar包,而是用来指定package的。Zeppelin会下载这个package以及这个package的依赖,并且放到flink interpreter的classpath上。如果需要添加多个依赖的话,中间用逗号隔开就可以了.


创建表


先来创建一个kafka的流表,SQL语句如下所示.


%flink.ssql
DROP TABLE IF EXISTS kafka_table;
CREATE TABLE kafka_table (
    name VARCHAR COMMENT '姓名',
    age int COMMENT '年龄',
  city VARCHAR,
    borth VARCHAR,
    ts BIGINT  COMMENT '时间戳',
    t as TO_TIMESTAMP(FROM_UNIXTIME(ts/1000,'yyyy-MM-dd HH:mm:ss')),
    proctime as PROCTIME(),
    WATERMARK FOR t AS t - INTERVAL '5' SECOND
)
WITH (
    'connector' = 'kafka', -- 使用 kafka connector
    'topic' = 'jason_flink',  -- kafka topic
    'scan.startup.mode' = 'latest-offset', -- 从起始 offset 开始读取
    'properties.bootstrap.servers' = 'master:9092,storm1:9092,storm2:9092',  -- broker连接信息
    'properties.group.id' = 'jason_flink_test',
    'scan.startup.mode' = 'latest-offset',  -- 读取数据的位置
    'format' = 'json'  -- 数据源格式为 json
)


这里使用的是Flink1.11.0的版本,所以Connector 的参数个数已经变了,虽然现在也兼容老的写法,不过还是建议使用新版本的写法,这样更加的简洁,然后可以先执行一下查询kafka表的SQL看一下是否可以获取到数据.



数据正常的打印出来了,说明是可以接收到数据的.然后继续创建一个mysql的结果表.


%flink.ssql
drop table if EXISTS  a;
CREATE TABLE a (
  name STRING,
  pv INT not null,
  uv INT not null,
  t_start TIMESTAMP(3),
  t_end TIMESTAMP(3),
  PRIMARY KEY (name) NOT ENFORCED
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://master:3306/test',
   'table-name' = 'a',
   'username' = 'mysql',
   'password' = '12345678'
)


这里先执行一下show tables也可以看到刚才创建的2个表



执行SQL


然后就可以做一个简单的基于滚动窗口的pv,uv的统计了,SQL语句非常的简单,这里要注意的是query的字段类型要和sink的字段类型保持一致,否则会报字段类型不匹配的错.


%flink.ssql(type=update,parallelism=4)
insert into a 
select name, 
 cast(count(name) as INT) as pv,
 cast(count(distinct name) as INT) as uv,
 TUMBLE_START(t, INTERVAL '5' second) as t_start,
 TUMBLE_END(t, INTERVAL '5' second) as t_end
 from kafka_table 
 group by name,TUMBLE(t, INTERVAL '5' second);


点击右上角的Flink Job,就可以调到Flink的UI页面看到Job运行的情况了.


网络异常,图片无法展示
|


从上面的records received和records send能看到数据进来了.最后再来看一下mysql里面是否有数据.


网络异常,图片无法展示
|


后面会介绍使用Flink on zeppelin实现更多的场景,大家可以持续关注一下.

相关文章
|
4月前
|
消息中间件 关系型数据库 MySQL
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
320 0
|
2月前
|
监控 关系型数据库 MySQL
Flink CDC MySQL同步MySQL错误记录
在使用Flink CDC同步MySQL数据时,常见的错误包括连接错误、权限错误、表结构变化、数据类型不匹配、主键冲突和
148 16
|
6月前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版产品使用问题之使用CTAS同步MySQL到Hologres时出现的时区差异,该如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
SQL 存储 关系型数据库
实时计算 Flink版产品使用问题之同步MySQL多张表的过程中,内存释放依赖于什么
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
4天前
|
消息中间件 存储 缓存
kafka 的数据是放在磁盘上还是内存上,为什么速度会快?
Kafka的数据存储机制通过将数据同时写入磁盘和内存,确保高吞吐量与持久性。其日志文件按主题和分区组织,使用预写日志(WAL)保证数据持久性,并借助操作系统的页缓存加速读取。Kafka采用顺序I/O、零拷贝技术和批量处理优化性能,支持分区分段以实现并行处理。示例代码展示了如何使用KafkaProducer发送消息。
|
4月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
173 1
|
4月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
76 1
|
6月前
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
408 9
|
6月前
|
消息中间件 负载均衡 Java
"Kafka核心机制揭秘:深入探索Producer的高效数据发布策略与Java实战应用"
【8月更文挑战第10天】Apache Kafka作为顶级分布式流处理平台,其Producer组件是数据高效发布的引擎。Producer遵循高吞吐、低延迟等设计原则,采用分批发送、异步处理及数据压缩等技术提升性能。它支持按消息键值分区,确保数据有序并实现负载均衡;提供多种确认机制保证可靠性;具备失败重试功能确保消息最终送达。Java示例展示了基本配置与消息发送流程,体现了Producer的强大与灵活性。
94 3
|
6月前
|
vr&ar 图形学 开发者
步入未来科技前沿:全方位解读Unity在VR/AR开发中的应用技巧,带你轻松打造震撼人心的沉浸式虚拟现实与增强现实体验——附详细示例代码与实战指南
【8月更文挑战第31天】虚拟现实(VR)和增强现实(AR)技术正深刻改变生活,从教育、娱乐到医疗、工业,应用广泛。Unity作为强大的游戏开发引擎,适用于构建高质量的VR/AR应用,支持Oculus Rift、HTC Vive、Microsoft HoloLens、ARKit和ARCore等平台。本文将介绍如何使用Unity创建沉浸式虚拟体验,包括设置项目、添加相机、处理用户输入等,并通过具体示例代码展示实现过程。无论是完全沉浸式的VR体验,还是将数字内容叠加到现实世界的AR应用,Unity均提供了所需的一切工具。
230 0

热门文章

最新文章