阿里云 E-MapReduce ClickHouse 操作指南 04 期 — 数据导入

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: 阿里云 E-MapReduce(简称 EMR )是运行在阿里云平台上的一种大数据处理的系统解决方案。ClickHouse 作为开源的列式存储数据库,主要用于在线分析处理查询(OLAP),能够使用 SQL 查询实时生成分析数据报告。而阿里云 EMR ClickHouse 则提供了开源 OLAP 分析引擎 ClickHouse 的云上托管服务。

阿里云 E-MapReduce(简称EMR)是运行在阿里云平台上的一种大数据处理的系统解决方案。ClickHouse 作为开源的列式存储数据库,主要用于在线分析处理查询(OLAP),能够使用 SQL 查询实时生成分析数据报告。而阿里云 EMR ClickHouse 则提供了开源 OLAP 分析引擎 ClickHouse 的云上托管服务。


本系列文章将从以下几个方面详细介绍 EMR ClickHouse 的操作指南:


EMR ClickHouse 操作指南 —数据导入

lALPD4PvON_W-SnNAm7NA-0_1005_622.png

一、从 Spark 导入数据至 ClickHouse

——介绍如何将 Spark 中的数据导入至 ClickHouse 集群

前提条件

  • 开发工具
  • 本地安装了 Java JDK 8。
  • 本地安装了 Maven 3.x。
  • 本地安装了用于 Java 或 Scala 开发的 IDE,推荐 IntelliJ IDEA,且已配置完成 JDK 和 Maven 环境。


背景信息

关于 Spark 的更多介绍,请参见简介

代码示例

代码示例如下。

package com.company.packageName
import java.util.Properties
import java.util.concurrent.ThreadLocalRandom
import scala.annotation.tailrec
import com.google.common.collect.ImmutableMap
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{SaveMode, SparkSession}
case class Test(id: Int, key1: String, value1: Boolean, key2: Long, value2: Double)
object CKDataImporter extends Logging {
  private var dbName: String = "default"
  private var tableName: String = ""
  private var ckHost: String = ""
  private var ckPort: String = "8123"
  private var user: String = "default"
  private var password: String = ""
  private var local: Boolean = false
  def main(args: Array[String]): Unit = {
    parse(args.toList)
    checkArguments()
    val jdbcUrl = s"jdbc:clickhouse://$ckHost:$ckPort/$dbName"
    logInfo(s"Use jdbc: $jdbcUrl")
    logInfo(s"Use table: $tableName")
    val spark = getSparkSession
    // generate test data
    val rdd = spark.sparkContext.parallelize(1 to 1000).map(i => {
      val rand = ThreadLocalRandom.current()
      val randString = (0 until rand.nextInt(10, 20))
        .map(_ => rand.nextLong())
        .mkString("")
      Test(i, randString, rand.nextBoolean(), rand.nextLong(), rand.nextGaussian())
    })
    val df = spark.createDataFrame(rdd)
    df.write
      .mode(SaveMode.Append)
      .jdbc(jdbcUrl, tableName, getCKJdbcProperties(user, password))
  }
  private def printUsageAndExit(exitCode: Int = 0): Unit = {
    logError("Usage: java -jar /path/to/CKDataImporter.jar [options]")
    logError("  --dbName      设置ClickHouse数据库的名称,默认为default")
    logError("  --tableName   设置ClickHouse库中表的名称")
    logError("  --ckHost      设置ClickHouse地址")
    logError("  --ckPort      设置ClickHouse端口,默认为8123")
    logError("  --user        设置ClickHouse所使用的用户名")
    logError("  --password    设置ClickHouse用户的密码,默认为空")
    logError("  --local       设置此程序使用Spark Local模式运行")
    System.exit(exitCode)
  }
  @tailrec
  private def parse(args: List[String]): Unit = args match {
    case ("--help" | "-h") :: _ =>
      printUsageAndExit()
    case "--dbName" :: value :: tail =>
      dbName = value
      parse(tail)
    case "--tableName" :: value :: tail =>
      tableName = value
      parse(tail)
    case "--ckHost" :: value :: tail =>
      ckHost = value
      parse(tail)
    case "--ckPort" :: value :: tail =>
      ckPort = value
      parse(tail)
    case "--user" :: value :: tail =>
      user = value
      parse(tail)
    case "--password" :: value :: tail =>
      password = value
      parse(tail)
    case "--local" :: tail =>
      local = true
      parse(tail)
    case Nil =>
    case _ =>
      printUsageAndExit(1)
  }
  private def checkArguments(): Unit = {
    if ("".equals(tableName) || "".equals(ckHost)) {
      printUsageAndExit(2)
    }
  }
  private def getCKJdbcProperties(
      user: String,
      password: String,
      batchSize: String = "1000",
      socketTimeout: String = "300000",
      numPartitions: String = "8",
      rewriteBatchedStatements: String = "true"): Properties = {
    val kvMap = ImmutableMap.builder()
      .put("driver", "ru.yandex.clickhouse.ClickHouseDriver")
      .put("user", user)
      .put("password", password)
      .put("batchsize", batchSize)
      .put("socket_timeout", socketTimeout)
      .put("numPartitions", numPartitions)
      .put("rewriteBatchedStatements", rewriteBatchedStatements)
      .build()
    val properties = new Properties
    properties.putAll(kvMap)
    properties
  }
  private def getSparkSession: SparkSession = {
    val builder = SparkSession.builder()
    if (local) {
      builder.master("local[*]")
    }
    builder.appName("ClickHouse-Data-Importer")
    builder.getOrCreate()
  }
}

操作流程

步骤一:创建 ClickHouse 表

  1. 使用 SSH 方式登录 ClickHouse 集群,详情请参见使用SSH连接主节点
  2. 执行如下命令,进入 ClickHouse 客户端。
