Flink SQL与HBase的集成

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 笔记

版本说明:


flink-1.12.1

hbase-1.4.13

目录

(1)Flink SQL与HBase的集成配置

(2)测试Flink SQL与HBase集成代码

(3)测试kafka数据源与HBase写入数据


(1)Flink SQL与HBase的集成配置


第一步:Maven dependency

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-hbase-base_2.11</artifactId>
  <version>1.12.1</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-sql-connector-hbase-1.4_2.11</artifactId>
  <version>1.12.1</version>
</dependency>

第二步:将flink-sql-connector-hbase-1.4_2.11-1.12.1.jar这个包复制到flink的lib目录下


flink-sql-connector-hbase-1.4_2.11-1.12.1.jar这个包在maven仓库下载:


https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-hbase-1.4_2.11/1.12.1


(2)测试Flink SQL与HBase集成代码


在测试代码之前需要将启动hbase服务,并创建写入hbase的数据表

create 'user_product_hbase','cf'
package com.aikfk.flink.sql.hbase;
import com.aikfk.flink.sql.CommonSQL;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
public class FlinkKafkaHBase {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env , settings);
        env.enableCheckpointing(5000);
        HiveCatalog hiveCatalog =
                new HiveCatalog(
                        CommonSQL.hiveCatalog_name,
                        CommonSQL.hiveDatabase_name,
                        CommonSQL.hiveConfDir,
                        CommonSQL.version
                        );
        tableEnvironment.registerCatalog(CommonSQL.hiveCatalog_name,hiveCatalog);
        tableEnvironment.useCatalog(CommonSQL.hiveCatalog_name);
        String user_product_kafka_create =
                "CREATE TABLE user_product_kafka (\n" +
                        " row_key STRING," +
                        " user_id STRING," +
                        " product_id STRING," +
                        " click_count INT " +
                        ") WITH (" +
                        " 'connector' = 'kafka'," +
                        " 'topic' = 'kfk'," +
                        " 'properties.bootstrap.servers' = 'bigdata-pro-m07:9092'," +
                        " 'properties.group.id' = 'test1'," +
                        " 'format' = 'json'," +
                        " 'scan.startup.mode' = 'latest-offset'" +
                        ")";
        tableEnvironment.executeSql("DROP TABLE IF EXISTS user_product_kafka");
        tableEnvironment.executeSql(user_product_kafka_create);
        tableEnvironment.executeSql("DROP TABLE IF EXISTS user_product_hbase");
        String user_product_hbase_create =
                        "CREATE TABLE user_product_hbase (\n" +
                        " row_key STRING,\n" +
                        " cf ROW<user_id STRING,product_id STRING,click_count INT>,\n" +
                        " PRIMARY KEY (row_key) NOT ENFORCED\n" +
                        ") WITH (\n" +
                        " 'connector' = 'hbase-1.4',\n" +
                        " 'table-name' = 'user_product_hbase',\n" +
                        " 'zookeeper.quorum' = 'bigdata-pro-m07:2181'\n" +
                        ")";
        tableEnvironment.executeSql(user_product_hbase_create);
        tableEnvironment.executeSql(
                "INSERT INTO user_product_hbase\n" +
                "SELECT row_key, ROW(user_id, product_id, click_count) FROM user_product_kafka").print();
        env.execute();
    }
}


(3)测试kafka数据源与HBase写入数据


生产者生产数据:

