Importing and exporting StarRocks data with Spark/Flink



Loading data into StarRocks with Flink

Supported data formats

  • CSV
  • JSON

Procedure
Step 1: Add the pom dependency

<dependency>
    <groupId>com.starrocks</groupId>
    <artifactId>flink-connector-starrocks</artifactId>
    <!-- for flink-1.14: keep only the version that matches your Scala version -->
    <!-- Scala 2.11 -->
    <version>x.x.x_flink-1.14_2.11</version>
    <!-- Scala 2.12: <version>x.x.x_flink-1.14_2.12</version> -->
</dependency>

Step 2: Call flink-connector-starrocks

bean2starrocks

import com.starrocks.connector.flink.StarRocksSink;
import com.starrocks.connector.flink.row.StarRocksSinkRowBuilder;
import com.starrocks.connector.flink.table.StarRocksSinkOptions;
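import com.starrocks.funcs.BeanDataJava;  // demo POJO (name, score) from the sample project, not part of the connector
import com.starrocks.funcs.MySourceJava;  // demo SourceFunction emitting Row(name, score), also from the sample project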
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;
import java.util.concurrent.TimeUnit;
/**
 *  Demo1
 *   - define Class BeanData,
 *   - sink to StarRocks via flink-connector-starrocks
 */
public class Bean2StarRocksJava {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        DataStream<BeanDataJava> sourceStream = env
                .addSource(new MySourceJava())
                    .uid("sourceStream-uid").name("sourceStream")
                    .setParallelism(1)
                .map(new MapFunction<Row, BeanDataJava>() {
                    @Override
                    public BeanDataJava map(Row value) throws Exception {
                        String name = value.getField(0).toString();
                        int score = Integer.parseInt(value.getField(1).toString());
                        return new BeanDataJava(name,score);
                    }
                })
                    .uid("sourceStreamMap-uid").name("sourceStreamMap")
                    .setParallelism(1);
        sourceStream
                .addSink(
                        StarRocksSink.sink(
                                // the table structure
                                TableSchema.builder()
                                        .field("name", DataTypes.VARCHAR(20))
                                        .field("score", DataTypes.INT())
                                        .build(),
                                /*
                                The sink options for this demo:
                                - hostname: master1
                                - fe http port: 8030
                                - database name: starrocks_demo
                                - table names: demo2_flink_tb1
                                - TODO: customize above args to fit your environment.
                                */
                                StarRocksSinkOptions.builder()
                                        .withProperty("jdbc-url", "jdbc:mysql://master1:9030/starrocks_demo")
                                        .withProperty("load-url", "master1:8030")
                                        .withProperty("username", "root")
                                        .withProperty("password", "")
                                        .withProperty("table-name", "demo2_flink_tb1")
                                        .withProperty("database-name", "starrocks_demo")
                                        .withProperty("sink.properties.row_delimiter","\\x02")      // in case the raw data contains a common delimiter such as '\n'
                                        .withProperty("sink.properties.column_separator","\\x01")   // in case the raw data contains a common separator such as '\t'
                                        .withProperty("sink.buffer-flush.interval-ms","5000")
                                        .build(),
                                // fill the row slots, in TableSchema column order, from each record
                                new StarRocksSinkRowBuilder<BeanDataJava>() {
                                    @Override
                                    public void accept(Object[] objects, BeanDataJava beanDataJava) {
                                        objects[0] = beanDataJava.getName();
                                        objects[1] = Integer.valueOf(beanDataJava.getScore());
                                    }
                                }
                        )
                )
                .uid("sourceSink-uid").name("sourceSink")
                .setParallelism(1);
        try {
            env.execute("StarRocksSink_BeanDataJava");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    private static StreamExecutionEnvironment getExecutionEnvironment(){
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setMaxParallelism(3);
        env.setParallelism(3);
        env.setRestartStrategy(RestartStrategies.failureRateRestart(
                3, //failureRate
                org.apache.flink.api.common.time.Time.of(5, TimeUnit.MINUTES), // failureInterval
                org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS) // delayInterval
        ));
        // checkpoint options
        env.enableCheckpointing(1000 * 30);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        env.getCheckpointConfig().setCheckpointTimeout(1000 * 60 * 10);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(Integer.MAX_VALUE);
        return env;
    }
}
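The \x01/\x02 settings matter because the sink sends each buffered batch to StarRocks as delimited text. The sketch below is a simplified, hypothetical model of that serialization. DelimitedRowSketch is not a connector class. It joins the slots filled by a StarRocksSinkRowBuilder with the column separator and joins rows with the row delimiter. It also decodes the four-character text \x02, which is what the Java literal "\\x02" produces, into the control character StarRocks is assumed to use. Tabs and newlines inside field values are left untouched, so they no longer collide with the delimiters.

```java
import java.util.List;
import java.util.StringJoiner;

/** Hypothetical model of delimited-text serialization for Stream Load; not the connector's real code. */
final class DelimitedRowSketch {
    private DelimitedRowSketch() {}

    /** Decodes a hex escape such as "\\x02" (the 4 chars \ x 0 2) into the single char it names. */
    static String decodeHexEscape(String spec) {
        if (spec.length() == 4 && spec.startsWith("\\x")) {
            return String.valueOf((char) Integer.parseInt(spec.substring(2), 16));
        }
        return spec; // plain separators such as "," are used as-is
    }

    /** Joins each row's slots with the column separator, then joins rows with the row delimiter. */
    static String serialize(List<Object[]> rows, String columnSeparatorSpec, String rowDelimiterSpec) {
        String colSep = decodeHexEscape(columnSeparatorSpec);
        String rowDelim = decodeHexEscape(rowDelimiterSpec);
        StringJoiner body = new StringJoiner(rowDelim);
        for (Object[] slots : rows) {
            StringJoiner line = new StringJoiner(colSep);
            for (Object slot : slots) {
                // StarRocks CSV loading reads \N as NULL
                line.add(slot == null ? "\\N" : slot.toString());
            }
            body.add(line.toString());
        }
        return body.toString();
    }
}
```

With "\\x01" and "\\x02", a name such as "a\tb" stays inside a single column, which would not be the case with tab-separated output.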

json2Starrocks

The JSON case reuses the StarRocksSink and connection options from Bean2StarRocksJava above, with two differences. First, each stream element is already a JSON object string, for example {"name": "stephen", "score": 99}. StarRocksSink.sink(options) is therefore called with the sink options only, without a TableSchema or a StarRocksSinkRowBuilder. Second, the options switch Stream Load to JSON by setting sink.properties.format to json and sink.properties.strip_outer_array to true. The row_delimiter and column_separator settings apply only to CSV.
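For JSON loading (sink.properties.format = json), every record has to become one JSON object string. The sketch below is a minimal, standard-library-only version of that step. JsonRecordSketch is hypothetical, and real jobs would normally use a JSON library such as Jackson. It escapes string fields and builds records for the demo table (name, score). It also shows a batch wrapped in an outer array, which is assumed to be why strip_outer_array is set to true.

```java
import java.util.List;
import java.util.StringJoiner;

/** Hypothetical helper: hand-rolled JSON for the demo table (name VARCHAR, score INT). */
final class JsonRecordSketch {
    private JsonRecordSketch() {}

    /** Renders a Java string as a JSON string literal, quotes included. */
    static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c)); // other control chars as hex escapes
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.append('"').toString();
    }

    /** One record as a JSON object string. */
    static String toJson(String name, int score) {
        return "{\"name\":" + quote(name) + ",\"score\":" + score + "}";
    }

    /** A batch of records wrapped in an outer array: [{...},{...}]. */
    static String toJsonArray(List<String> records) {
        StringJoiner joiner = new StringJoiner(",", "[", "]");
        records.forEach(joiner::add);
        return joiner.toString();
    }
}
```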

sql2Starrocks

import com.starrocks.funcs.MySourceJava;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import java.util.concurrent.TimeUnit;
import static org.apache.flink.table.api.Expressions.$;
/**
  * Demo3:
  *    - Construct TemporaryView via org.apache.flink.types.Row
  *    - FlinkSql -> flink-connector-starrocks -> StarRocks
  */
public class Sql2StarRocksJava {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(env,settings);
        DataStream<Row> source = env
                .addSource(new MySourceJava(),getRowTypeInfo())
                .uid("sourceStream-uid").name("sourceStream")
                .setParallelism(1);
        Table sourceTable = streamTableEnv.fromDataStream(source,$("name"),$("score"));
        streamTableEnv.createTemporaryView("sourceTable",sourceTable);
        /*
        The sink options for this demo:
        - hostname: master1
        - fe http port: 8030
        - database name: starrocks_demo
        - table names: demo2_flink_tb3
        - TODO: customize above args to fit your environment.
        */
        streamTableEnv.executeSql(
                "CREATE TABLE testTable( "+
                          "  `name` VARCHAR, "+
                          "  `score` INT "+
                          " ) WITH ( "+
                          "  'connector' = 'starrocks', "+
                          "  'jdbc-url'='jdbc:mysql://master1:9030/starrocks_demo', "+
                          "  'load-url'='master1:8030', "+
                          "  'database-name' = 'starrocks_demo', "+
                          "  'table-name' = 'demo2_flink_tb3', "+
                          "  'username' = 'root', "+
                          "  'password' = '', "+
                          "  'sink.buffer-flush.max-rows' = '1000000', "+
                          "  'sink.buffer-flush.max-bytes' = '300000000', "+
                          "  'sink.buffer-flush.interval-ms' = '15000', "+
                          "  'sink.max-retries' = '3', "+
                          "  'sink.properties.row_delimiter' = '\\x02', "+
                          "  'sink.properties.column_separator' = '\\x01', "+
                          "  'sink.properties.columns' = 'NAME, SCORE' "+
                          " )"
        );
        // TODO Cautions for Scala code:
        // 1. Triple-quoted strings need no escaping, so '\x02' and '\x01' can be written directly.
        // 2. When concatenating multiple double-quoted lines, write "\\x02" and "\\x01" instead, e.g.:
        //  ...
        //  + "'sink.properties.row_delimiter' = '\\x02',"
        //  + "'sink.properties.column_separator' = '\\x01' "
        //  + ...
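        // executeSql() submits the INSERT job by itself; a separate env.execute() call is
        // not needed here and can fail with "No operators defined in streaming topology".
        org.apache.flink.table.api.TableResult result = streamTableEnv.executeSql(
                "insert into testTable select `name`,`score` from sourceTable");
        try {
            // keep the client (e.g. a local IDE run) attached until the streaming job ends
            result.await();
        } catch (Exception e) {
            e.printStackTrace();
        }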
    }
    private static StreamExecutionEnvironment getExecutionEnvironment(){
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setMaxParallelism(3);
        env.setParallelism(3);
        env.setRestartStrategy(RestartStrategies.failureRateRestart(
                3, //failureRate
                org.apache.flink.api.common.time.Time.of(5, TimeUnit.MINUTES), // failureInterval
                org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS) // delayInterval
        ));
        // checkpoint options
        env.enableCheckpointing(1000 * 30);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        env.getCheckpointConfig().setCheckpointTimeout(1000 * 60 * 10);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(Integer.MAX_VALUE);
        return env;
    }
    public static RowTypeInfo getRowTypeInfo(){
        return new RowTypeInfo(
                TypeInformation.of(String.class), TypeInformation.of(int.class));
    }
}
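The Scala caution above is really about escaping. One way to avoid hand-escaping in any language is to generate the WITH clause from a map of options. The sketch below is hypothetical: WithClauseSketch is not a Flink or connector API. It renders each key and value as a SQL string literal and doubles any embedded single quotes. The Java literal "\\x02" therefore reaches the DDL as the four characters \x02, exactly as in the hand-written statement.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper that renders a Flink SQL WITH clause from an option map. */
final class WithClauseSketch {
    private WithClauseSketch() {}

    /** Quotes a value as a SQL string literal; embedded single quotes are doubled. */
    static String literal(String value) {
        return "'" + value.replace("'", "''") + "'";
    }

    /** Renders "WITH ( 'k1' = 'v1', 'k2' = 'v2' )" in insertion order. */
    static String withClause(Map<String, String> options) {
        StringJoiner joiner = new StringJoiner(", ", "WITH ( ", " )");
        options.forEach((k, v) -> joiner.add(literal(k) + " = " + literal(v)));
        return joiner.toString();
    }

    /** A subset of the demo options; LinkedHashMap keeps the declared order. */
    static Map<String, String> demoOptions() {
        Map<String, String> opts = new LinkedHashMap<>();
        opts.put("connector", "starrocks");
        opts.put("load-url", "master1:8030");
        opts.put("sink.properties.row_delimiter", "\\x02");    // the 4 characters \x02
        opts.put("sink.properties.column_separator", "\\x01"); // the 4 characters \x01
        return opts;
    }
}
```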

Reading data from StarRocks with Flink

The source function of flink-connector-starrocks cannot yet guarantee exactly-once semantics. If a read job fails, repeat this step to create the read job again.


Step 1: Add the pom dependency

<dependency>
    <groupId>com.starrocks</groupId>
    <artifactId>flink-connector-starrocks</artifactId>
    <!-- for flink-1.14: keep only the version that matches your Scala version -->
    <!-- Scala 2.11 -->
    <version>x.x.x_flink-1.14_2.11</version>
    <!-- Scala 2.12: <version>x.x.x_flink-1.14_2.12</version> -->
</dependency>

Step 2: Call flink-connector-starrocks

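The following sample code calls flink-connector-starrocks to read data from StarRocks. Parameter notes: scan-url lists the FE HTTP addresses (host:http_port, comma-separated). jdbc-url is the FE address for the MySQL protocol (query port, 9030 by default). database-name and table-name identify the table to read.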

import com.starrocks.connector.flink.StarRocksSource;
import com.starrocks.connector.flink.table.source.StarRocksSourceOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
public class StarRocksSourceApp {
    public static void main(String[] args) throws Exception {
        StarRocksSourceOptions options = StarRocksSourceOptions.builder()
               .withProperty("scan-url", "192.168.xxx.xxx:8030,192.168.xxx.xxx:8030")
               .withProperty("jdbc-url", "jdbc:mysql://192.168.xxx.xxx:9030")
               .withProperty("username", "root")
               .withProperty("password", "xxxxxx")
               .withProperty("table-name", "flink_test")
               .withProperty("database-name", "test")
               .build();
        TableSchema tableSchema = TableSchema.builder()
               .field("date_1", DataTypes.DATE())
               .field("datetime_1", DataTypes.TIMESTAMP(6))
               .field("char_1", DataTypes.CHAR(20))
               .field("varchar_1", DataTypes.STRING())
               .field("boolean_1", DataTypes.BOOLEAN())
               .field("tinyint_1", DataTypes.TINYINT())
               .field("smallint_1", DataTypes.SMALLINT())
               .field("int_1", DataTypes.INT())
               .field("bigint_1", DataTypes.BIGINT())
               .field("largeint_1", DataTypes.STRING())
               .field("float_1", DataTypes.FLOAT())
               .field("double_1", DataTypes.DOUBLE())
               .field("decimal_1", DataTypes.DECIMAL(27, 9))
               .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(StarRocksSource.source(tableSchema, options)).setParallelism(5).print();
        env.execute("StarRocks flink source");
    }
}
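The TableSchema above has to line up with the StarRocks column types. The lookup table below only restates the pairs used in StarRocksSourceApp, with the StarRocks types inferred from the column names. It is a hypothetical helper, not the connector's official mapping. LARGEINT is read as a Flink STRING because LARGEINT is a 128-bit integer while Flink's BIGINT holds only 64 bits, which fitsInBigint illustrates.

```java
import java.math.BigInteger;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical restatement of the type pairs used in StarRocksSourceApp. */
final class SourceTypeMappingSketch {
    private SourceTypeMappingSketch() {}

    /** StarRocks column type -> Flink DataTypes call used in the example's TableSchema. */
    static final Map<String, String> MAPPING = new LinkedHashMap<>();
    static {
        MAPPING.put("DATE", "DataTypes.DATE()");
        MAPPING.put("DATETIME", "DataTypes.TIMESTAMP(6)");
        MAPPING.put("CHAR", "DataTypes.CHAR(n)");
        MAPPING.put("VARCHAR", "DataTypes.STRING()");
        MAPPING.put("BOOLEAN", "DataTypes.BOOLEAN()");
        MAPPING.put("TINYINT", "DataTypes.TINYINT()");
        MAPPING.put("SMALLINT", "DataTypes.SMALLINT()");
        MAPPING.put("INT", "DataTypes.INT()");
        MAPPING.put("BIGINT", "DataTypes.BIGINT()");
        MAPPING.put("LARGEINT", "DataTypes.STRING()"); // 128-bit values do not fit in BIGINT
        MAPPING.put("FLOAT", "DataTypes.FLOAT()");
        MAPPING.put("DOUBLE", "DataTypes.DOUBLE()");
        MAPPING.put("DECIMAL", "DataTypes.DECIMAL(p, s)");
    }

    /** True if a LARGEINT value (given as text) would fit into a signed 64-bit BIGINT. */
    static boolean fitsInBigint(String largeIntValue) {
        // bitLength() excludes the sign bit, so the signed 64-bit range needs <= 63
        return new BigInteger(largeIntValue).bitLength() <= 63;
    }
}
```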

Loading data into StarRocks with Spark

Supported data formats

  • CSV
  • ORC (supported since version 2.0)
  • PARQUET (supported since version 2.0)


Procedure
Step 1: Add the pom dependency

<dependency>
    <groupId>com.starrocks.connector</groupId>
    <artifactId>spark</artifactId>
    <version>1.0.0</version>
    <scope>system</scope>
    <systemPath>${project.basedir}/src/main/resources/starrocks-spark2_2.11-1.0.0.jar</systemPath>
</dependency>

Step 2: Call spark-connector-starrocks
sparkLoad2StarRocks

    import com.starrocks.utils.{Consts, LoggerUtil, MySrSink}
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession
    import com.starrocks.connector.spark._
    object SparkConnector2StarRocks {
      // parameters
      val starRocksName =  "starrocks_demo"
      val tblNameSrc =  "demo1_spark_tb1"
      val tblNameDst =  "demo1_spark_tb2"
      val userName =  "root"
      val password =  ""
      val srFe = "master1"   // fe hostname
      val port =  8030          // fe http port
      val filterRatio =  0.2
      val columns = "uid,date,hour,minute,site"
      val master = "local"
      val appName = "app_spark_demo2"
      val partitions =   2   // computing parallelism
      val buckets =   1      // sink parallelism
      val debug = false
      LoggerUtil.setSparkLogLevels()
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName(appName)
          .setMaster(master)
          .set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
        val spark = SparkSession.builder().config(conf).master(master).enableHiveSupport().getOrCreate()
        val sc = spark.sparkContext
        sc.setLogLevel("WARN")
        import spark.implicits._
        val starrocksSparkDF = spark.read.format("starrocks")
          .option("starrocks.table.identifier", s"${starRocksName}.${tblNameSrc}")
          .option("starrocks.fenodes", s"${srFe}:${port}")
          .option("user", s"${userName}")
          .option("password", s"${password}")
          .load().repartition(partitions)
        starrocksSparkDF.show(5, false)
        starrocksSparkDF.createOrReplaceTempView("view_tb1")
        val resDf = spark.sql(
          """
            |select uid, date, hour, minute, site
            |from view_tb1
            |lateral view explode(split(uid_list_str,',')) temp_tbl as uid
            |""".stripMargin)
        resDf.show(5, false)  // IDEA/REPL local outputs
        // Row.mkString joins the fields directly, so commas or brackets inside a field are kept intact
        resDf.map(x => x.mkString(Consts.starrocksSep))
          .repartition(buckets).foreachPartition(
          itr => {
            val sink = new MySrSink(Map(
              "max_filter_ratio" -> s"${filterRatio}",
              "columns" -> columns,
              "column_separator" -> Consts.starrocksSep),
              starRocksName,
              userName,
              password,
              tblNameDst,
              srFe,
              port,
              debug,
              debug)
            if (itr.hasNext) sink.invoke(itr.mkString("\n"))
          }
        )
        spark.close()
      }
    }
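Flattening a row by stripping the brackets from Row.toString() and replacing every comma works only while no field contains a comma or a bracket. The standard-library sketch below contrasts that approach with joining the fields directly, which is what Spark's Row.mkString(sep) does. RowFlattenSketch is hypothetical, and a List<Object> stands in for a Spark Row, whose toString() has the same [a,b,c] shape.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical comparison of two ways to flatten a row into separator-joined text. */
final class RowFlattenSketch {
    private RowFlattenSketch() {}

    /** Mimics Spark's Row.toString(): fields joined by "," inside square brackets. */
    static String rowToString(List<Object> fields) {
        return fields.stream().map(f -> String.valueOf(f)).collect(Collectors.joining(",", "[", "]"));
    }

    /** The string-surgery approach: strip all brackets, then replace every comma. */
    static String naive(List<Object> fields, String sep) {
        return rowToString(fields).replaceAll("\\[|\\]", "").replace(",", sep);
    }

    /** Joins the field values directly, like Row.mkString(sep). */
    static String joined(List<Object> fields, String sep) {
        return fields.stream().map(f -> String.valueOf(f)).collect(Collectors.joining(sep));
    }
}
```

For a row ("u1", "2022-01-02", "bj,sh"), naive produces four columns while joined keeps three, so the load would either fail or shift values into the wrong columns.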

SparkStreaming2StarRocks

    import com.starrocks.utils.{Consts, LoggerUtil, MySrSink}
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession
    import com.starrocks.connector.spark._
    object SparkConnector2StarRocks {
      // parameters
      val starRocksName =  "starrocks_demo"
      val tblNameSrc =  "demo1_spark_tb1"
      val tblNameDst =  "demo1_spark_tb2"
      val userName =  "root"
      val password =  ""
      val srFe = "master1"   // fe hostname
      val port =  8030          // fe http port
      val filterRatio =  0.2
      val columns = "uid,date,hour,minute,site"
      val master = "local"
      val appName = "app_spark_demo2"
      val partitions =   2   // computing parallelism
      val buckets =   1      // sink parallelism
      val debug = false
      LoggerUtil.setSparkLogLevels()
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName(appName)
          .setMaster(master)
          .set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
        val spark = SparkSession.builder().config(conf).master(master).enableHiveSupport().getOrCreate()
        val sc = spark.sparkContext
        sc.setLogLevel("WARN")
        import spark.implicits._
        val starrocksSparkDF = spark.read.format("starrocks")
          .option("starrocks.table.identifier", s"${starRocksName}.${tblNameSrc}")
          .option("starrocks.fenodes", s"${srFe}:${port}")
          .option("user", s"${userName}")
          .option("password", s"${password}")
          .load().repartition(partitions)
        starrocksSparkDF.show(5, false)
        starrocksSparkDF.createOrReplaceTempView("view_tb1")
        val resDf = spark.sql(
          """
            |select uid, date, hour, minute, site
            |from view_tb1
            |lateral view explode(split(uid_list_str,',')) temp_tbl as uid
            |""".stripMargin)
        resDf.show(5, false)  // IDEA/REPL local outputs
        // Row.mkString joins the fields directly, so commas or brackets inside a field are kept intact
        resDf.map(x => x.mkString(Consts.starrocksSep))
          .repartition(buckets).foreachPartition(
          itr => {
            val sink = new MySrSink(Map(
              // "label"->"label123" :
              //     1. If not customized, StarRocks randomly generates a code as the label;
              //     2. Stream-load label is 'Unique', the Stream-load with same label can be loaded only once.
              //        [Good choice]: the label can be combined with info like batch-time and TaskContext.get.partitionId().
              "max_filter_ratio" -> s"${filterRatio}",
              "columns" -> columns,
              "column_separator" -> Consts.starrocksSep),
              starRocksName,
              userName,
              password,
              tblNameDst,
              srFe,
              port,
              debug,
              debug)
            if (itr.hasNext) sink.invoke(itr.mkString("\n"))
          }
        )
        spark.close()
      }
    }
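The label comment above suggests building the Stream Load label from the batch time and TaskContext.get.partitionId(), because a given label can be loaded only once. A minimal sketch of such a builder follows. StreamLoadLabelSketch is hypothetical, and the partition id is passed in rather than read from Spark. It assumes labels should stay within letters, digits, underscores and hyphens; check the StarRocks Stream Load documentation for the exact character rules and length limit.

```java
/** Hypothetical builder for deterministic Stream Load labels. */
final class StreamLoadLabelSketch {
    private StreamLoadLabelSketch() {}

    /**
     * Same database, table, batch time and partition always give the same label, so a retried
     * partition reuses it and StarRocks rejects the duplicate load instead of writing rows twice.
     */
    static String label(String db, String table, long batchTimeMillis, int partitionId) {
        String raw = db + "_" + table + "_" + batchTimeMillis + "_" + partitionId;
        // Replace anything outside the assumed-safe character set with '_'
        return raw.replaceAll("[^A-Za-z0-9_-]", "_");
    }
}
```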

Exporting StarRocks data to other systems with Spark

Usage notes

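  • The current version only supports reading data from StarRocks. Writing data into StarRocks through a Sink is not supported, which is why the loading examples above write through the MySrSink Stream Load helper rather than the connector.
  • Data can be filtered on the StarRocks side, which reduces the amount of data transferred.
  • If reading is expensive, use appropriate table design and filter conditions so that Spark does not read too much data at once. This avoids putting excessive I/O pressure on disks and the network and keeps normal query workloads unaffected.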
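The last note can be put into practice by splitting one large read into several smaller reads, each with its own starrocks.filter.query. The sketch below generates one half-open date-range predicate per day. FilterChunkSketch is hypothetical, and the partition column name and daily granularity are assumptions. Each resulting string could be passed to a separate read.

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that splits a date range into per-day filter predicates. */
final class FilterChunkSketch {
    private FilterChunkSketch() {}

    /** One predicate per day in [start, endExclusive), e.g. "dt >= '2022-01-01' and dt < '2022-01-02'". */
    static List<String> dailyFilters(String column, LocalDate start, LocalDate endExclusive) {
        List<String> filters = new ArrayList<>();
        for (LocalDate day = start; day.isBefore(endExclusive); day = day.plusDays(1)) {
            // half-open ranges never overlap, so no row is read twice
            filters.add(column + " >= '" + day + "' and " + column + " < '" + day.plusDays(1) + "'");
        }
        return filters;
    }
}
```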

Version requirements

[Image: version requirements table]

Procedure

Step 1: Add the pom dependencies

<dependency>
    <groupId>com.starrocks</groupId>
    <artifactId>starrocks-stream-load-sdk</artifactId>
    <version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.12.10</version>
</dependency>
<dependency>
    <groupId>com.starrocks</groupId>
    <artifactId>starrocks-thrift-sdk</artifactId>
    <version>1.0.1</version>
</dependency>

Step 2: Read StarRocks data with Spark
Reading data with a Spark DataFrame

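    val starrocksSparkDF = spark.read.format("starrocks")
      .option("starrocks.table.identifier", "test.score_board")
      .option("starrocks.fenodes", "<fe_host>:<fe_http_port>")
      .option("user", "root")
      .option("password", "")
      // Only one starrocks.filter.query takes effect (a later .option call overwrites it); pick one:
      // Partition pruning: filter on the partition column dt
      .option("starrocks.filter.query", "dt='2022-01-02 08:00:00'")
      // Bucket pruning: filter on the bucketing column k instead
      // .option("starrocks.filter.query", "k=1")
      // Partition and bucket pruning: combine both conditions in one filter
      // .option("starrocks.filter.query", "k=7 and dt='2022-01-02 08:00:00'")
      .load()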
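DataFrameReader.option overwrites an earlier value for the same key, so partition and bucket conditions must be combined into a single predicate. The helper below ANDs together whichever conditions are present. FilterQuerySketch is hypothetical, the column names k and dt come from the example, and values are assumed to be trusted and free of quotes.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that combines optional pruning conditions into one filter string. */
final class FilterQuerySketch {
    private FilterQuerySketch() {}

    /** ANDs together whichever conditions are present; returns "" when none are given. */
    static String combine(Integer bucketKey, String partitionValue) {
        List<String> parts = new ArrayList<>();
        if (bucketKey != null) {
            parts.add("k=" + bucketKey);               // bucketing column k
        }
        if (partitionValue != null) {
            parts.add("dt='" + partitionValue + "'");  // partition column dt
        }
        return String.join(" and ", parts);
    }
}
```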

Reading data with a Spark RDD

    import com.starrocks.connector.spark._
    val starrocksSparkRDD = sc.starrocksRDD(
      tableIdentifier = Some("$YOUR_STARROCKS_DATABASE_NAME.$YOUR_STARROCKS_TABLE_NAME"),
      cfg = Some(Map(
        "starrocks.fenodes" -> "$YOUR_STARROCKS_FE_HOSTNAME:$YOUR_STARROCKS_FE_RESTFUL_PORT",
        "starrocks.request.auth.user" -> "$YOUR_STARROCKS_USERNAME",
        "starrocks.request.auth.password" -> "$YOUR_STARROCKS_PASSWORD"
      ))
    )

Reading StarRocks with Spark SQL

    spark.sql(
      """
        |CREATE TEMPORARY VIEW spark_starrocks
        |  USING starrocks
        |  OPTIONS
        |  (
        |      "starrocks.table.identifier" = "test.score_board",
        |      "starrocks.fenodes" = "<fe_host>:<fe_http_port>",
        |      "user" = "root",
        |      "password" = ""
        |  )
        |""".stripMargin)
    // The view can then be queried like any other table
    val resDf = spark.sql("SELECT * FROM spark_starrocks")
    

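Parameter settings

Common parameters

The following parameters apply to all three read methods: Spark SQL, Spark DataFrame, and Spark RDD.

[Image: common parameters table]

Parameters specific to Spark SQL and Spark DataFrame

The following parameters apply only to the Spark SQL and Spark DataFrame read methods.

[Image: Spark SQL and Spark DataFrame parameters table]

Parameters specific to Spark RDD

The following parameters apply only to the Spark RDD read method.

[Image: Spark RDD parameters table]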