clickhouse-client
  1. 创建 ClickHouse 信息。

i. 执行如下命令,创建数据库 clickhouse_database_name

CREATE DATABASE clickhouse_database_name ON CLUSTER cluster_emr;

阿里云 EMR 会为 ClickHouse 集群自动生成一个名为 cluster_emr 的集群。数据库名您可以自定义。


ii. 执行如下命令,创建表 clickhouse_table_name_local

CREATE TABLE clickhouse_database_name.clickhouse_table_name_local ON CLUSTER cluster_emr (
  id            UInt32,
  key1            String,
  value1        UInt8,
  key2            Int64,
  value2        Float64
) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/clickhouse_database_name/clickhouse_table_name_local', '{replica}')
ORDER BY id


说明:表名您可以自定义,但请确保表名是以 _local 结尾。layer、shard 和 replica 是阿里云 EMR 为 ClickHouse 集群自动生成的宏定义,可以直接使用。


 iii. 执行如下命令,创建与表 clickhouse_table_name_local 字段定义一致的表 clickhouse_table_name_all

说明: 表名您可以自定义,但请确保表名是以 _all 结尾。

CREATE TABLE clickhouse_database_name.clickhouse_table_name_all ON CLUSTER cluster_emr (
  id                    UInt32,
  key1                  String,
  value1                UInt8,
  key2                  Int64,
  value2                Float64
) ENGINE = Distributed(cluster_emr, clickhouse_database_name, clickhouse_table_name_local, rand());

步骤二:编译并打包

  1. 下载并解压CKDataImporter示例到本地。
  2. 在 CMD 命令行中,进入到下载文件中 pom.xml 所在的目录下,执行如下命令打包文件。
mvn clean package

根据您pom.xml文件中artifactId的信息,下载文件中的target目录下会出现CKDataImporter-1.0.0.jar的JAR包。


步骤三:提交作业

  1. 使用 SSH 方式登录 Spark 集群,详情请参见使用SSH连接主节点
  2. 执行如下命令提交作业。
spark-submit --master yarn \
             --class com.aliyun.emr.CKDataImporter \
             /CKDataImporter-1.0.0.jar \
             --dbName clickhouse_database_name \
             --tableName clickhouse_table_name_all \
             --ckHost ${clickhouse_host}
参数 说明
dbName ClickHouse集群数据库的名称,默认为default。本文示例为clickhouse_database_name
tableName ClickHouse集群数据库中表的名称。本文示例为clickhouse_table_name_all
ckHost ClickHouse集群的Master节点的内网IP地址或公网IP地址。ip地址获取方式,请参见获取主节点的IP地址

获取主节点的 IP 地址

  1. 进入详情页面。
  1. 登录阿里云E-MapReduce控制台
  2. 在顶部菜单栏处,根据实际情况选择地域和资源组
  3. 单击上方的集群管理页签。
  4. 集群管理页面,单击相应集群所在行的详情
  1. 集群基础信息页面的主机信息区域,获取主节点的内网或公网IP地址。

image.png

二、从 Flink 导入数据至 ClickHouse

——介绍如何将 Flink 中的数据导入至 ClickHouse 集群

前提条件

  • 开发工具
  • 本地安装了 Java JDK 8。
  • 本地安装了 Maven 3.x。
  • 本地安装了用于 Java 或 Scala 开发的 IDE,推荐 IntelliJ IDEA,且已配置完成 JDK 和 Maven 环境。


背景信息

关于 Flink 的更多介绍,请参见Apache Flink

代码示例

代码示例如下。

  • 流处理
package com.company.packageName
import java.util.concurrent.ThreadLocalRandom
import scala.annotation.tailrec
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala.{StreamTableEnvironment, table2RowDataStream}
object StreamingJob {
  case class Test(id: Int, key1: String, value1: Boolean, key2: Long, value2: Double)
  private var dbName: String = "default"
  private var tableName: String = ""
  private var ckHost: String = ""
  private var ckPort: String = "8123"
  private var user: String = "default"
  private var password: String = ""
  def main(args: Array[String]) {
    parse(args.toList)
    checkArguments()
    // set up the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)
    val insertIntoCkSql =
      s"""
        | INSERT INTO $tableName (
        |   id, key1, value1, key2, value2
        | ) VALUES (
        |   ?, ?, ?, ?, ?
        | )
        |""".stripMargin
    val jdbcUrl = s"jdbc:clickhouse://$ckHost:$ckPort/$dbName"
    println(s"jdbc url: $jdbcUrl")
    println(s"insert sql: $insertIntoCkSql")
    val sink = JDBCAppendTableSink
      .builder()
      .setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
      .setDBUrl(jdbcUrl)
      .setUsername(user)
      .setPassword(password)
      .setQuery(insertIntoCkSql)
      .setBatchSize(1000)
      .setParameterTypes(Types.INT, Types.STRING, Types.BOOLEAN, Types.LONG, Types.DOUBLE)
      .build()
    val data: DataStream[Test] = env.fromCollection(1 to 1000).map(i => {
      val rand = ThreadLocalRandom.current()
      val randString = (0 until rand.nextInt(10, 20))
        .map(_ => rand.nextLong())
        .mkString("")
      Test(i, randString, rand.nextBoolean(), rand.nextLong(), rand.nextGaussian())
    })
    val table = table2RowDataStream(tableEnv.fromDataStream(data))
    sink.emitDataStream(table.javaStream)
    // execute program
    env.execute("Flink Streaming Scala API Skeleton")
  }
  private def printUsageAndExit(exitCode: Int = 0): Unit = {
    println("Usage: flink run com.company.packageName.StreamingJob /path/to/flink-clickhouse-demo-1.0.0.jar [options]")
    println("  --dbName      设置ClickHouse数据库的名称,默认为default")
    println("  --tableName   设置ClickHouse库中表的名称")
    println("  --ckHost      设置ClickHouse地址")
    println("  --ckPort      设置ClickHouse端口,默认为8123")
    println("  --user        设置ClickHouse所使用的用户名")
    println("  --password    设置ClickHouse用户的密码,默认为空")
    System.exit(exitCode)
  }
  @tailrec
  private def parse(args: List[String]): Unit = args match {
    case ("--help" | "-h") :: _ =>
      printUsageAndExit()
    case "--dbName" :: value :: tail =>
      dbName = value
      parse(tail)
    case "--tableName" :: value :: tail =>
      tableName = value
      parse(tail)
    case "--ckHost" :: value :: tail =>
      ckHost = value
      parse(tail)
    case "--ckPort" :: value :: tail =>
      ckPort = value
      parse(tail)
    case "--user" :: value :: tail =>
      user = value
      parse(tail)
    case "--password" :: value :: tail =>
      password = value
      parse(tail)
    case Nil =>
    case _ =>
      printUsageAndExit(1)
  }
  private def checkArguments(): Unit = {
    if ("".equals(tableName) || "".equals(ckHost)) {
      printUsageAndExit(2)
    }
  }
}
  • 批处理