package com.aikfk.flink.base;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.Random;
public class KafkaProducerUtilSimple extends Thread {
        private String topic = "kfk";
        public KafkaProducerUtilSimple() {
            super();
        }
        private Producer<String, String> createProducer() {
            // 通过Properties类设置Producer的属性
            Properties properties = new Properties();
            properties.put("bootstrap.servers", "bigdata-pro-m07:9092");
            properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            return new KafkaProducer<String, String>(properties);
        }
        @Override
        public void run() {
            Producer<String, String> producer = createProducer();
            Random random = new Random();
            Random random2 = new Random();
            while (true) {
                String user_id = "user_"+random.nextInt(10);
                String product_id = "product_"+random2.nextInt(100);
                System.out.println(user_id + " :" + product_id);
                String time = System.currentTimeMillis() / 1000 + 5 + "";
                String row_key = user_id+"_"+product_id+"_"+time;
                try {
//
                    String kaifa_log = "{" +
                            "\"row_key\":\"" + row_key+"\"," +
                            "\"user_id\":\"" + user_id+"\"," +
                            "\"product_id\":\"" + product_id+"\"," +
                            "\"click_count\":\"1\"}";
                    System.out.println("kaifa_log = " + kaifa_log);
                    producer.send(new ProducerRecord<String, String>(this.topic, kaifa_log));
                } catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println("=========循环一次==========");
                try {
                    sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        public static void main(String[] args) {
            new KafkaProducerUtilSimple().run();
        }
    }

生产结果:

user_8 :product_48
kaifa_log = {"row_key":"user_8_product_48_1618237864","user_id":"user_8","product_id":"product_48","click_count":"1"}
=========循环一次==========
user_6 :product_53
kaifa_log = {"row_key":"user_6_product_53_1618237867","user_id":"user_6","product_id":"product_53","click_count":"1"}
=========循环一次==========
user_8 :product_34
kaifa_log = {"row_key":"user_8_product_34_1618237870","user_id":"user_8","product_id":"product_34","click_count":"1"}
=========循环一次==========

通过flink SQL client查看数据

5.png

查看hbase写入数据情况:

4.png




相关文章
|
1月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
105 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
3月前
|
SQL 存储 API
Flink实践:通过Flink SQL进行SFTP文件的读写操作
虽然 Apache Flink 与 SFTP 之间的直接交互存在一定的限制,但通过一些创造性的方法和技术,我们仍然可以有效地实现对 SFTP 文件的读写操作。这既展现了 Flink 在处理复杂数据场景中的强大能力,也体现了软件工程中常见的问题解决思路——即通过现有工具和一定的间接方法来克服技术障碍。通过这种方式,Flink SQL 成为了处理各种数据源,包括 SFTP 文件,在内的强大工具。
184 15
|
1月前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
62 1
|
3月前
|
算法 API Apache
Flink CDC:新一代实时数据集成框架
本文源自阿里云实时计算团队 Apache Flink Committer 任庆盛在 Apache Asia CommunityOverCode 2024 的分享,涵盖 Flink CDC 的概念、版本历程、内部实现及社区未来规划。Flink CDC 是一种基于数据库日志的 CDC 技术实现的数据集成框架,能高效完成全量和增量数据的实时同步。自 2020 年以来,Flink CDC 经过多次迭代,已成为功能强大的实时数据集成工具,支持多种数据库和数据湖仓系统。未来将进一步扩展生态并提升稳定性。
649 2
Flink CDC:新一代实时数据集成框架
|
2月前
|
SQL 大数据 API
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
49 0
|
3月前
|
SQL 安全 数据处理
揭秘数据脱敏神器:Flink SQL的神秘力量,守护你的数据宝藏!
【9月更文挑战第7天】在大数据时代,数据管理和处理尤为重要,尤其在保障数据安全与隐私方面。本文探讨如何利用Flink SQL实现数据脱敏,为实时数据处理提供有效的隐私保护方案。数据脱敏涉及在处理、存储或传输前对敏感数据进行加密、遮蔽或替换,以遵守数据保护法规(如GDPR)。Flink SQL通过内置函数和表达式支持这一过程。
87 2
|
3月前
|
SQL 大数据 数据处理
奇迹降临!解锁 Flink SQL 简单高效的终极秘籍,开启数据处理的传奇之旅!
【9月更文挑战第7天】在大数据处理领域,Flink SQL 因其强大功能与简洁语法成为开发者首选。本文分享了编写高效 Flink SQL 的实用技巧:理解数据特征及业务需求;灵活运用窗口函数(如 TUMBLE 和 HOP);优化连接操作,优先采用等值连接;合理选择数据类型以减少计算资源消耗。结合实际案例(如实时电商数据分析),并通过定期性能测试与调优,助力开发者在大数据处理中更得心应手,挖掘更多价值信息。
48 1
|
4月前
|
SQL 流计算
Flink SQL 在快手实践问题之由于meta信息变化导致的state向前兼容问题如何解决
Flink SQL 在快手实践问题之由于meta信息变化导致的state向前兼容问题如何解决
51 1
|
4月前
|
SQL 安全 流计算
Flink SQL 在快手实践问题之Group Window Aggregate 中的数据倾斜问题如何解决
Flink SQL 在快手实践问题之Group Window Aggregate 中的数据倾斜问题如何解决
84 1