(6)FlinkSQL将kafka数据写入到mysql方式一

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
实时计算 Flink 版,5000CU*H 3个月
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
简介: (6)FlinkSQL将kafka数据写入到mysql方式一

image.png
这里不展开zookeeper、kafka安装配置
(1)首先需要启动zookeeper和kafka
image.png
(2)定义一个kafka生产者

package com.producers;

import com.alibaba.fastjson.JSONObject;
import com.pojo.Event;
import com.pojo.WaterSensor;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.Random;

/**
 * Created by lj on 2022-07-09.
 */
public class Kafaka_Producer {
    public final static String bootstrapServers = "127.0.0.1:9092";

    public static void main(String[] args) {
        Properties props = new Properties();
        //设置Kafka服务器地址
        props.put("bootstrap.servers", bootstrapServers);
        //设置数据key的序列化处理类
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //设置数据value的序列化处理类
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        try {
            int i = 0;
            Random r=new Random();   //不传入种子
            String[] lang = {"flink","spark","hadoop","hive","hbase","impala","presto","superset","nbi"};

            while(true) {
                Thread.sleep(2000);
                WaterSensor waterSensor = new WaterSensor(lang[r.nextInt(lang.length)],i,i);
                i++;

                String msg = JSONObject.toJSONString(waterSensor);
                System.out.println(msg);
                RecordMetadata recordMetadata = producer.send(new ProducerRecord<>("kafka_data_waterSensor", null, null,  msg)).get();
//                System.out.println("recordMetadata: {"+ recordMetadata +"}");
            }

        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }
}

(3)定义一个消息对象


package com.pojo;

import java.io.Serializable;

/**
 * Created by lj on 2022-07-05.
 */
public class WaterSensor implements Serializable {
    private String id;
    private long ts;
    private int vc;

    public WaterSensor(){

    }

    public WaterSensor(String id,long ts,int vc){
        this.id = id;
        this.ts = ts;
        this.vc = vc;
    }

    public int getVc() {
        return vc;
    }

    public void setVc(int vc) {
        this.vc = vc;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public long getTs() {
        return ts;
    }

    public void setTs(long ts) {
        this.ts = ts;
    }
}

(4)从kafka接入数据,并写入到mysql


public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //读取kafka的数据
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers","127.0.0.1:9092");
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");

        DataStreamSource<String> streamSource = env.addSource(
                new FlinkKafkaConsumer<String>(
                        "kafka_waterSensor",
                        new SimpleStringSchema(),
                        properties)
        );

        SingleOutputStreamOperator<WaterSensor> waterDS = streamSource.map(new MapFunction<String, WaterSensor>() {
            @Override
            public WaterSensor map(String s) throws Exception {
                JSONObject json  = (JSONObject)JSONObject.parse(s);
                return new WaterSensor(json.getString("id"),json.getLong("ts"),json.getInteger("vc"));
            }
        });

        // 将流转化为表
        Table table = tableEnv.fromDataStream(waterDS,
                $("id"),
                $("ts"),
                $("vc"),
                $("pt").proctime());

        tableEnv.createTemporaryView("EventTable", table);


        tableEnv.executeSql("CREATE TABLE flinksink (" +
                "componentname STRING," +
                "componentcount BIGINT NOT NULL," +
                "componentsum BIGINT" +
                ") WITH (" +
                "'connector.type' = 'jdbc'," +
                "'connector.url' = 'jdbc:mysql://localhost:3306/testdb?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai'," +
                "'connector.table' = 'flinksink'," +
                "'connector.driver' =  'com.mysql.cj.jdbc.Driver'," +
                "'connector.username' = 'root'," +
                "'connector.password' = 'root'," +
                "'connector.write.flush.max-rows'='3'\r\n" +
                ")"
        );
        Table mysql_user = tableEnv.from("flinksink");
        mysql_user.printSchema();

        Table result = tableEnv.sqlQuery(
                "SELECT " +
                        "id as componentname, " +                //window_start, window_end,
                        "COUNT(ts) as componentcount ,SUM(ts) as componentsum " +
                        "FROM TABLE( " +
                        "TUMBLE( TABLE EventTable , " +
                        "DESCRIPTOR(pt), " +
                        "INTERVAL '10' SECOND)) " +
                        "GROUP BY id , window_start, window_end"
        );

        //方式一:写入数据库
//        result.executeInsert("flinksink").print(); //;.insertInto("flinksink");

        //方式二:写入数据库
        tableEnv.createTemporaryView("ResultTable", result);
        tableEnv.executeSql("insert into flinksink SELECT * FROM ResultTable").print();

//        tableEnv.toAppendStream(result, Row.class).print("toAppendStream");           //追加模式
        env.execute();

    }

(5)效果演示
image.png

image.png

相关文章
|
2月前
|
安全 关系型数据库 MySQL
如何将数据从MySQL同步到其他系统
【10月更文挑战第17天】如何将数据从MySQL同步到其他系统
218 0
|
2月前
|
SQL 前端开发 关系型数据库
全表数据核对 ,行数据核对,列数据核对,Mysql 8.0 实例(sample database classicmodels _No.3 )
全表数据核对 ,行数据核对,列数据核对,Mysql 8.0 实例(sample database classicmodels _No.3 )
53 0
全表数据核对 ,行数据核对,列数据核对,Mysql 8.0 实例(sample database classicmodels _No.3 )
|
2月前
|
关系型数据库 MySQL 数据库
mysql 里创建表并插入数据
【10月更文挑战第5天】
137 1
|
2月前
|
分布式计算 关系型数据库 MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
52 3
|
2月前
|
消息中间件 关系型数据库 MySQL
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
175 0
|
28天前
|
存储 Oracle 关系型数据库
【赵渝强老师】MySQL InnoDB的数据文件与重做日志文件
本文介绍了MySQL InnoDB存储引擎中的数据文件和重做日志文件。数据文件包括`.ibd`和`ibdata`文件,用于存放InnoDB数据和索引。重做日志文件(redo log)确保数据的可靠性和事务的持久性,其大小和路径可由相关参数配置。文章还提供了视频讲解和示例代码。
132 11
【赵渝强老师】MySQL InnoDB的数据文件与重做日志文件
|
10天前
|
SQL 关系型数据库 MySQL
mysql分页读取数据重复问题
在服务端开发中,与MySQL数据库进行数据交互时,常因数据量大、网络延迟等因素需分页读取数据。文章介绍了使用`limit`和`offset`参数实现分页的方法,并针对分页过程中可能出现的数据重复问题进行了详细分析,提出了利用时间戳或确保排序规则绝对性等解决方案。
|
16天前
|
关系型数据库 MySQL 数据库
GBase 数据库如何像MYSQL一样存放多行数据
GBase 数据库如何像MYSQL一样存放多行数据
|
28天前
|
缓存 NoSQL 关系型数据库
Redis和Mysql如何保证数据⼀致?
在项目中,为了解决Redis与Mysql的数据一致性问题,我们采用了多种策略:对于低一致性要求的数据,不做特别处理;时效性数据通过设置缓存过期时间来减少不一致风险;高一致性但时效性要求不高的数据,利用MQ异步同步确保最终一致性;而对一致性和时效性都有高要求的数据,则采用分布式事务(如Seata TCC模式)来保障。
58 14
|
1月前
|
SQL 前端开发 关系型数据库
SpringBoot使用mysql查询昨天、今天、过去一周、过去半年、过去一年数据
SpringBoot使用mysql查询昨天、今天、过去一周、过去半年、过去一年数据
54 9