package com.company.packageName
import java.util.concurrent.ThreadLocalRandom
import scala.annotation.tailrec
import org.apache.flink.Utils
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala.{BatchTableEnvironment, table2RowDataSet}
object BatchJob {
  case class Test(id: Int, key1: String, value1: Boolean, key2: Long, value2: Double)
  private var dbName: String = "default"
  private var tableName: String = ""
  private var ckHost: String = ""
  private var ckPort: String = "8123"
  private var user: String = "default"
  private var password: String = ""
  def main(args: Array[String]) {
    parse(args.toList)
    checkArguments()
    // set up the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = BatchTableEnvironment.create(env)
    val insertIntoCkSql =
      s"""
        | INSERT INTO $tableName (
        |   id, key1, value1, key2, value2
        | ) VALUES (
        |   ?, ?, ?, ?, ?
        | )
        |""".stripMargin
    val jdbcUrl = s"jdbc:clickhouse://$ckHost:$ckPort/$dbName"
    println(s"jdbc url: $jdbcUrl")
    println(s"insert sql: $insertIntoCkSql")
    val sink = JDBCAppendTableSink
      .builder()
      .setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
      .setDBUrl(jdbcUrl)
      .setUsername(user)
      .setPassword(password)
      .setQuery(insertIntoCkSql)
      .setBatchSize(1000)
      .setParameterTypes(Types.INT, Types.STRING, Types.BOOLEAN, Types.LONG, Types.DOUBLE)
      .build()
    val data = env.fromCollection(1 to 1000).map(i => {
      val rand = ThreadLocalRandom.current()
      val randString = (0 until rand.nextInt(10, 20))
        .map(_ => rand.nextLong())
        .mkString("")
      Test(i, randString, rand.nextBoolean(), rand.nextLong(), rand.nextGaussian())
    })
    val table = table2RowDataSet(tableEnv.fromDataSet(data))
    sink.emitDataSet(Utils.convertScalaDatasetToJavaDataset(table))
    // execute program
    env.execute("Flink Batch Scala API Skeleton")
  }
  private def printUsageAndExit(exitCode: Int = 0): Unit = {
    println("Usage: flink run com.company.packageName.StreamingJob /path/to/flink-clickhouse-demo-1.0.0.jar [options]")
    println("  --dbName      设置ClickHouse数据库的名称,默认为default")
    println("  --tableName   设置ClickHouse库中表的名称")
    println("  --ckHost      设置ClickHouse地址")
    println("  --ckPort      设置ClickHouse端口,默认为8123")
    println("  --user        设置ClickHouse所使用的用户名")
    println("  --password    设置ClickHouse用户的密码,默认为空")
    System.exit(exitCode)
  }
  @tailrec
  private def parse(args: List[String]): Unit = args match {
    case ("--help" | "-h") :: _ =>
      printUsageAndExit()
    case "--dbName" :: value :: tail =>
      dbName = value
      parse(tail)
    case "--tableName" :: value :: tail =>
      tableName = value
      parse(tail)
    case "--ckHost" :: value :: tail =>
      ckHost = value
      parse(tail)
    case "--ckPort" :: value :: tail =>
      ckPort = value
      parse(tail)
    case "--user" :: value :: tail =>
      user = value
      parse(tail)
    case "--password" :: value :: tail =>
      password = value
      parse(tail)
    case Nil =>
    case _ =>
      printUsageAndExit(1)
  }
  private def checkArguments(): Unit = {
    if ("".equals(tableName) || "".equals(ckHost)) {
      printUsageAndExit(2)
    }
  }
}

操作流程

步骤一:创建 ClickHouse 表

  1. 使用 SSH 方式登录 ClickHouse 集群,详情请参见使用SSH连接主节点
  2. 执行如下命令,进入 ClickHouse 客户端。
clickhouse-client
  1. 创建 ClickHouse 信息。

i. 执行如下命令,创建数据库 clickhouse_database_name

CREATE DATABASE clickhouse_database_name ON CLUSTER cluster_emr;

阿里云 EMR 会为 ClickHouse 集群自动生成一个名为 cluster_emr 的集群。数据库名您可以自定义。


ii. 执行如下命令,创建表 clickhouse_table_name_local

CREATE TABLE clickhouse_database_name.clickhouse_table_name_local ON CLUSTER cluster_emr (
  id            UInt32,
  key1            String,
  value1        UInt8,
  key2            Int64,
  value2        Float64
) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/clickhouse_database_name/clickhouse_table_name_local', '{replica}')
ORDER BY id


