Flink 导入数据到 StarRocks
支持的数据源
- CSV
- JSON
操作步骤
步骤一:添加 pom 依赖
```xml
<dependency>
    <groupId>com.starrocks</groupId>
    <artifactId>flink-connector-starrocks</artifactId>
    <!-- for flink-1.14:以下两个 version 请按项目使用的 Scala 版本(2.11 或 2.12)二选一保留 -->
    <version>x.x.x_flink-1.14_2.11</version>
    <version>x.x.x_flink-1.14_2.12</version>
</dependency>
```
步骤二:调用 flink-connector-starrocks
bean2starrocks
```java
import com.starrocks.connector.flink.StarRocksSink;
import com.starrocks.connector.flink.row.StarRocksSinkRowBuilder;
import com.starrocks.connector.flink.table.StarRocksSinkOptions;
import com.starrocks.funcs.BeanDataJava;
import com.starrocks.funcs.MySourceJava;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;

import java.util.concurrent.TimeUnit;

/**
 * Demo1:
 * - define Class BeanDataJava
 * - sink to StarRocks via flink-connector-starrocks
 */
public class Bean2StarRocksJava {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        DataStream<BeanDataJava> sourceStream = env
                .addSource(new MySourceJava())
                .uid("sourceStream-uid").name("sourceStream")
                .setParallelism(1)
                .map(new MapFunction<Row, BeanDataJava>() {
                    @Override
                    public BeanDataJava map(Row value) throws Exception {
                        String name = value.getField(0).toString();
                        int score = Integer.parseInt(value.getField(1).toString());
                        return new BeanDataJava(name, score);
                    }
                })
                .uid("sourceStreamMap-uid").name("sourceStreamMap")
                .setParallelism(1);

        sourceStream
                .addSink(
                        StarRocksSink.sink(
                                // the table structure
                                TableSchema.builder()
                                        .field("name", DataTypes.VARCHAR(20))
                                        .field("score", DataTypes.INT())
                                        .build(),
                                /*
                                 * The sink options for this demo:
                                 * - hostname: master1
                                 * - fe http port: 8030
                                 * - database name: starrocks_demo
                                 * - table name: demo2_flink_tb1
                                 * - TODO: customize the above args to fit your environment.
                                 */
                                StarRocksSinkOptions.builder()
                                        .withProperty("jdbc-url", "jdbc:mysql://master1:9030/starrocks_demo")
                                        .withProperty("load-url", "master1:8030")
                                        .withProperty("username", "root")
                                        .withProperty("password", "")
                                        .withProperty("table-name", "demo2_flink_tb1")
                                        .withProperty("database-name", "starrocks_demo")
                                        // in case the raw data contains a common delimiter like '\n'
                                        .withProperty("sink.properties.row_delimiter", "\\x02")
                                        // in case the raw data contains a common separator like '\t'
                                        .withProperty("sink.properties.column_separator", "\\x01")
                                        .withProperty("sink.buffer-flush.interval-ms", "5000")
                                        .build(),
                                // set the slots with streamRowData
                                new StarRocksSinkRowBuilder<BeanDataJava>() {
                                    @Override
                                    public void accept(Object[] objects, BeanDataJava beanDataJava) {
                                        objects[0] = beanDataJava.getName();
                                        objects[1] = beanDataJava.getScore();
                                    }
                                }
                        )
                )
                .uid("sourceSink-uid").name("sourceSink")
                .setParallelism(1);

        try {
            env.execute("StarRocksSink_BeanDataJava");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static StreamExecutionEnvironment getExecutionEnvironment() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setMaxParallelism(3);
        env.setParallelism(3);
        env.setRestartStrategy(RestartStrategies.failureRateRestart(
                3,                                                              // failureRate
                org.apache.flink.api.common.time.Time.of(5, TimeUnit.MINUTES),  // failureInterval
                org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS)  // delayInterval
        ));
        // checkpoint options
        env.enableCheckpointing(1000 * 30);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        env.getCheckpointConfig().setCheckpointTimeout(1000 * 60 * 10);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(Integer.MAX_VALUE);
        return env;
    }
}
```
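上例引用的 com.starrocks.funcs.BeanDataJava 与 MySourceJava 两个辅助类并未随文给出。下面是一个假设性的最小实现示意(字段与上文的 name、score 两列对应,数据内容仅用于本地验证),实际工程中请替换为自己的实体类和数据源:

```java
// ===== BeanDataJava.java(假设性实现)=====
package com.starrocks.funcs;

// 与上文 TableSchema(name VARCHAR(20), score INT)对应的简单 POJO
public class BeanDataJava {
    private final String name;
    private final int score;

    public BeanDataJava(String name, int score) {
        this.name = name;
        this.score = score;
    }

    public String getName() { return name; }

    public int getScore() { return score; }
}

// ===== MySourceJava.java(假设性实现)=====
package com.starrocks.funcs;

import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.types.Row;

// 周期性产生 (name, score) 两列 Row 的示例数据源
public class MySourceJava extends RichSourceFunction<Row> {
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Row> ctx) throws Exception {
        int i = 0;
        while (running) {
            ctx.collect(Row.of("name_" + (i % 10), i % 100));
            i++;
            Thread.sleep(1000L);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
```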
json2Starrocks
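除了先映射成 Bean 再逐列填充 Row 之外,也可以直接把 JSON 字符串流写入 StarRocks:在 sink options 中把 sink.properties.format 设为 json 即可按 JSON 格式通过 Stream Load 导入。下面是一个假设性的最小示意(沿用上文 master1、starrocks_demo 等连接信息,表名 demo2_flink_tb2 仅为示意取值,请按实际环境调整):

```java
import com.starrocks.connector.flink.StarRocksSink;
import com.starrocks.connector.flink.table.StarRocksSinkOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Json2StarRocksSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(
                        "{\"name\": \"lebron\", \"score\": 99}",
                        "{\"name\": \"stephen\", \"score\": 98}")
                // StarRocksSink.sink(options) 返回 SinkFunction<String>,
                // 流中的每个元素即一行待导入的 JSON 记录
                .addSink(StarRocksSink.sink(
                        StarRocksSinkOptions.builder()
                                .withProperty("jdbc-url", "jdbc:mysql://master1:9030/starrocks_demo")
                                .withProperty("load-url", "master1:8030")
                                .withProperty("username", "root")
                                .withProperty("password", "")
                                .withProperty("database-name", "starrocks_demo")
                                .withProperty("table-name", "demo2_flink_tb2") // 示意表名
                                // 以 JSON 格式通过 Stream Load 导入
                                .withProperty("sink.properties.format", "json")
                                .withProperty("sink.properties.strip_outer_array", "true")
                                .build()))
                .name("jsonSink");

        env.execute("StarRocksSink_JsonSketch");
    }
}
```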
sql2Starrocks
```java
import com.starrocks.funcs.MySourceJava;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.util.concurrent.TimeUnit;

import static org.apache.flink.table.api.Expressions.$;

/**
 * Demo3:
 * - Construct a TemporaryView via org.apache.flink.types.Row
 * - FlinkSQL -> flink-connector-starrocks -> StarRocks
 */
public class Sql2StarRocksJava {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(env, settings);

        DataStream<Row> source = env
                .addSource(new MySourceJava(), getRowTypeInfo())
                .uid("sourceStream-uid").name("sourceStream")
                .setParallelism(1);

        Table sourceTable = streamTableEnv.fromDataStream(source, $("name"), $("score"));
        streamTableEnv.createTemporaryView("sourceTable", sourceTable);

        /*
         * The sink options for this demo:
         * - hostname: master1
         * - fe http port: 8030
         * - database name: starrocks_demo
         * - table name: demo2_flink_tb3
         * - TODO: customize the above args to fit your environment.
         */
        streamTableEnv.executeSql(
                "CREATE TABLE testTable( " +
                "  `name` VARCHAR, " +
                "  `score` INT " +
                ") WITH ( " +
                "  'connector' = 'starrocks', " +
                "  'jdbc-url' = 'jdbc:mysql://master1:9030/starrocks_demo', " +
                "  'load-url' = 'master1:8030', " +
                "  'database-name' = 'starrocks_demo', " +
                "  'table-name' = 'demo2_flink_tb3', " +
                "  'username' = 'root', " +
                "  'password' = '', " +
                "  'sink.buffer-flush.max-rows' = '1000000', " +
                "  'sink.buffer-flush.max-bytes' = '300000000', " +
                "  'sink.buffer-flush.interval-ms' = '15000', " +
                "  'sink.max-retries' = '3', " +
                "  'sink.properties.row_delimiter' = '\\x02', " +
                "  'sink.properties.column_separator' = '\\x01', " +
                "  'sink.properties.columns' = 'NAME, SCORE' " +
                ")"
        );
        // Cautions for the Scala version of this code:
        // 1. Triple-quoted strings save the escaping work: '\x02' and '\x01' can be used directly.
        // 2. When concatenating multiple lines with double quotation marks, use "\\x02" and "\\x01" instead, e.g.:
        //    ...
        //    + "'sink.properties.row_delimiter' = '\\x02',"
        //    + "'sink.properties.column_separator' = '\\x01' "
        //    + ...

        streamTableEnv.executeSql(
                "insert into testTable select `name`, `score` from sourceTable");

        try {
            env.execute("StarRocksSink_SQLJava");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static StreamExecutionEnvironment getExecutionEnvironment() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setMaxParallelism(3);
        env.setParallelism(3);
        env.setRestartStrategy(RestartStrategies.failureRateRestart(
                3,                                                              // failureRate
                org.apache.flink.api.common.time.Time.of(5, TimeUnit.MINUTES),  // failureInterval
                org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS)  // delayInterval
        ));
        // checkpoint options
        env.enableCheckpointing(1000 * 30);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        env.getCheckpointConfig().setCheckpointTimeout(1000 * 60 * 10);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(Integer.MAX_VALUE);
        return env;
    }

    public static RowTypeInfo getRowTypeInfo() {
        return new RowTypeInfo(
                TypeInformation.of(String.class), TypeInformation.of(int.class));
    }
}
```
Flink 导出 StarRocks 数据
flink-connector-starrocks 的 source 功能暂时无法保证 exactly-once 语义。如果读取任务失败,您需要重新创建读取任务。
步骤一:添加 pom 依赖
```xml
<dependency>
    <groupId>com.starrocks</groupId>
    <artifactId>flink-connector-starrocks</artifactId>
    <!-- for flink-1.14:以下两个 version 请按项目使用的 Scala 版本(2.11 或 2.12)二选一保留 -->
    <version>x.x.x_flink-1.14_2.11</version>
    <version>x.x.x_flink-1.14_2.12</version>
</dependency>
```
步骤二:调用 flink-connector-starrocks
参考如下示例代码,调用 flink-connector-starrocks 读取 StarRocks 的数据,scan-url、jdbc-url、库表名等连接参数请按实际环境修改:
```java
import com.starrocks.connector.flink.StarRocksSource;
import com.starrocks.connector.flink.table.source.StarRocksSourceOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;

public class StarRocksSourceApp {

    public static void main(String[] args) throws Exception {
        StarRocksSourceOptions options = StarRocksSourceOptions.builder()
                .withProperty("scan-url", "192.168.xxx.xxx:8030,192.168.xxx.xxx:8030")
                .withProperty("jdbc-url", "jdbc:mysql://192.168.xxx.xxx:9030")
                .withProperty("username", "root")
                .withProperty("password", "xxxxxx")
                .withProperty("table-name", "flink_test")
                .withProperty("database-name", "test")
                .build();

        TableSchema tableSchema = TableSchema.builder()
                .field("date_1", DataTypes.DATE())
                .field("datetime_1", DataTypes.TIMESTAMP(6))
                .field("char_1", DataTypes.CHAR(20))
                .field("varchar_1", DataTypes.STRING())
                .field("boolean_1", DataTypes.BOOLEAN())
                .field("tinyint_1", DataTypes.TINYINT())
                .field("smallint_1", DataTypes.SMALLINT())
                .field("int_1", DataTypes.INT())
                .field("bigint_1", DataTypes.BIGINT())
                .field("largeint_1", DataTypes.STRING())
                .field("float_1", DataTypes.FLOAT())
                .field("double_1", DataTypes.DOUBLE())
                .field("decimal_1", DataTypes.DECIMAL(27, 9))
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(StarRocksSource.source(tableSchema, options)).setParallelism(5).print();
        env.execute("StarRocks flink source");
    }
}
```
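上面的示例通过 DataStream API 读取。作为补充,下面给出一个假设性的示意,展示如何用 Flink SQL 把同一张表注册为 source 再查询(字段只列出部分,连接信息沿用上例,选项名称请以所用 connector 版本的文档为准):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class StarRocksSqlSourceSketch {

    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // 将 StarRocks 表注册为 Flink SQL source(连接信息仅为示意)
        tEnv.executeSql(
                "CREATE TABLE flink_test_source ( " +
                "  `int_1` INT, " +
                "  `varchar_1` STRING " +
                ") WITH ( " +
                "  'connector' = 'starrocks', " +
                "  'scan-url' = '192.168.xxx.xxx:8030', " +
                "  'jdbc-url' = 'jdbc:mysql://192.168.xxx.xxx:9030', " +
                "  'username' = 'root', " +
                "  'password' = 'xxxxxx', " +
                "  'database-name' = 'test', " +
                "  'table-name' = 'flink_test' " +
                ")");

        // 读取并打印结果
        tEnv.executeSql("SELECT int_1, varchar_1 FROM flink_test_source").print();
    }
}
```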
Spark 导入数据到 StarRocks
支持的数据格式
- ORC(2.0 版本之后支持)
- PARQUET(2.0 版本之后支持)
操作步骤
步骤一:添加 pom 依赖
```xml
<dependency>
    <groupId>com.starrocks.connector</groupId>
    <artifactId>spark</artifactId>
    <version>1.0.0</version>
    <scope>system</scope>
    <systemPath>${project.basedir}/src/main/resources/starrocks-spark2_2.11-1.0.0.jar</systemPath>
</dependency>
```
步骤二:调用 spark-connector-starrocks
sparkLoad2StarRocks
```scala
import com.starrocks.utils.{Consts, LoggerUtil, MySrSink}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import com.starrocks.connector.spark._

object SparkConnector2StarRocks {

  // parameters
  val starRocksName = "starrocks_demo"
  val tblNameSrc = "demo1_spark_tb1"
  val tblNameDst = "demo1_spark_tb2"
  val userName = "root"
  val password = ""
  val srFe = "master1"   // fe hostname
  val port = 8030        // fe http port
  val filterRatio = 0.2
  val columns = "uid,date,hour,minute,site"
  val master = "local"
  val appName = "app_spark_demo2"
  val partitions = 2     // computing parallelism
  val buckets = 1        // sink parallelism
  val debug = false

  LoggerUtil.setSparkLogLevels()

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName(appName)
      .setMaster(master)
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val spark = SparkSession.builder().config(conf).master(master).enableHiveSupport().getOrCreate()
    val sc = spark.sparkContext
    sc.setLogLevel("WARN")
    import spark.implicits._

    val starrocksSparkDF = spark.read.format("starrocks")
      .option("starrocks.table.identifier", s"${starRocksName}.${tblNameSrc}")
      .option("starrocks.fenodes", s"${srFe}:${port}")
      .option("user", s"${userName}")
      .option("password", s"${password}")
      .load().repartition(partitions)

    starrocksSparkDF.show(5, false)
    starrocksSparkDF.createOrReplaceTempView("view_tb1")

    val resDf = spark.sql(
      """
        |select uid, date, hour, minute, site
        |from view_tb1
        |lateral view explode(split(uid_list_str,',')) temp_tbl as uid
        |""".stripMargin)
    resDf.show(5, false)   // IDEA/REPL local outputs

    resDf.map(x => x.toString().replaceAll("\\[|\\]", "").replace(",", Consts.starrocksSep))
      .repartition(buckets)
      .foreachPartition(itr => {
        val sink = new MySrSink(
          Map(
            "max_filter_ratio" -> s"${filterRatio}",
            "columns" -> columns,
            "column_separator" -> Consts.starrocksSep),
          starRocksName, userName, password, tblNameDst, srFe, port, debug, debug)
        if (itr.hasNext) sink.invoke(itr.mkString("\n"))
      })

    spark.close()
  }
}
```
SparkStreaming2StarRocks
```scala
import com.starrocks.utils.{Consts, LoggerUtil, MySrSink}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import com.starrocks.connector.spark._

object SparkConnector2StarRocks {

  // parameters
  val starRocksName = "starrocks_demo"
  val tblNameSrc = "demo1_spark_tb1"
  val tblNameDst = "demo1_spark_tb2"
  val userName = "root"
  val password = ""
  val srFe = "master1"   // fe hostname
  val port = 8030        // fe http port
  val filterRatio = 0.2
  val columns = "uid,date,hour,minute,site"
  val master = "local"
  val appName = "app_spark_demo2"
  val partitions = 2     // computing parallelism
  val buckets = 1        // sink parallelism
  val debug = false

  LoggerUtil.setSparkLogLevels()

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName(appName)
      .setMaster(master)
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val spark = SparkSession.builder().config(conf).master(master).enableHiveSupport().getOrCreate()
    val sc = spark.sparkContext
    sc.setLogLevel("WARN")
    import spark.implicits._

    val starrocksSparkDF = spark.read.format("starrocks")
      .option("starrocks.table.identifier", s"${starRocksName}.${tblNameSrc}")
      .option("starrocks.fenodes", s"${srFe}:${port}")
      .option("user", s"${userName}")
      .option("password", s"${password}")
      .load().repartition(partitions)

    starrocksSparkDF.show(5, false)
    starrocksSparkDF.createOrReplaceTempView("view_tb1")

    val resDf = spark.sql(
      """
        |select uid, date, hour, minute, site
        |from view_tb1
        |lateral view explode(split(uid_list_str,',')) temp_tbl as uid
        |""".stripMargin)
    resDf.show(5, false)   // IDEA/REPL local outputs

    resDf.map(x => x.toString().replaceAll("\\[|\\]", "").replace(",", Consts.starrocksSep))
      .repartition(buckets)
      .foreachPartition(itr => {
        val sink = new MySrSink(
          Map(
            // "label" -> "label123":
            // 1. If not customized, StarRocks randomly generates a code as the label;
            // 2. The Stream Load label is unique: a Stream Load with the same label can be loaded only once.
            //    [Good choice]: combine the label with info like batch time and TaskContext.get.partitionId().
            "max_filter_ratio" -> s"${filterRatio}",
            "columns" -> columns,
            "column_separator" -> Consts.starrocksSep),
          starRocksName, userName, password, tblNameDst, srFe, port, debug, debug)
        if (itr.hasNext) sink.invoke(itr.mkString("\n"))
      })

    spark.close()
  }
}
```
Spark 导出 StarRocks 数据到其它系统
使用说明
- 当前版本只支持从 StarRocks 中读取数据,不支持通过 Sink 向 StarRocks 写入数据。
- 支持在 StarRocks 端完成数据过滤,从而减少传输的数据量。
- 如果读取数据的开销比较大,可以通过合理的表设计和过滤条件,控制 Spark 不要一次读取过多的数据,从而避免给磁盘和网络造成过大的 I/O 压力,或影响正常的查询业务。
版本要求
操作步骤
步骤一:添加 pom 依赖
```xml
<dependency>
    <groupId>com.starrocks</groupId>
    <artifactId>starrocks-stream-load-sdk</artifactId>
    <version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.12.10</version>
</dependency>
<dependency>
    <groupId>com.starrocks</groupId>
    <artifactId>starrocks-thrift-sdk</artifactId>
    <version>1.0.1</version>
</dependency>
```
步骤二:使用 Spark 读取 StarRocks 数据
使用 Spark DataFrame 读取数据
val starrocksSparkDF = spark.read.format("starrocks") .option("starrocks.table.identifier", s"test.score_board") .option("starrocks.fenodes", s"<fe_host>:<fe_http_port>") .option("user", s"root") .option("password", s"") ## 分区裁剪 .option("starrocks.filter.query", "dt='2022-01-02 08:00:00'") ## 分桶裁剪 .option("starrocks.filter.query", "k=1") ## 分区分桶裁剪 .option("starrocks.filter.query", "k=7 and dt='2022-01-02 08:00:00'") .load()
使用 Spark RDD 的方式读取数据
```scala
import com.starrocks.connector.spark._

val starrocksSparkRDD = sc.starrocksRDD(
  tableIdentifier = Some("$YOUR_STARROCKS_DATABASE_NAME.$YOUR_STARROCKS_TABLE_NAME"),
  cfg = Some(Map(
    "starrocks.fenodes" -> "$YOUR_STARROCKS_FE_HOSTNAME:$YOUR_STARROCKS_FE_RESTFUL_PORT",
    "starrocks.request.auth.user" -> "$YOUR_STARROCKS_USERNAME",
    "starrocks.request.auth.password" -> "$YOUR_STARROCKS_PASSWORD"
  ))
)
```
使用 Spark SQL 的方式读取 StarRocks
```scala
spark.sql(
  """
    |CREATE TEMPORARY VIEW spark_starrocks
    |USING starrocks
    |OPTIONS
    |(
    |  "starrocks.table.identifier" = "test.score_board",
    |  "starrocks.fenodes" = "<fe_host>:<fe_http_port>",
    |  "user" = "root",
    |  "password" = ""
    |)
    |""".stripMargin)

// 通过临时视图查询 StarRocks 数据
val resDf = spark.sql("select * from spark_starrocks")
```
参数设置
通用参数
以下参数适用于 Spark SQL、Spark DataFrame、Spark RDD 三种读取方式。
Spark SQL 和 Spark DataFrame 专有参数
以下参数仅适用于 Spark SQL 和 Spark DataFrame 读取方式。
Spark RDD 专有参数
以下参数仅适用于 Spark RDD 读取方式。