Flink CDC to Doris, StarRocks, and Table Store

Summary: Syncing data with Flink CDC to Doris, StarRocks, and Table Store


Flink CDC to Doris


Main class for full-database synchronization from MySQL to Doris with Flink CDC:

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.doris.flink.sink.writer.SimpleStringSerializer;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
/***
 *
 * Synchronize the full database through flink cdc
 *
 */
public class DatabaseFullSync {
    private static final Logger LOG = LoggerFactory.getLogger(DatabaseFullSync.class);
    private static String HOST = "127.0.0.1";
    private static String MYSQL_PASSWD = "password";
    private static int MYSQL_PORT = 3306;
    private static int DORIS_PORT = 8030;
    private static String MYSQL_USER = "root";
    private static String SYNC_DB = "test";
    private static String SYNC_TBLS = "test.*";
    private static String TARGET_DORIS_DB = "test";
    public static void main(String[] args) throws Exception {
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
            .hostname(HOST)
            .port(MYSQL_PORT)
            .databaseList(SYNC_DB) // set captured database
            .tableList(SYNC_TBLS) // set captured table
            .username(MYSQL_USER)
            .password(MYSQL_PASSWD)
            .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
            .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // enable checkpoint
        env.enableCheckpointing(10000);
        DataStreamSource<String> cdcSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL CDC Source");
        //get table list
        List<String> tableList = getTableList();
        LOG.info("sync table list:{}",tableList);
        for(String tbl : tableList){
            SingleOutputStreamOperator<String> filterStream = filterTableData(cdcSource, tbl);
            SingleOutputStreamOperator<String> cleanStream = clean(filterStream);
            DorisSink<String> dorisSink = buildDorisSink(tbl);
            cleanStream.sinkTo(dorisSink).name("sink " + tbl);
        }
        env.execute("Full Database Sync ");
    }
    /**
     * Get real data
     * {
     *     "before":null,
     *     "after":{
     *         "id":1,
     *         "name":"zhangsan-1",
     *         "age":18
     *     },
     *     "source":{
     *         "db":"test",
     *         "table":"test_1",
     *         ...
     *     },
     *     "op":"c",
     *     ...
     * }
     * */
    private static SingleOutputStreamOperator<String> clean(SingleOutputStreamOperator<String> source) {
        return source.flatMap(new FlatMapFunction<String,String>(){
            @Override
            public void flatMap(String row, Collector<String> out) throws Exception {
                try{
                    JSONObject rowJson = JSON.parseObject(row);
                    String op = rowJson.getString("op");
                    //history,insert,update
                    if(Arrays.asList("r","c","u").contains(op)){
                        out.collect(rowJson.getJSONObject("after").toJSONString());
                    }else{
                        LOG.info("filter other op:{}",op);
                    }
                }catch (Exception ex){
                    LOG.warn("filter other format binlog:{}",row);
                }
            }
        });
    }
    /**
     * Divide according to tablename
     * */
    private static SingleOutputStreamOperator<String> filterTableData(DataStreamSource<String> source, String table) {
        return source.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String row) throws Exception {
                try {
                    JSONObject rowJson = JSON.parseObject(row);
                    JSONObject source = rowJson.getJSONObject("source");
                    String tbl = source.getString("table");
                    return table.equals(tbl);
                }catch (Exception ex){
                    ex.printStackTrace();
                    return false;
                }
            }
        });
    }
    /**
     * Get all MySQL tables that need to be synchronized
     * */
    private static List<String> getTableList() {
        List<String> tables = new ArrayList<>();
        String sql = "SELECT TABLE_SCHEMA,TABLE_NAME FROM information_schema.tables WHERE TABLE_SCHEMA = '" + SYNC_DB + "'";
        List<JSONObject> tableList = JdbcUtil.executeQuery(HOST, MYSQL_PORT, MYSQL_USER, MYSQL_PASSWD, sql);
        for(JSONObject jsob : tableList){
            String schemaName = jsob.getString("TABLE_SCHEMA");
            String tblName = jsob.getString("TABLE_NAME");
            String schemaTbl = schemaName  + "." + tblName;
            if(schemaTbl.matches(SYNC_TBLS)){
                tables.add(tblName);
            }
        }
        return tables;
    }
    /**
     * create doris sink
     * */
    public static DorisSink<String> buildDorisSink(String table){
        DorisSink.Builder<String> builder = DorisSink.builder();
        DorisOptions.Builder dorisBuilder = DorisOptions.builder();
        dorisBuilder.setFenodes(HOST + ":" + DORIS_PORT)
            .setTableIdentifier(TARGET_DORIS_DB + "." + table)
            .setUsername("root")
            .setPassword("");
        Properties pro = new Properties();
        //json data format
        pro.setProperty("format", "json");
        pro.setProperty("read_json_by_line", "true");
        DorisExecutionOptions executionOptions = DorisExecutionOptions.builder()
            .setLabelPrefix("label-" + table + UUID.randomUUID()) //streamload label prefix,
            .setStreamLoadProp(pro).build();
        builder.setDorisReadOptions(DorisReadOptions.builder().build())
            .setDorisExecutionOptions(executionOptions)
            .setSerializer(new SimpleStringSerializer()) //serialize according to string
            .setDorisOptions(dorisBuilder.build());
        return builder.build();
    }
}
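
Note that clean() only keeps the r/c/u events, so deletes on the MySQL side are never applied to Doris. Below is a hedged sketch of a variant that also forwards deletes; it assumes the target tables are Doris Unique Key tables with batch delete enabled so that the hidden __DORIS_DELETE_SIGN__ column is accepted (this variant and the extra table/load settings it needs are assumptions, not part of the original code). The method would live inside DatabaseFullSync and reuse its imports.

    // variant of clean() that also forwards delete events
    private static SingleOutputStreamOperator<String> cleanWithDelete(SingleOutputStreamOperator<String> source) {
        return source.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String row, Collector<String> out) {
                try {
                    JSONObject rowJson = JSON.parseObject(row);
                    String op = rowJson.getString("op");
                    if (Arrays.asList("r", "c", "u").contains(op)) {
                        JSONObject after = rowJson.getJSONObject("after");
                        after.put("__DORIS_DELETE_SIGN__", 0);   // upsert
                        out.collect(after.toJSONString());
                    } else if ("d".equals(op)) {
                        JSONObject before = rowJson.getJSONObject("before");
                        before.put("__DORIS_DELETE_SIGN__", 1);  // mark the row for deletion
                        out.collect(before.toJSONString());
                    } else {
                        LOG.info("filter other op:{}", op);
                    }
                } catch (Exception ex) {
                    LOG.warn("filter other format binlog:{}", row);
                }
            }
        });
    }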

The JdbcUtil helper class:

import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
public class JdbcUtil {
    static {
        try {
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }
    private static final Logger LOG = LoggerFactory.getLogger(JdbcUtil.class);
    public static void main(String[] args) throws SQLException {
    }
    public static List<JSONObject> executeQuery(String hostUrl, int port, String user, String password, String sql){
        List<JSONObject> beJson = new ArrayList<>();
        String connectionUrl = String.format("jdbc:mysql://%s:%s/",hostUrl,port);
        Connection con = null;
        try {
            con = DriverManager.getConnection(connectionUrl,user,password);
            PreparedStatement ps = con.prepareStatement(sql);
            ResultSet rs = ps.executeQuery();
            beJson = resultSetToJson(rs);
        } catch (SQLException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                con.close();
            } catch (Exception e) {
            }
        }
        return beJson;
    }
    private static List<JSONObject> resultSetToJson(ResultSet rs) throws SQLException {
        List<JSONObject> list = new ArrayList<>();
        ResultSetMetaData metaData = rs.getMetaData();
        int columnCount = metaData.getColumnCount();
        while (rs.next()) {
            JSONObject jsonObj = new JSONObject();
            for (int i = 1; i <= columnCount; i++) {
                String columnName =metaData.getColumnLabel(i);
                String value = rs.getString(columnName);
                jsonObj.put(columnName, value);
            }
            list.add(jsonObj);
        }
        return list;
    }
}


Flink CDC to StarRocks


Overall flow:

  1. Flink CDC captures MySQL data
  2. The captured change records are converted to JSON
  3. The database, table, and row data are extracted from the JSON
  4. The stream is keyed by database and table
  5. A process function handles each table's data, buffering records in state; when the buffer reaches a size threshold or has been held for a certain time (a timer covers the time-based case), the batch is written downstream
  6. The sink assembles the batch and writes it to StarRocks via Stream Load


Main class: CdcToStarRocks

The main flow is simple: source -> map -> keyBy -> process -> sink. The source reads the MySQL binlog (or a full snapshot plus incremental changes), map converts the data format, keyBy partitions the data by database name + table name, process batches the data, and the sink writes the batches to StarRocks.

import com.venn.source.mysql.cdc.CommonStringDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Properties;
/**
 * mysql cdc demo
 * <p>
 * full-database CDC sync to StarRocks
 * <p>
 * Limitations:
 * 1. schema changes on the source side are not yet propagated to the StarRocks side
 * 2. for efficiency, whether a table exists in the target is only checked the first time data for that table arrives; once a table is judged missing, all later changes of that table are ignored
 * 3. tables that are not loaded are only filtered in the sink, the upstream operators still process them; filtering out tables that do not exist in the target database could instead be done in the deserializer or the map function
 */
public class CdcToStarRocks {
    // maximum number of records per batch and maximum wait time per batch
    private static int batchSize = 10000;
    private static long batchInterval = 10 *1000;
    public static void main(String[] args) throws Exception {
        String ip = "localhost";
        int port = 3306;
        String db = "venn";
//        String table = "venn.user_log";
        String table = "venn.*";
        String user = "root";
        String pass = "123456";
        String starrocksIp = "10.201.0.230";
        String starrocksPort = "29030";
        String starrocksLoadPort = "28030";
        String starrocksUser = "root";
        String starrocksPass = "123456";
        String starrocksDb = "test";
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        MySqlSource<String> sourceFunction = MySqlSource.<String>builder()
                .hostname(ip)
                .port(port)
                // capture all tables of the listed database(s)
                .databaseList(db)
                .tableList(table)
                .username(user)
                .password(pass)
                .startupOptions(StartupOptions.latest())
                // do not cache schema change
//                .includeSchemaChanges(false)
//                .startupOptions(StartupOptions.initial())
                // custom deserializer that parses records into JSON strings
                .deserializer(new CommonStringDebeziumDeserializationSchema(ip, port))
                .build();
        env
                .fromSource(sourceFunction, WatermarkStrategy.noWatermarks(), "cdc")
                .name("source")
                .uid("source")
                // convert the JSON string into a CdcRecord
                .map(new CdcStarMapFunction())
                .name("map")
                .keyBy(  record -> record.getDb() + "_" + record.getTable())
                .process(new CdcStarProcessFunction(batchSize, batchInterval))
                .name("process")
                .uid("process")
                .addSink(new StarRocksSink(starrocksIp, starrocksPort, starrocksLoadPort, starrocksUser, starrocksPass, starrocksDb))
                .name("sink");
        env.execute("cdcToStarRocks");
    }
}

Deserializer: CommonStringDebeziumDeserializationSchema

The deserializer reuses a generic Flink CDC deserializer written earlier. It implements DebeziumDeserializationSchema and mainly extracts the database, table, operation type, and row data from the record. A few points deserve special attention (see the class below):

  1. For insert operations, only the data in "after" is needed
  2. For update operations, both "before" and "after" are parsed; "before" is the pre-update image and "after" is the post-update image. If the pre-update image is not needed, reading "after" is enough
  3. For delete operations, "before" is needed
  4. DDL operations would require special handling (not covered here)
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.google.gson.JsonObject;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
/**
 * deserialize debezium format binlog
 */
public class CommonStringDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {
    private String host;
    private int port;
    public CommonStringDebeziumDeserializationSchema(String host, int port) {
        this.host = host;
        this.port = port;
    }
    public void deserialize(SourceRecord record, Collector<String> out) {
        JsonObject jsonObject = new JsonObject();
        String binlog = record.sourceOffset().get("file").toString();
        String offset = record.sourceOffset().get("pos").toString();
        String ts_sec = record.sourceOffset().get("ts_sec").toString();
//        System.out.println("binlog : " + binlog + ", offset = " + offset);
        // todo: capture schema changes
        jsonObject.addProperty("host", host);
        // add meta
        jsonObject.addProperty("binlog", binlog);
        jsonObject.addProperty("offset", offset);
        jsonObject.addProperty("ts_sec", ts_sec);
        jsonObject.addProperty("port", port);
        jsonObject.addProperty("file", (String) record.sourceOffset().get("file"));
        jsonObject.addProperty("pos", (Long) record.sourceOffset().get("pos"));
        jsonObject.addProperty("ts_sec", (Long) record.sourceOffset().get("ts_sec"));
        String[] name = record.valueSchema().name().split("\\.");
        jsonObject.addProperty("db", name[1]);
        jsonObject.addProperty("table", name[2]);
        Struct value = ((Struct) record.value());
        String operatorType = value.getString("op");
        jsonObject.addProperty("operator_type", operatorType);
        // c : create, u: update, d: delete, r: read
        // insert update
        if (!"d".equals(operatorType)) {
            Struct after = value.getStruct("after");
            JsonObject afterJsonObject = parseRecord(after);
            jsonObject.add("after", afterJsonObject);
        }
        // update & delete
        if ("u".equals(operatorType) || "d".equals(operatorType)) {
            Struct source = value.getStruct("before");
            JsonObject beforeJsonObject = parseRecord(source);
            jsonObject.add("before", beforeJsonObject);
        }
        jsonObject.addProperty("parse_time", System.currentTimeMillis() / 1000);
        out.collect(jsonObject.toString());
    }
    private JsonObject parseRecord(Struct after) {
        JsonObject jo = new JsonObject();
        for (Field field : after.schema().fields()) {
            switch ((field.schema()).type()) {
                case INT8:
                    int resultInt8 = after.getInt8(field.name());
                    jo.addProperty(field.name(), resultInt8);
                    break;
                case INT64:
                    Long resultInt = after.getInt64(field.name());
                    jo.addProperty(field.name(), resultInt);
                    break;
                case FLOAT32:
                    Float resultFloat32 = after.getFloat32(field.name());
                    jo.addProperty(field.name(), resultFloat32);
                    break;
                case FLOAT64:
                    Double resultFloat64 = after.getFloat64(field.name());
                    jo.addProperty(field.name(), resultFloat64);
                    break;
                case BYTES:
                    // json ignore byte column
                    // byte[] resultByte = after.getBytes(field.name());
                    // jo.addProperty(field.name(), String.valueOf(resultByte));
                    break;
                case STRING:
                    String resultStr = after.getString(field.name());
                    jo.addProperty(field.name(), resultStr);
                    break;
                default:
            }
        }
        return jo;
    }
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}

Map function: CdcStarMapFunction

CdcStarMapFunction is fairly simple: it converts the JSON string into a CdcRecord object, keeping only the database, table, operation type, and row data.

When extracting the data, insert and update records take the values from "after" only (delete records take "before", as the code shows).

import java.util.LinkedHashMap;
import java.util.Map;
import lombok.Data;
/**
 * POJO holding one CDC record: database, table, operation type and row data
 */
@Data
public class CdcRecord {
    private String db;
    private String table;
    private String op;
    private Map<String, String> data = new LinkedHashMap<>();

    public CdcRecord(String db, String table, String op) {
        this.db = db;
        this.table = table;
        this.op = op;
    }
}
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
public class CdcStarMapFunction extends RichMapFunction<String, CdcRecord> {
    private final static Logger LOG = LoggerFactory.getLogger(CdcStarMapFunction.class);
    private JsonParser parser;
    @Override
    public void open(Configuration parameters) throws Exception {
        parser = new JsonParser();
    }
    @Override
    public CdcRecord map(String element) throws Exception {
        LOG.debug("data : {}" , element );
        JsonObject object = parser.parse(element).getAsJsonObject();
        String db = object.get("db").getAsString();
        String table = object.get("table").getAsString();
        String op = object.get("operator_type").getAsString();
        CdcRecord record = new CdcRecord(db, table, op);
        // insert/update
        String dataLocation = "after";
        if("d".equals(op)){
            // if op is delete, get before
            dataLocation = "before";
        }
        JsonObject data = object.get(dataLocation).getAsJsonObject();
        for(Map.Entry<String, JsonElement> entry: data.entrySet()){
            String columnName = entry.getKey();
            String columnValue;
            JsonElement value = entry.getValue();
            if(!value.isJsonNull()){
                // if column value is not null, get as string
                columnValue = value.getAsString();
                // put column name/value to record.data
                record.getData().put(columnName, columnValue);
            }
        }
        return record;
    }
}

Process function: CdcStarProcessFunction

CdcStarProcessFunction has three parts:

  1. Three pieces of state - cacheTimer, cacheSize, and cache - store the next timer trigger time, the number of buffered records, and the buffered data
  2. processElement handles each table's data. When the first record for a table arrives, a processing-time timer is registered at current time + batchInterval. Records are added to cache; once the buffer exceeds batchSize, flushData emits the batch downstream, the previously registered timer is deleted, and the state is cleared
  3. When the timer fires, flushData emits the buffered data downstream and clears the state
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
public class CdcStarProcessFunction extends KeyedProcessFunction<String, CdcRecord, List<CdcRecord>> {
    private final static Logger LOG = LoggerFactory.getLogger(CdcStarProcessFunction.class);
    private int batchSize;
    private long batchInterval;
    // next timer time
    private ValueState<Long> cacheTimer;
    // current cache size
    private ValueState<Integer> cacheSize;
    // cache data
    private ListState<CdcRecord> cache;
    public CdcStarProcessFunction(int batchSize, long batchInterval) {
        this.batchSize = batchSize;
        this.batchInterval = batchInterval;
    }
    @Override
    public void open(Configuration parameters) throws Exception {
        ListStateDescriptor<CdcRecord> cacheDescriptor = new ListStateDescriptor<>("cache", TypeInformation.of(CdcRecord.class));
        cache = getRuntimeContext().getListState(cacheDescriptor);
        ValueStateDescriptor<Integer> cacheSizeDescriptor = new ValueStateDescriptor<>("cacheSize", Integer.class);
        cacheSize = getRuntimeContext().getState(cacheSizeDescriptor);
        ValueStateDescriptor<Long> cacheTimerDescriptor = new ValueStateDescriptor<>("cacheTimer", Long.class);
        cacheTimer = getRuntimeContext().getState(cacheTimerDescriptor);
    }
    @Override
    public void processElement(CdcRecord element, KeyedProcessFunction<String, CdcRecord, List<CdcRecord>>.Context ctx, Collector<List<CdcRecord>> out) throws Exception {
        // cache size + 1
        if (cacheSize.value() != null) {
            cacheSize.update(cacheSize.value() + 1);
        } else {
            cacheSize.update(1);
            // add timer for interval flush
            long nextTimer = System.currentTimeMillis() + batchInterval;
            LOG.debug("register timer : {} , key : {}", nextTimer, ctx.getCurrentKey());
            cacheTimer.update(nextTimer);
            ctx.timerService().registerProcessingTimeTimer(nextTimer);
        }
        // add data to cache state
        cache.add(element);
        // cache size max than batch Size
        if (cacheSize.value() >= batchSize) {
            // remove next timer
            long nextTimer = cacheTimer.value();
            LOG.debug("{} remove timer, key : {}", nextTimer, ctx.getCurrentKey());
            ctx.timerService().deleteProcessingTimeTimer(nextTimer);
            // flush data to down stream
            flushData(out);
        }
    }
    /**
     * flush data to down stream
     */
    private void flushData(Collector<List<CdcRecord>> out) throws Exception {
        List<CdcRecord> tmpCache = new ArrayList<>();
        Iterator<CdcRecord> it = cache.get().iterator();
        while (it.hasNext()) {
            tmpCache.add(it.next());
        }
        if (tmpCache.size() > 0) {
            out.collect(tmpCache);
            // finish flush all cache data, clear state
            cache.clear();
            cacheSize.clear();
            cacheTimer.clear();
        }
    }
    @Override
    public void onTimer(long timestamp, KeyedProcessFunction<String, CdcRecord, List<CdcRecord>>.OnTimerContext ctx, Collector<List<CdcRecord>> out) throws Exception {
        LOG.info("{} trigger timer to flush data", ctx.getCurrentKey(), timestamp);
        // batch interval trigger flush data
        flushData(out);
    }
    @Override
    public void close() throws Exception {
    }
}

Sink function: StarRocksSink

StarRocksSink is slightly more involved: based on the table name in the data, it fetches the matching table schema from the target database (the schema is cached in memory to avoid hitting the database on every batch), reads the column values from the data in the order of the target table's columns, and appends the operation type of each row.

  • When writing to StarRocks, StarRocksSink supports both upsert and delete: each row is suffixed with 0/1, where 0 stands for UPSERT and 1 stands for DELETE
  • See reference document 1

invoke method

The core logic of StarRocksSink is in invoke:

  1. Extract the database and table from the data and concatenate them into a cache key
  2. Fetch the target table schema (whole-database mapping, so source and target table names are identical): first from the cache, otherwise from the database
  3. Assemble the data
  4. Build the load URL
  5. Write the data to StarRocks over HTTP

loadTargetTableSchema method

Runs "desc db.table" to fetch the target table schema and builds two results: all column names joined with "," plus a trailing "__op", used as the columns header of the HTTP request; and an ordered list of the columns, used to pick the matching values out of the source data.

parseUploadData method

Reads the column values from the data in the target table's column order, joins them with the column separator, and finally appends 0/1 based on the operation type: 1 for delete, 0 for everything else. For example, a deleted row of a two-column table is assembled as value1<COL_SEP>value2<COL_SEP>1<ROW_SEP>.

doHttp method

Writes the data to StarRocks over HTTP; nothing special here.

import org.apache.commons.codec.binary.Base64;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.http.HttpException;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpRequest;
import org.apache.http.HttpRequestInterceptor;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.protocol.HTTP;
import org.apache.http.protocol.HttpContext;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.sql.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class StarRocksSink extends RichSinkFunction<List<CdcRecord>> {
    private final static Logger LOG = LoggerFactory.getLogger(StarRocksSink.class);
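    // literal separator strings (5 characters each); they are both appended between columns/rows
    // in the assembled payload and passed as the column_separator / row_delimiter stream load headers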
    public final static String COL_SEP = "\\\\x01";
    public final static String ROW_SEP = "\\\\x02";
    public final static String NULL_COL = "\\N";
    private String ip;
    private String port;
    private String loadPort;
    private String user;
    private String pass;
    private String db;
    private Connection connection;
    private Map<String, String> spliceColumnMap = new HashMap<>();
    private Map<String, List<String>> columnMap = new HashMap<>();
    public StarRocksSink() {
    }
    public StarRocksSink(String ip, String port, String loadPort, String user, String pass, String db) {
        this.ip = ip;
        this.port = port;
        this.loadPort = loadPort;
        this.user = user;
        this.pass = pass;
        this.db = db;
    }
    @Override
    public void open(Configuration parameters) throws Exception {
        reConnect();
    }
    @Override
    public void invoke(List<CdcRecord> element, Context context) throws Exception {
        LOG.info("write batch size: " + element.size());
        if(element == null || element.size() ==0){
            LOG.info("ignore empty element");
            return;
        }
        // use StarRocks db name
//        String db = cache.get(0).getDb();
        String table = element.get(0).getTable();
        String key = db + "_" + table;
        // get table schema
        List<String> columnList;
        if (!columnMap.containsKey(key)) {
            // db.table is first coming, load column, put to spliceColumnMap & columnMap
            loadTargetTableSchema(key, db, table);
        }
        String columns = spliceColumnMap.get(key);
        columnList = columnMap.get(key);
        if (columnList == null || columnList.isEmpty()) {
            LOG.info("{}.{} not exists in target starrocks, ignore data change", db, table);
            return;
        }
        // make up data
        String data = parseUploadData(element, columnList);
        final String loadUrl = String.format("http://%s:%s/api/%s/%s/_stream_load", ip, loadPort, db, table);
        String label = db + "_" + table + "_" + System.currentTimeMillis();
        // send data to starrocks
        doHttp(loadUrl, data, label, columns);
    }
    /**
     * http send data to starrocks
     */
    private void doHttp(String loadUrl, String data, String label, String columns) throws IOException, SQLException {
        final HttpClientBuilder httpClientBuilder = HttpClients
                .custom()
                .setRedirectStrategy(new DefaultRedirectStrategy() {
                    @Override
                    protected boolean isRedirectable(String method) {
                        return true;
                    }
                })
                .addInterceptorFirst(new ContentLengthHeaderRemover());
        try (CloseableHttpClient client = httpClientBuilder.build()) {
            HttpPut put = new HttpPut(loadUrl);
            StringEntity entity = new StringEntity(data, "UTF-8");
            put.setHeader(HttpHeaders.EXPECT, "100-continue");
            put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(user, pass));
            // the label header is optional, not necessary
            // use label header can ensure at most once semantics
            put.setHeader("label", label);
            put.setHeader("columns", columns);
            put.setHeader("row_delimiter", ROW_SEP);
            put.setHeader("column_separator", COL_SEP);
            put.setEntity(entity);
            try (CloseableHttpResponse response = client.execute(put)) {
                String loadResult = "";
                if (response.getEntity() != null) {
                    loadResult = EntityUtils.toString(response.getEntity());
                }
                final int statusCode = response.getStatusLine().getStatusCode();
                // statusCode 200 just indicates that starrocks be service is ok, not stream load
                // you should see the output content to find whether stream load is success
                if (statusCode != 200) {
                    throw new IOException(
                            String.format("Stream load failed, statusCode=%s load result=%s", statusCode, loadResult));
                }
            }
        }
    }
    private String basicAuthHeader(String username, String password) {
        final String tobeEncode = username + ":" + password;
        byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8));
        return "Basic " + new String(encoded);
    }
    private String parseUploadData(List<CdcRecord> cache, List<String> columnList) {
        StringBuilder builder = new StringBuilder();
        for (CdcRecord element : cache) {
            Map<String, String> data = element.getData();
            for (String column : columnList) {
                if (data.containsKey(column)) {
                    builder.append(data.get(column)).append(COL_SEP);
                } else {
                    // if target column not exists in source data, set as null
                    builder.append(NULL_COL).append(COL_SEP);
                }
            }
            // add __op
            if ("d".equals(element.getOp())) {
                // delete
                builder.append("1");
            } else {
                // upsert
                builder.append("0");
            }
            // add row separator
            builder.append(ROW_SEP);
        }
        // remove last row sep
        builder = builder.delete(builder.length() - 5, builder.length());
        String data = builder.toString();
        return data;
    }
    /**
     * load table schema, parse to http column and column list for load source data
     */
    private void loadTargetTableSchema(String key, String db, String table) throws SQLException {
        List<String> columnList = new ArrayList<>();
        StringBuilder builer = new StringBuilder();
        try {
            // load table schema
            PreparedStatement insertPS = connection.prepareStatement("desc " + db + "." + table);
            ResultSet result = insertPS.executeQuery();
            while (result.next()) {
                String column = result.getString(1);
                builer.append(column).append(",");
                columnList.add(column);
            }
        } catch (SQLException e) {
            LOG.warn("load {}.{} schema error. {}", db, table, e.getStackTrace());
        }
        builer.append("__op");
        String columns = builer.toString();
        spliceColumnMap.put(key, columns);
        columnMap.put(key, columnList);
    }
    /**
     * reconnect to starrocks
     *
     * @throws SQLException
     */
    private void reConnect() throws SQLException {
        String driver = "jdbc:mysql://" + ip + ":" + port;
        if (connection == null || connection.isClosed()) {
            connection = DriverManager.getConnection(driver, user, pass);
        }
    }
    @Override
    public void finish() throws Exception {
        LOG.info("finish");
    }
    @Override
    public void close() throws Exception {
        LOG.info("close...");
        connection.close();
    }
    private static class ContentLengthHeaderRemover implements HttpRequestInterceptor {
        @Override
        public void process(HttpRequest request, HttpContext context) throws HttpException, IOException {
            // fighting org.apache.http.protocol.RequestContent's ProtocolException("Content-Length header already present");
            request.removeHeaders(HTTP.CONTENT_LEN);
        }
    }
}

Limitations:

  1. Schema changes on the source side are not yet propagated to the StarRocks side.
  2. For efficiency, whether a table exists in the target is only checked the first time data for that table arrives; once a table is judged missing, all later changes of that table are ignored.
  3. Tables that are not loaded are only filtered in the sink, so the upstream operators still process them; such data could instead be filtered out in the deserializer or the map function, as in the sketch below.
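
A minimal sketch of that early-filtering idea (an assumption, not part of the original code; the class name TargetTableFilterFunction and its parameters are hypothetical): load the table names that exist in the target StarRocks database once when the task opens, then drop records for unknown tables right after the map step so the process function and the sink never see them.

import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.configuration.Configuration;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.HashSet;
import java.util.Set;
public class TargetTableFilterFunction extends RichFilterFunction<CdcRecord> {
    private final String jdbcUrl; // e.g. "jdbc:mysql://<starrocks-fe>:<query-port>" (assumption)
    private final String user;
    private final String pass;
    private final String targetDb;
    private transient Set<String> targetTables;
    public TargetTableFilterFunction(String jdbcUrl, String user, String pass, String targetDb) {
        this.jdbcUrl = jdbcUrl;
        this.user = user;
        this.pass = pass;
        this.targetDb = targetDb;
    }
    @Override
    public void open(Configuration parameters) throws Exception {
        // load the table names of the target database once per task
        targetTables = new HashSet<>();
        try (Connection con = DriverManager.getConnection(jdbcUrl, user, pass);
             PreparedStatement ps = con.prepareStatement(
                     "SELECT TABLE_NAME FROM information_schema.tables WHERE TABLE_SCHEMA = ?")) {
            ps.setString(1, targetDb);
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    targetTables.add(rs.getString(1));
                }
            }
        }
    }
    @Override
    public boolean filter(CdcRecord record) {
        // keep only records whose table exists in the target database
        return targetTables.contains(record.getTable());
    }
}

It could be plugged in between .map(new CdcStarMapFunction()) and .keyBy(...) in CdcToStarRocks. One caveat: tables created in the target after the job starts stay filtered out until the job is restarted.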


Flink CDC to Table Store

package name.lijiaqi;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class MysqlToTableStoreExample {
    public static void main(String[] args) throws Exception {
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        // source table (MySQL CDC)
        String sourceDDL =
                "CREATE TEMPORARY TABLE ods_lineitem (\n" +
                " l_orderkey INT NOT NULL,\n" +
                " l_partkey INT NOT NULL,\n" +
                " l_suppkey INT NOT NULL,\n" +
                " l_linenumber INT NOT NULL,\n" +
                " l_quantity DECIMAL(15, 2) NOT NULL,\n" +
                " l_extendedprice DECIMAL(15, 2) NOT NULL,\n" +
                " l_discount DECIMAL(15, 2) NOT NULL,\n" +
                " l_tax DECIMAL(15, 2) NOT NULL,\n" +
                " l_returnflag CHAR(1) NOT NULL,\n" +
                " l_linestatus CHAR(1) NOT NULL,\n" +
                " l_shipdate DATE NOT NULL,\n" +
                " l_commitdate DATE NOT NULL,\n" +
                " l_receiptdate DATE NOT NULL,\n" +
                " l_shipinstruct CHAR(25) NOT NULL,\n" +
                " l_shipmode CHAR(10) NOT NULL,\n" +
                " l_comment VARCHAR(44) NOT NULL,\n" +
                " PRIMARY KEY (l_orderkey, l_linenumber) NOT ENFORCED \n" +
                ") WITH (\n" +
                "  'connector' = 'mysql-cdc',\n" +
                // to use a hostname instead, add "127.0.0.1 mysql.docker.internal" to the host's /etc/hosts
                "  'hostname' = '127.0.0.1',\n" +
                "  'port' = '3307',\n" +
                "  'username' = 'flink',\n" +
                "  'password' = 'flink',\n" +
                "  'database-name' = 'tpch_s10',\n" +
                "  'table-name' = 'lineitem' \n" +
                ")";
        // Table Store catalog (executeSql only accepts one statement per call, so the two statements are kept separate)
        String createCatalogDDL =
                "CREATE CATALOG `table_store` WITH (\n" +
                "  'type' = 'table-store',\n" +
                "  'warehouse' = '/tmp/table-store-101'\n" +
                ")";
        String useCatalogDDL = "USE CATALOG `table_store`";
        // DWD target table
        String sinkDWDDDL =
                "CREATE TABLE IF NOT EXISTS dwd_lineitem (\n" +
                " l_orderkey INT NOT NULL,\n" +
                " l_partkey INT NOT NULL,\n" +
                " l_suppkey INT NOT NULL,\n" +
                " l_linenumber INT NOT NULL,\n" +
                " l_quantity DECIMAL(15, 2) NOT NULL,\n" +
                " l_extendedprice DECIMAL(15, 2) NOT NULL,\n" +
                " l_discount DECIMAL(15, 2) NOT NULL,\n" +
                " l_tax DECIMAL(15, 2) NOT NULL,\n" +
                " l_returnflag CHAR(1) NOT NULL,\n" +
                " l_linestatus CHAR(1) NOT NULL,\n" +
                " l_shipdate DATE NOT NULL,\n" +
                " l_commitdate DATE NOT NULL,\n" +
                " l_receiptdate DATE NOT NULL,\n" +
                " l_shipinstruct CHAR(25) NOT NULL,\n" +
                " l_shipmode CHAR(10) NOT NULL,\n" +
                " l_comment VARCHAR(44) NOT NULL,\n" +
                " l_year BIGINT NOT NULL,\n" +
                " l_month BIGINT NOT NULL,\n" +
                " PRIMARY KEY (l_orderkey, l_linenumber, l_year, l_month) NOT ENFORCED \n" +
                ") PARTITIONED BY (l_year, l_month) WITH (\n" +
                // 2 buckets per partition
                " 'bucket' = '2',\n" +
                // set changelog-producer to 'input' so the upstream CDC source does not drop update_before,
                // and downstream consumers of dwd_lineitem need no changelog-normalize node
                " 'changelog-producer' = 'input' \n" +
                ")";
        // ADS target table
        String sinkADSDDL =
                "CREATE TABLE IF NOT EXISTS ads_pricing_summary (\n" +
                " l_returnflag CHAR(1) NOT NULL,\n" +
                " l_linestatus CHAR(1) NOT NULL,\n" +
                " sum_quantity DOUBLE NOT NULL,\n" +
                " sum_base_price DOUBLE NOT NULL,\n" +
                " sum_discount_price DOUBLE NOT NULL,\n" +
                " sum_charge_vat_inclusive DOUBLE NOT NULL,\n" +
                " avg_quantity DOUBLE NOT NULL,\n" +
                " avg_base_price DOUBLE NOT NULL,\n" +
                " avg_discount DOUBLE NOT NULL,\n" +
                " count_order BIGINT NOT NULL \n" +
                ") WITH (\n" +
                " 'bucket' = '2'\n" +
                ")";
        // simple aggregations
        String transformSQL1 =
                "INSERT INTO dwd_lineitem SELECT l_orderkey,l_partkey,l_suppkey,l_linenumber,l_quantity,l_extendedprice,l_discount,\n" +
                "l_tax,l_returnflag,l_linestatus,l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment,\n" +
                "YEAR(l_shipdate) AS l_year,MONTH(l_shipdate) AS l_month FROM ods_lineitem";
        String transformSQL2 =
                "INSERT INTO ads_pricing_summary SELECT l_returnflag,l_linestatus,SUM(l_quantity) AS sum_quantity,\n" +
                "SUM(l_extendedprice) AS sum_base_price,SUM(l_extendedprice * (1-l_discount)) AS sum_discount_price,\n" +
                "SUM(l_extendedprice * (1-l_discount) * (1+l_tax)) AS sum_charge_vat_inclusive,AVG(l_quantity) AS avg_quantity,\n" +
                "AVG(l_extendedprice) AS avg_base_price,AVG(l_discount) AS avg_discount,COUNT(*) AS count_order FROM dwd_lineitem \n" +
                "WHERE (l_year < 1998 OR (l_year = 1998 AND l_month <= 9)) AND l_shipdate <= DATE '1998-12-01' - INTERVAL '90' DAY \n" +
                "GROUP BY l_returnflag,l_linestatus";
        // register the catalog, create the source/sink tables, then submit the two INSERT jobs;
        // executeSql already submits the INSERT statements as Flink jobs, so no env.execute() is needed here
        tableEnv.executeSql(createCatalogDDL);
        tableEnv.executeSql(useCatalogDDL);
        tableEnv.executeSql(sourceDDL);
        tableEnv.executeSql(sinkDWDDDL);
        tableEnv.executeSql(sinkADSDDL);
        TableResult dwdResult = tableEnv.executeSql(transformSQL1);
        TableResult adsResult = tableEnv.executeSql(transformSQL2);
    }
}
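
A small companion sketch (an assumption, not part of the original example; the class name PricingSummaryPreview is hypothetical): a separate batch job that registers the same Table Store catalog and prints the current content of ads_pricing_summary, which can be handy for checking the result of the two streaming INSERT jobs above.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
public class PricingSummaryPreview {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inBatchMode().build());
        // same catalog type and warehouse path as in MysqlToTableStoreExample
        tEnv.executeSql(
                "CREATE CATALOG `table_store` WITH (\n" +
                "  'type' = 'table-store',\n" +
                "  'warehouse' = '/tmp/table-store-101'\n" +
                ")");
        tEnv.executeSql("USE CATALOG `table_store`");
        // batch-read the current snapshot of the aggregation result
        tEnv.executeSql("SELECT * FROM ads_pricing_summary").print();
    }
}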

For more details, see:

https://github.com/LadyForest/flink-table-store-101/blob/master/real-time-update/README.zh.md