说明:表名您可以自定义,但请确保表名是以_local结尾。layer、shard 和 replica 是阿里云 EMR 为 ClickHouse 集群自动生成的宏定义,可以直接使用。


 iii. 执行如下命令,创建与表 clickhouse_table_name_local 字段定义一致的表clickhouse_table_name_all

说明: 表名您可以自定义,但请确保表名是以_all结尾。

CREATE TABLE clickhouse_database_name.clickhouse_table_name_all ON CLUSTER cluster_emr (
  id                    UInt32,
  key1                  String,
  value1                UInt8,
  key2                  Int64,
  value2                Float64
) ENGINE = Distributed(cluster_emr, clickhouse_database_name, clickhouse_table_name_local, rand());

步骤二:编译并打包

  1. 下载并解压flink-clickhouse-demo.tgz示例到本地。
  2. 在 CMD 命令行中,进入到下载文件中 pom.xml 所在的目录下,执行如下命令打包文件。
mvn clean package

根据您 pom.xml 文件中 artifactId 的信息,下载文件中的 target 目录下会出现 flink-clickhouse-demo-1.0.0.jar 的JAR包。


步骤三:提交作业

  1. 使用 SSH 方式登录 Flink 集群,详情请参见使用SSH连接主节点
  2. 执行如下命令提交作业。代码示例如下:
  • 流作业
flink run -m yarn-cluster \
          -c com.aliyun.emr.StreamingJob \
          flink-clickhouse-demo-1.0.0.jar \
          --dbName clickhouse_database_name \
          --tableName clickhouse_table_name_all \
          --ckHost ${clickhouse_host}
  • 批作业
flink run -m yarn-cluster \
          -c com.aliyun.emr.BatchJob \
          flink-clickhouse-demo-1.0.0.jar \
          --dbName clickhouse_database_name \
          --tableName clickhouse_table_name_all \
          --ckHost ${clickhouse_host}
参数 说明
dbName ClickHouse集群数据库的名称,默认为default。本文示例为clickhouse_database_name
tableName ClickHouse集群数据库中表的名称。本文示例为clickhouse_table_name_all
ckHost ClickHouse集群的Master节点的内网IP地址或公网IP地址。ip地址获取方式,请参见获取主节点的IP地址

获取主节点的 IP 地址

  1. 进入详情页面。
  1. 登录阿里云E-MapReduce控制台
  2. 在顶部菜单栏处,根据实际情况选择地域和资源组
  3. 单击上方的集群管理页签。
  4. 集群管理页面,单击相应集群所在行的详情
  1. 集群基础信息页面的主机信息区域,获取主节点的内网或公网IP地址。

image.png


三、从 HDFS 导入数据至 ClickHouse

——可以通过 HDFS 表引擎或表函数读写数据

前提条件


使用 HDFS 表引擎读写数据

语法

CREATE TABLE [IF NOT EXISTS] [db.]table_name
(
  name1 [type1],
  name2 [type2],
  ...
)
Engine = HDFS(uri, format)

其中,涉及参数描述如下表所示。

参数 描述
db 数据库名。
table_name 表名。
name1/name2 列名。
tyep1/type2 列的类型。
uri HDFS上文件的地址。


说明

  • 不可以是目录地址。
  • 文件所属的目录需要存在,如果不存在,则写数据时会报错。
format 文件的类型。

示例

  1. 创建业务表和 HDFS 表

i. 使用 SSH 方式登录 ClickHouse 集群,详情请参见登录集群

ii. 执行如下命令,进入 ClickHouse 客户端。

clickhouse-client

iii. 执行以下命令,创建数据库 hdfs。

CREATE DATABASE IF NOT EXISTS hdfs ON CLUSTER cluster_emr;

iv. 执行以下命令,创建表 orders。

CREATE TABLE IF NOT EXISTS hdfs.orders ON CLUSTER cluster_emr
(
    `uid` UInt32,
    `date` DateTime,
    `skuId` UInt32,
    `order_revenue` UInt32
)
ENGINE = HDFS('hdfs://192.168.**.**:9000/orders.csv', 'CSV');


说明 本文示例是将示例数据上传到了HDFS集群的根目录下。代码中的192.168.**.**为HDFS集群的emr-header-1节点的内网IP地址,您可以在EMR控制台集群管理页签中的主机列表页面查看。

v.  执行以下命令,创建数据库 product。

CREATE DATABASE IF NOT EXISTS product ON CLUSTER cluster_emr;

vi. 执行以下命令,创建业务表 orders。

CREATE TABLE IF NOT EXISTS product.orders ON CLUSTER cluster_emr
(
    `uid` UInt32,
    `date` DateTime,
    `skuId` UInt32,
    `order_revenue` UInt32
)
Engine = ReplicatedMergeTree('/cluster_emr/product/orders/{shard}', '{replica}')
PARTITION BY toYYYYMMDD(date)
ORDER BY toYYYYMMDD(date);

vii. 执行以下命令,创建业务表 orders_all。

CREATE TABLE IF NOT EXISTS product.orders_all ON CLUSTER cluster_emr
(
    `uid` UInt32,
    `date` DateTime,
    `skuId` UInt32,
    `order_revenue` UInt32
)
Engine = Distributed(cluster_emr, product, orders, rand());


  1. 使用 HDFS 表引擎导入数据

i. 下载并上传示例数据orders.csv至 HDFS 集群的目录下。
说明 本文示例上传到了 HDFS 集群的根目录下。

ii. 执行以下命令,导入数据。

INSERT INTO product.orders_all
SELECT
  uid,
  date,
  skuId,
  order_revenue
FROM
  hdfs.orders;

iii. 执行以下命令,检查数据一致性。

SELECT
  a.*
FROM
  hdfs.orders a
LEFT ANTI JOIN
  product.orders_all
USING uid;
  1. 使用 HDFS 表引擎导出数据

i. 执行以下命令,构造数据。

INSERT INTO product.orders_all VALUES \
  (60333391,'2021-08-04 11:26:01',49358700,89) \
  (38826285,'2021-08-03 10:47:29',25166907,27) \
  (10793515,'2021-07-31 02:10:31',95584454,68) \
  (70246093,'2021-08-01 00:00:08',82355887,97) \
  (70149691,'2021-08-02 12:35:45',68748652,1)  \
  (87307646,'2021-08-03 19:45:23',16898681,71) \
  (61694574,'2021-08-04 23:23:32',79494853,35) \
  (61337789,'2021-08-02 07:10:42',23792355,55) \
  (66879038,'2021-08-01 16:13:19',95820038,89);

ii. 执行以下命令,导出数据。

INSERT INTO hdfs.orders
SELECT
  uid,
  date,
  skuId,
  order_revenue
FROM
 product.orders_all;

iii. 执行以下命令,可以检查数据一致性。

SELECT
  a.*
FROM
  hdfs.orders
RIGHT ANTI JOIN
  product.orders_all a
USING uid;


使用 HDFS 表函数读写数据

语法

hdfs(uri, format, structure);

其中,涉及参数描述如下表所示。

参数 描述
uri HDFS上文件的地址。

说明

  • 不可以是目录地址。
  • 文件所属的目录需要存在,如果不存在,则写数据时会报错。
format 文件的类型。
structure 表中字段的类型。例如,column1 UInt32,column2 String。

示例

  1. 创建数据库和业务表

i. 使用 SSH 方式登录 ClickHouse 集群,详情请参见登录集群

ii. 执行如下命令,进入 ClickHouse 客户端。

clickhouse-client

iii. 执行以下命令,创建数据库 product。

CREATE DATABASE IF NOT EXISTS product ON CLUSTER cluster_emr;

iv. 执行以下命令,创建业务表 orders。

CREATE TABLE IF NOT EXISTS product.orders ON CLUSTER cluster_emr
(
    `uid` UInt32,
    `date` DateTime,
    `skuId` UInt32,
    `order_revenue` UInt32
)
Engine = ReplicatedMergeTree('/cluster_emr/product/orders/{shard}', '{replica}')
PARTITION BY toYYYYMMDD(date)
ORDER BY toYYYYMMDD(date);

v. 执行以下命令,创建业务表 orders_all。

CREATE TABLE IF NOT EXISTS product.orders_all ON CLUSTER cluster_emr
(
    `uid` UInt32,
    `date` DateTime,
    `skuId` UInt32,
    `order_revenue` UInt32
)
Engine = Distributed(cluster_emr, product, orders, rand());
  1. 使用 HDFS 表引擎导入数据

i. 下载并上传示例数据orders.csv至HDFS集群的目录下。

说明 本文示例上传到了 HDFS 集群的根目录下。

ii. 执行以下命令,导入数据。

INSERT INTO product.orders_all
SELECT
  uid,
  date,
  skuId,
  order_revenue
FROM
  hdfs('hdfs://192.168.**.**:9000/orders.csv', 'CSV', 'uid UInt32, date DateTime, skuId UInt32, order_revenue UInt32');

iii. 执行以下命令,可以检查数据一致性。

SELECT
  a.*
FROM
  hdfs('hdfs://192.168.**.**:9000/orders.csv', 'CSV', 'uid UInt32, date DateTime, skuId UInt32, order_revenue UInt32') a
LEFT ANTI JOIN
  product.orders_all
USING uid;
  1. 使用 HDFS 表引擎导出数据

i. 执行以下命令,构造数据。

INSERT INTO product.orders_all VALUES \
  (60333391,'2021-08-04 11:26:01',49358700,89) \
  (38826285,'2021-08-03 10:47:29',25166907,27) \
  (10793515,'2021-07-31 02:10:31',95584454,68) \
  (70246093,'2021-08-01 00:00:08',82355887,97) \
  (70149691,'2021-08-02 12:35:45',68748652,1)  \
  (87307646,'2021-08-03 19:45:23',16898681,71) \
  (61694574,'2021-08-04 23:23:32',79494853,35) \
  (61337789,'2021-08-02 07:10:42',23792355,55) \
  (66879038,'2021-08-01 16:13:19',95820038,89);

ii. 执行以下命令,导出数据。

INSERT INTO FUNCTION
  hdfs('hdfs://192.168.**.**:9000/orders.csv', 'CSV', 'uid UInt32, date DateTime, skuId UInt32, order_revenue UInt32')
SELECT
  uid,
  date,
  skuId,
  order_revenue
FROM
  product.orders_all;

iii. 执行以下命令,可以检查数据一致性。

SELECT
  a.*
FROM
  hdfs('hdfs://192.168.**.**:9000/orders.csv', 'CSV', 'uid UInt32, date DateTime, skuId UInt32, order_revenue UInt32')
RIGHT ANTI JOIN
  product.orders_all a

配置

EMR ClickHouse 允许使用对 HDFS 进行配置:

  • 全局生效的 HDFS 配置。
<hdfs>
  <dfs_default_replica>3</dfs_default_replica>
</hdfs>


说明 查询参数时将下划线(_)替换为半角句号(.)即可。例如,您要查询EMR中的参数dfs_default_replica,则可以在官网文档中搜索dfs.default.replica


  • 仅对${user}用户生效的 HDFS 配置,用户配置与全局配置相同的键不同值时,会覆盖全局配置
<hdfs_${user}>
  <dfs_default_replica>3</dfs_default_replica>
</hdfs_${user}>

四、从 RDS 导入数据至 ClickHouse 集群

——可以通过 RDS MySQL 表引擎或表函数导入数据至 ClickHouse 集群

前提条件


使用 RDS MySQL 表引擎导入数据

语法

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1] [TTL expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2] [TTL expr2],
    ...
) ENGINE = MySQL('host:port', 'database', 'table', 'user', 'password'[, replace_query, 'on_duplicate_clause']);

其中,涉及参数描述如下表所示。

参数 描述
db 数据库名。
table_name 表名。
cluster 集群标识。
name1/name2 列名。
tyep1/type2 列的类型。
host:port RDS MySQL的地址,可以在RDS MySQL管理控制台中数据库连接中进行查看。
database RDS MySQL中的数据库名。
table RDS MySQL中的表名。
user 用户名,该用户具有访问上述RDS MySQL中库中的表的权限。
password user对应的密码。
replace_query 是否将INSERT INTO查询转换为REPLACE INTO的标志。设置为1,表示替换查询。
on_duplicate_clause 会被添加到INSERT语句中。例如,INSERT INTO t (c1,c2) VALUES ('a', 2) ON DUPLICATE KEY UPDATE c2 = c2 + 1,此时需要指定on_duplicate_clauseUPDATE c2 = c2 + 1

示例

  1. 在 RDS MySQL 实例中,创建原始数据表并导入原始数据。

i. 连接 MySQL 实例,详情请参见通过客户端、命令行连接RDS MySQL

ii. 执行以下命令,创建原始数据表。

CREATE TABLE `origin`.`orders` (
  `uid` int(10) unsigned DEFAULT NULL,
  `date` datetime DEFAULT NULL,
  `skuId` int(10) unsigned DEFAULT NULL,
  `order_revenue` int(10) unsigned DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8

iii. 执行以下命令,导入原始数据。

INSERT INTO `origin`.`orders` VALUES(60333391, '2021-08-04 11:26:01', 49358700, 89),
       (38826285, '2021-08-03 10:47:29', 25166907, 27),
       (10793515, '2021-07-31 02:10:31', 95584454, 68),
       (70246093, '2021-08-01 00:00:08', 82355887, 97),
       (70149691, '2021-08-02 12:35:45', 68748652, 1),
       (87307646, '2021-08-03 19:45:23', 16898681, 71),
       (61694574, '2021-08-04 23:23:32', 79494853, 35),
       (61337789, '2021-08-02 07:10:42', 23792355, 55),
       (66879038, '2021-08-01 16:13:19', 95820038, 89);
  1. 在 ClickHouse 集群中,执行以下操作。

i. 使用 SSH 方式登录 ClickHouse 集群,详情请参见登录集群

ii. 执行以下命令,进入 ClickHouse 客户端。

clickhouse-client

iii. 执行以下命令,创建数据库 mysql。

CREATE DATABASE IF NOT EXISTS mysql;

iv. 执行以下命令,创建表 orders。

CREATE TABLE mysql.orders
(
    `uid` UInt32,
    `date` DateTime,
    `skuId` UInt32,
    `order_revenue` UInt32
)
ENGINE = MySQL('host:port', 'origin', 'orders', 'user', 'password');

v. 执行以下命令,创建数据库 product。

CREATE DATABASE IF NOT EXISTS product ON CLUSTER cluster_emr;

vi. 执行以下命令,创建业务表 orders。

CREATE TABLE IF NOT EXISTS product.orders ON CLUSTER cluster_emr
(
    `uid` UInt32,
    `date` DateTime,
    `skuId` UInt32,
    `order_revenue` UInt32
)
Engine = ReplicatedMergeTree('/cluster_emr/product/orders/{shard}', '{replica}')
PARTITION BY toYYYYMMDD(date)
ORDER BY toYYYYMMDD(date);

vii. 执行以下命令,创建业务表orders_all。

CREATE TABLE IF NOT EXISTS product.orders_all ON CLUSTER cluster_emr
(
    `uid` UInt32,
    `date` DateTime,
    `skuId` UInt32,
    `order_revenue` UInt32
)
Engine = Distributed(cluster_emr, product, orders, rand());

viii. 执行以下命令,导入数据。

INSERT INTO product.orders_all
SELECT
  uid,
  date,
  skuId,
  order_revenue
FROM
  mysql.orders;

ix. 执行以下命令,查询数据。

SELECT a.*
FROM mysql.orders AS a
ANTI LEFT JOIN product.orders_all USING (uid);


说明 查询数据为空时正常。


使用 RDS MySQL 表函数导入数据

语法

mysql('host:port', 'database', 'table', 'user', 'password'[, replace_query, 'on_duplicate_clause'])

其中,涉及参数描述如下表所示。

参数 描述
host:port RDS MySQL的地址,您可以在RDS MySQL管理控制台中的数据库连接中查看。
database RDS MySQL中的数据库名。
table RDS MySQL中的表名。
user 用户名,该用户具有访问上述RDS MySQL中库中的表的权限。
password user对应的密码。
replace_query 是否将INSERT INTO查询转换为REPLACE INTO的标志。设置为1,表示替换查询。
on_duplicate_clause 会被添加到INSERT语句中。例如,INSERT INTO t (c1,c2) VALUES ('a', 2) ON DUPLICATE KEY UPDATE c2 = c2 + 1,此时需要指定on_duplicate_clauseUPDATE c2 = c2 + 1

示例

  1. 在 RDS MySQL 实例中,创建表并插入数据。

i. 连接 MySQL 实例,详情请参见通过客户端、命令行连接RDS MySQL

ii. 执行以下命令,创建表 orders。

CREATE TABLE `origin`.`orders` (
  `uid` int(10) unsigned DEFAULT NULL,
  `date` datetime DEFAULT NULL,
  `skuId` int(10) unsigned DEFAULT NULL,
  `order_revenue` int(10) unsigned DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8

iii. 执行以下命令,插入数据。

INSERT INTO `origin`.`orders` VALUES(60333391, '2021-08-04 11:26:01', 49358700, 89),
       (38826285, '2021-08-03 10:47:29', 25166907, 27),
       (10793515, '2021-07-31 02:10:31', 95584454, 68),
       (70246093, '2021-08-01 00:00:08', 82355887, 97),
       (70149691, '2021-08-02 12:35:45', 68748652, 1),
       (87307646, '2021-08-03 19:45:23', 16898681, 71),
       (61694574, '2021-08-04 23:23:32', 79494853, 35),
       (61337789, '2021-08-02 07:10:42', 23792355, 55),
       (66879038, '2021-08-01 16:13:19', 95820038, 89) ;
  1. 在 ClickHouse 集群中,执行以下操作。

i. 使用 SSH 方式登录 ClickHouse 集群,详情请参见登录集群

ii. 执行如下命令,进入 ClickHouse 客户端。

clickhouse-client

iii. 执行以下命令,创建数据库 product。

CREATE DATABASE IF NOT EXISTS product ON CLUSTER cluster_emr;

iv.执行以下命令,创建表 orders。

CREATE TABLE IF NOT EXISTS product.orders ON CLUSTER cluster_emr
(
    `uid` UInt32,
    `date` DateTime,
    `skuId` UInt32,
    `order_revenue` UInt32
)
Engine = ReplicatedMergeTree('/cluster_emr/product/orders/{shard}', '{replica}')
PARTITION BY toYYYYMMDD(date)
ORDER BY toYYYYMMDD(date);

v. 执行以下命令,创建表 orders_all。

CREATE TABLE IF NOT EXISTS product.orders_all ON CLUSTER cluster_emr
(
    `uid` UInt32,
    `date` DateTime,
    `skuId` UInt32,
    `order_revenue` UInt32
)
Engine = Distributed(cluster_emr, product, orders, rand());

vi. 执行以下命令,导入数据。

INSERT INTO product.orders_all
SELECT
  uid,
  date,
  skuId,
  order_revenue
FROM
  mysql('host:port', 'origin', 'orders', 'user', 'password');

vii. 执行以下命令,查询数据。

SELECT a.*
FROM
  mysql('host:port', 'origin', 'orders', 'user', 'password') AS a
ANTI LEFT JOIN product.orders_all USING (uid);


说明 查询数据为空时正常。

如果您需要导出数据,则将业务表数据写入 MySQL 表函数即可。写入命令如下。

INSERT INTO FUNCTION
  mysql('host:port', 'origin', 'orders', 'user', 'password')
FROM
  product.orders_all;

五、从 Kafka 导入数据至 ClickHouse

——可以通过 Kafka 表引擎导入数据至 ClickHouse 集群

前提条件

使用限制

Kafka集群和ClickHouse集群需要在同一VPC下。

语法

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'host1:port1,host2:port2',
    kafka_topic_list = 'topic1,topic2,...',
    kafka_group_name = 'group_name',
    kafka_format = 'data_format';

其中,涉及参数描述如下表所示。

参数 描述
db 数据库名。
table_name 表名。
cluster 集群标识。
name1/name2 列名。
tyep1/type2 列的类型。
kafka_broker_list Kafka Broker的地址及端口。

Kafka集群所有节点的内网IP地址及端口,您可以在EMR控制台集群管理页签中的主机列表页面查看。

kafka_topic_list 订阅的Topic名称。
kafka_group_name Kafka consumer的分组名称。
kafka_format 数据的类型。例如,CSV和JSONEachRow等,详细信息请参见Formats for Input and Output Data

示例

  1. 在 ClickHouse 集群中执行以下操作。

i. 使用SSH方式登录 ClickHouse 集群,详情请参见登录集群

ii. 执行如下命令,进入 ClickHouse 客户端。

clickhouse-client

iii. 执行如下命令,创建数据库 kafka。

CREATE DATABASE IF NOT EXISTS kafka ON CLUSTER cluster_emr;


说明 数据库名您可以自定义。本文示例中的cluster_emr是集群默认的标识,如果您修改过,请填写正确的集群标识,您也可以在EMR控制台ClickHouse服务的配置页面,在搜索区域搜索clickhouse_remote_servers参数查看。

iv. 执行如下命令,创建 Kafka 表。

CREATE TABLE IF NOT EXISTS kafka.consumer ON CLUSTER cluster_emr
(
    `uid` UInt32,
    `date` DateTime,
    `skuId` UInt32,
    `order_revenue` UInt32
)
ENGINE = Kafka()
SETTINGS
  kafka_broker_list = '192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092',
  kafka_topic_list = 'clickhouse_test',
  kafka_group_name = 'clickhouse_test',
  kafka_format = 'CSV';

kafka_broker_list为 Kafka 集群所有节点的内网 IP 地址及端口,您可以在 EMR 控制台集群管理页签中的主机列表页面查看。其余参数含义请参见语法。

image.png

v. 执行如下命令,创建数据库 product。

CREATE DATABASE IF NOT EXISTS product ON CLUSTER cluster_emr;

vi. 执行以下命令,创建本地表。

CREATE TABLE IF NOT EXISTS product.orders ON CLUSTER cluster_emr
(
    `uid` UInt32,
    `date` DateTime,
    `skuId` UInt32,
    `order_revenue` UInt32
)
Engine = ReplicatedMergeTree('/cluster_emr/product/orders/{shard}', '{replica}')
PARTITION BY toYYYYMMDD(date)
ORDER BY toYYYYMMDD(date);

vii. 执行以下命令,创建分布式表。

CREATE TABLE IF NOT EXISTS product.orders_all ON CLUSTER cluster_emr
(
    `uid` UInt32,
    `date` DateTime,
    `skuId` UInt32,
    `order_revenue` UInt32
)
Engine = Distributed(cluster_emr, product, orders, rand());

viii. 执行以下命令,创建 MATERIALIZED VIEW 自动导数据。

CREATE MATERIALIZED VIEW IF NOT EXISTS product.kafka_load ON CLUSTER cluster_emr TO product.orders AS
SELECT *
FROM kafka.consumer;
  1. 在 Kafka 集群中执行以下操作。

i. 使用 SSH 方式登录 Kafka 集群,详情请参见登录集群

ii. 在 Kafka 集群的命令行窗口,执行如下命令运行 Kafka 的生产者。

/usr/lib/kafka-current/bin/kafka-console-producer.sh --broker-list 192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092 --topic clickhouse_test

iii. 执行以下命令,输入测试数据。

38826285,2021-08-03 10:47:29,25166907,27
10793515,2021-07-31 02:10:31,95584454,68
70246093,2021-08-01 00:00:08,82355887,97
70149691,2021-08-02 12:35:45,68748652,1
87307646,2021-08-03 19:45:23,16898681,71
61694574,2021-08-04 23:23:32,79494853,35
61337789,2021-08-02 07:10:42,23792355,55
66879038,2021-08-01 16:13:19,95820038,89

image.png

  1. 在 ClickHouse 命令窗口中,执行以下命令,可以查看从 Kafka 中导入至 ClickHouse 集群的数据。

您可以校验查询到的数据与源数据是否一致。

SELECT * FROM product.orders_all;

image.png


后续

您已经学习了数据导入,本系列还包括其他内容:




获取更详细的信息,点击下方链接查看:

EMR官网:https://www.aliyun.com/product/emapreduce

EMR ClickHouse :https://help.aliyun.com/document_detail/212195.html


扫描下方二维码加入 EMR 相关产品钉钉交流群一起参与讨论吧!

lALPD26eQMAeAf_NAd7NAvs_763_478.png

相关实践学习
基于EMR Serverless StarRocks一键玩转世界杯
基于StarRocks构建极速统一OLAP平台
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
相关文章
|
1月前
|
存储 数据采集 监控
阿里云DTS踩坑经验分享系列|SLS同步至ClickHouse集群
作为强大的日志服务引擎,SLS 积累了用户海量的数据。为了实现数据的自由流通,DTS 开发了以 SLS 为源的数据同步插件。目前,该插件已经支持将数据从 SLS 同步到 ClickHouse。通过这条高效的同步链路,客户不仅能够利用 SLS 卓越的数据采集和处理能力,还能够充分发挥 ClickHouse 在数据分析和查询性能方面的优势,帮助企业显著提高数据查询速度,同时有效降低存储成本,从而在数据驱动决策和资源优化配置上取得更大成效。
129 9
|
3月前
|
存储 分布式计算 数据库
阿里云国际版设置数据库云分析工作负载的 ClickHouse 版
阿里云国际版设置数据库云分析工作负载的 ClickHouse 版
|
6月前
|
存储 大数据 关系型数据库
从 ClickHouse 到阿里云数据库 SelectDB 内核 Apache Doris:快成物流的数智化货运应用实践
目前已经部署在 2 套生产集群,存储数据总量达百亿规模,覆盖实时数仓、BI 多维分析、用户画像、货运轨迹信息系统等业务场景。
|
7月前
|
SQL Oracle 关系型数据库
实时计算 Flink版产品使用问题之如何对ClickHouse进行操作
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
8月前
|
存储 Cloud Native 大数据
国内独家|阿里云瑶池发布ClickHouse企业版:云原生Serverless新体验
全面升级为云原生架构,支持云原生按需弹性Serverless能力,解决了长期困扰用户的集群扩展效率和平滑性问题。
国内独家|阿里云瑶池发布ClickHouse企业版:云原生Serverless新体验
|
8月前
|
存储 容灾 Cloud Native
阿里云ClickHouse企业版正式商业化,为开发者提供容灾性更好、性价比更高的实时数仓
2024年4月23日,阿里云联合 ClickHouse Inc. 成功举办了企业版商业化发布会。阿里云 ClickHouse 企业版是阿里云和 ClickHouse 原厂 ClickHouse. Inc 独家合作的存算分离的云原生版本,支持资源按需弹性 Serverless,帮助企业降低成本的同时,为企业带来更多商业价值。
557 1
|
8月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用合集之sql读取mysql写入clickhouse,该如何操作
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
8月前
|
存储 缓存 运维
阿里云数据库 ClickHouse 云原生版产品解析
ClickHouse 介绍ClickHouse 是一款当前非常流行的开源在线分析型数据库。ClickHouse 主要应用于实时数仓构建、大数据加速分析、宽表日志分析等通用场景,服务于流量漏斗分析,用户行为分析,人群圈选,用户画像,广告投放人群评估、ABTest 、大促分析,CDP/DMP 等业务场景...
222 0
|
3月前
|
分布式计算 资源调度 Hadoop
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
129 3
|
7月前
|
分布式计算 Hadoop Java
Hadoop MapReduce编程
该教程指导编写Hadoop MapReduce程序处理天气数据。任务包括计算每个城市ID的最高、最低气温、气温出现次数和平均气温。在读取数据时需忽略表头,且数据应为整数。教程中提供了环境变量设置、Java编译、jar包创建及MapReduce执行的步骤说明,但假设读者已具备基础操作技能。此外,还提到一个扩展练习,通过分区功能将具有相同尾数的数字分组到不同文件。
69 1

相关产品

  • 开源大数据平台 E-MapReduce