Spark读写OSS并使用OSS Select来加速查询

本文涉及的产品
对象存储 OSS,20GB 3个月
云备份 Cloud Backup,100GB 3个月
日志服务 SLS,月写入数据量 50GB 1个月
简介: Spark读写OSS 基于这篇文章搭建的CDH6以及配置,我们来使Spark能够读写OSS(其他版本的Spark都是类似的做法,不再赘述)。 由于默认Spark并没有将OSS的支持包放到它的CLASSPATH里面,所以我们需要执行如下命令下面的步骤需要在所有的CDH节点执行 进入到$CDH_HO.

Spark读写OSS

基于这篇文章搭建的CDH6以及配置,我们来使Spark能够读写OSS(其他版本的Spark都是类似的做法,不再赘述)。

由于默认Spark并没有将OSS的支持包放到它的CLASSPATH里面,所以我们需要执行如下命令

下面的步骤需要在所有的CDH节点执行

进入到$CDH_HOME/lib/spark目录, 执行如下命令

[root@cdh-master spark]# cd jars/
[root@cdh-master jars]# ln -s ../../../jars/hadoop-aliyun-3.0.0-cdh6.0.1.jar hadoop-aliyun.jar
[root@cdh-master jars]# ln -s ../../../jars/aliyun-sdk-oss-2.8.3.jar aliyun-sdk-oss-2.8.3.jar
[root@cdh-master jars]# ln -s ../../../jars/jdom-1.1.jar jdom-1.1.jar

进入到$CDH_HOME/lib/spark目录,运行一个查询

[root@cdh-master spark]# ./bin/spark-shell
WARNING: User-defined SPARK_HOME (/opt/cloudera/parcels/CDH-6.0.1-1.cdh6.0.1.p0.590678/lib/spark) overrides detected (/opt/cloudera/parcels/CDH/lib/spark).
WARNING: Running spark-class from user-defined location.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://x.x.x.x:4040
Spark context available as 'sc' (master = yarn, app id = application_1540878848110_0004).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.0-cdh6.0.1
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_152)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val myfile = sc.textFile("oss://{your-bucket-name}/50/store_sales")
myfile: org.apache.spark.rdd.RDD[String] = oss://{
   your-bucket-name}/50/store_sales MapPartitionsRDD[1] at textFile at <console>:24

scala> myfile.count()
res0: Long = 144004764

scala> myfile.map(line => line.split('|')).filter(_(0).toInt >= 2451262).take(3)
res15: Array[Array[String]] = Array(Array(2451262, 71079, 20359, 154660, 284233, 6206, 150579, 46, 512, 2160001, 84, 6.94, 11.38, 9.33, 681.83, 783.72, 582.96, 955.92, 5.09, 681.83, 101.89, 106.98, -481.07), Array(2451262, 71079, 26863, 154660, 284233, 6206, 150579, 46, 345, 2160001, 12, 67.82, 115.29, 25.36, 0.00, 304.32, 813.84, 1383.48, 21.30, 0.00, 304.32, 325.62, -509.52), Array(2451262, 71079, 55852, 154660, 284233, 6206, 150579, 46, 243, 2160001, 74, 32.41, 34.67, 1.38, 0.00, 102.12, 2398.34, 2565.58, 4.08, 0.00, 102.12, 106.20, -2296.22))

scala> myfile.map(line => line.split('|')).filter(_(0) >= "2451262").saveAsTextFile("oss://{your-bucket-name}/spark-oss-test.1")

Spark支持OSS Select

这篇文章介绍了OSS Select,OSS Select目前已经在深圳区域实现商业化,下面的实验将基于oss-cn-shenzhen.aliyuncs.com这个OSS EndPoint来进行(基于CDH6,其他版本的Spark做法类似)。

部署

下面的步骤需要在所有的CDH节点执行

下载OSS Select的Spark支持包(目前该支持包还在测试中),放到$CDH_HOME/jars下

http://gosspublic.alicdn.com/hadoop-spark/spark-2.2.0-oss-select-0.1.0-SNAPSHOT.tar.gz

[root@cdh-master jars]# pwd
/opt/cloudera/parcels/CDH/jars
[root@cdh-master jars]# ls -ltrh
.....
-rw-r--r-- 1 root root   20K 9月  20 03:45 jsr305-3.0.1.jar
-rw-r--r-- 1 root root  114K 10月 30 16:12 aliyun-java-sdk-core-3.4.0.jar
-rw-r--r-- 1 root root  770K 10月 30 16:12 aliyun-java-sdk-ecs-4.2.0.jar
-rw-r--r-- 1 root root  535K 10月 30 16:12 aliyun-sdk-oss-3.3.0.jar
-rw-r--r-- 1 root root   66K 10月 30 16:12 aliyun-oss-select-spark_2.11-0.1.0-SNAPSHOT.jar
-rw-r--r-- 1 root root   13K 10月 30 16:12 aliyun-java-sdk-sts-3.0.0.jar
-rw-r--r-- 1 root root  211K 10月 30 16:12 aliyun-java-sdk-ram-3.0.0.jar
-rw-r--r-- 1 root root  870K 10月 30 16:13 jaxb-impl-2.2.3-1.jar
-rw-r--r-- 1 root root  150K 10月 30 16:13 jdom-1.1.jar
-rw-r--r-- 1 root root  145K 10月 30 16:13 jersey-json-1.9.jar
-rw-r--r-- 1 root root  448K 10月 30 16:13 jersey-core-1.9.jar
-rw-r--r-- 1 root root   56K 10月 30 16:13 json-20170516.jar
-rw-r--r-- 1 root root   67K 10月 30 16:13 jettison-1.1.jar
-rw-r--r-- 1 root root   26K 10月 30 16:13 stax-api-1.0.1.jar

进入到$CDH_HOME/lib/spark/jars

[root@cdh-master jars]# pwd
/opt/cloudera/parcels/CDH/lib/spark/jars
[root@cdh-master jars]# rm -f aliyun-sdk-oss-2.8.3.jar
[root@cdh-master jars]# ln -s ../../../jars/aliyun-oss-select-spark_2.11-0.1.0-SNAPSHOT.jar aliyun-oss-select-spark_2.11-0.1.0-SNAPSHOT.jar
[root@cdh-master jars]# ln -s ../../../jars/aliyun-java-sdk-core-3.4.0.jar aliyun-java-sdk-core-3.4.0.jar
[root@cdh-master jars]# ln -s ../../../jars/aliyun-java-sdk-ecs-4.2.0.jar aliyun-java-sdk-ecs-4.2.0.jar
[root@cdh-master jars]# ln -s ../../../jars/aliyun-java-sdk-ram-3.0.0.jar aliyun-java-sdk-ram-3.0.0.jar
[root@cdh-master jars]# ln -s ../../../jars/aliyun-java-sdk-sts-3.0.0.jar aliyun-java-sdk-sts-3.0.0.jar
[root@cdh-master jars]# ln -s ../../../jars/aliyun-sdk-oss-3.3.0.jar aliyun-sdk-oss-3.3.0.jar
[root@cdh-master jars]# ln -s ../../../jars/jdom-1.1.jar jdom-1.1.jar

对比测试

这里使用的是spark on yarn,其中Node Manager节点是4个,每个节点最多可以运行4个container,每个container配备的资源是1核2GB内存。
测试数据共630MB,包含3列,分别是姓名、公司和年龄。

[root@cdh-master jars]# hadoop fs -ls oss://select-test-sz/people/
Found 10 items
-rw-rw-rw-   1   63079930 2018-10-30 17:03 oss://select-test-sz/people/part-00000
-rw-rw-rw-   1   63079930 2018-10-30 17:03 oss://select-test-sz/people/part-00001
-rw-rw-rw-   1   63079930 2018-10-30 17:05 oss://select-test-sz/people/part-00002
-rw-rw-rw-   1   63079930 2018-10-30 17:05 oss://select-test-sz/people/part-00003
-rw-rw-rw-   1   63079930 2018-10-30 17:06 oss://select-test-sz/people/part-00004
-rw-rw-rw-   1   63079930 2018-10-30 17:12 oss://select-test-sz/people/part-00005
-rw-rw-rw-   1   63079930 2018-10-30 17:14 oss://select-test-sz/people/part-00006
-rw-rw-rw-   1   63079930 2018-10-30 17:14 oss://select-test-sz/people/part-00007
-rw-rw-rw-   1   63079930 2018-10-30 17:15 oss://select-test-sz/people/part-00008
-rw-rw-rw-   1   63079930 2018-10-30 17:16 oss://select-test-sz/people/part-00009

进入到$CDH_HOME/lib/spark/,启动spark-shell

[root@cdh-master spark]# ./bin/spark-shell
WARNING: User-defined SPARK_HOME (/opt/cloudera/parcels/CDH-6.0.1-1.cdh6.0.1.p0.590678/lib/spark) overrides detected (/opt/cloudera/parcels/CDH/lib/spark).
WARNING: Running spark-class from user-defined location.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://x.x.x.x:4040
Spark context available as 'sc' (master = yarn, app id = application_1540887123331_0008).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.0-cdh6.0.1
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_152)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val sqlContext = spark.sqlContext
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@4bdef487

scala> sqlContext.sql("CREATE TEMPORARY VIEW people USING com.aliyun.oss " +
     |   "OPTIONS (" +
     |   "oss.bucket 'select-test-sz', " +
     |   "oss.prefix 'people', " + // objects with this prefix belong to this table
     |   "oss.schema 'name string, company string, age long'," + // like 'column_a long, column_b string'
     |   "oss.data.format 'csv'," + // we only support csv now
     |   "oss.input.csv.header 'None'," +
     |   "oss.input.csv.recordDelimiter '\r\n'," +
     |   "oss.input.csv.fieldDelimiter ','," +
     |   "oss.input.csv.commentChar '#'," +
     |   "oss.input.csv.quoteChar '\"'," +
     |   "oss.output.csv.recordDelimiter '\n'," +
     |   "oss.output.csv.fieldDelimiter ','," +
     |   "oss.output.csv.quoteChar '\"'," +
     |   "oss.endpoint 'oss-cn-shenzhen.aliyuncs.com', " +
     |   "oss.accessKeyId 'Your Access Key Id', " +
     |   "oss.accessKeySecret 'Your Access Key Secret')")
res0: org.apache.spark.sql.DataFrame = []

scala>   val sql: String = "select count(*) from people where name like 'Lora%'"
sql: String = select count(*) from people where name like 'Lora%'

scala>   sqlContext.sql(sql).show()
+--------+
|count(1)|
+--------+
|   31770|
+--------+

scala> val textFile = sc.textFile("oss://select-test-sz/people/")
textFile: org.apache.spark.rdd.RDD[String] = oss://select-test-sz/people/ MapPartitionsRDD[8] at textFile at <console>:24

scala> textFile.map(line => line.split(',')).filter(_(0).startsWith("Lora")).count()
res3: Long = 31770

然后我们分别看使用OSS Select与不使用OSS Select的时间对比,可以看到,使用OSS Select的时间是不使用OSS Select时间的四分之一。
_2018_10_30_5_35_25

Spark对接OSS Select支持包的实现(Preview)

我们通过扩展Spark的DataSource API来实现Spark对接OSS Select。通过实现PrunedFilteredScan,我们可以把需要的列和过滤条件下推到OSS Select执行。目前这个支持包还在开发中,定义的规范如下:

scala> sqlContext.sql("CREATE TEMPORARY VIEW people USING com.aliyun.oss " +
     |   "OPTIONS (" +
     |   "oss.bucket 'select-test-sz', " +
     |   "oss.prefix 'people', " + // objects with this prefix belong to this table
     |   "oss.schema 'name string, company string, age long'," + // like 'column_a long, column_b string'
     |   "oss.data.format 'csv'," + // we only support csv now
     |   "oss.input.csv.header 'None'," +
     |   "oss.input.csv.recordDelimiter '\r\n'," +
     |   "oss.input.csv.fieldDelimiter ','," +
     |   "oss.input.csv.commentChar '#'," +
     |   "oss.input.csv.quoteChar '\"'," +
     |   "oss.output.csv.recordDelimiter '\n'," +
     |   "oss.output.csv.fieldDelimiter ','," +
     |   "oss.output.csv.quoteChar '\"'," +
     |   "oss.endpoint 'oss-cn-shenzhen.aliyuncs.com', " +
     |   "oss.accessKeyId 'Your Access Key Id', " +
     |   "oss.accessKeySecret 'Your Access Key Secret')")
字段 说明
oss.bucket 数据所在的bucket
oss.prefix 拥有这个前缀的Object都属于定义的这个TEMPORARY VIEW
oss.schema 这个 TEMPORARY VIEW的schema,目前通过字符串指定,后续会通过一个文件来指定schema
oss.data.format 数据内容的格式,目前支持CSV格式,其他格式也会陆续支持
oss.input.csv.* 定义CSV输入格式参数
oss.output.csv.* 定义CSV输出格式参数
oss.endpoint bucket所在的Endpoint
oss.accessKeyId 你的Access Key Id
oss.accessKeySecret 你的Access Key Secret

支持的过滤条件:=,<,>,<=, >=,||,or,not,and,in,like(StringStartsWith,StringEndsWith,StringContains)。对于不能下推的过滤条件(如算术运算、字符串拼接等,这些通过PrunedFilteredScan获取不到),则只下推需要的列到OSS Select。

然而,OSS Select还支持其他过滤条

对比TPC-H的查询

主要对比TPC-H中query1.sql对于lineitem这个table的查询性能,为了能使OSS Select过滤更多的数据,我们将where条件改一下(由l_shipdate <= '1998-09-16'改为where l_shipdate > '1997-09-16'),测试数据大小是2.27GB

[root@cdh-master ~]# hadoop fs -ls oss://select-test-sz/data/lineitem.csv
-rw-rw-rw-   1 2441079322 2018-10-31 11:18 oss://select-test-sz/data/lineitem.csv

对比如下

scala> import org.apache.spark.sql.types.{
   IntegerType, LongType, StringType, StructField, StructType, DoubleType}
import org.apache.spark.sql.types.{
   IntegerType, LongType, StringType, StructField, StructType, DoubleType}

scala> import org.apache.spark.sql.{
   Row, SQLContext}
import org.apache.spark.sql.{
   Row, SQLContext}

scala> val sqlContext = spark.sqlContext
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@74e2cfc5

scala> val textFile = sc.textFile("oss://select-test-sz/data/lineitem.csv")
textFile: org.apache.spark.rdd.RDD[String] = oss://select-test-sz/data/lineitem.csv MapPartitionsRDD[1] at textFile at <console>:26

scala> val dataRdd = textFile.map(_.split('|'))
dataRdd: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[2] at map at <console>:28

scala> val schema = StructType(
     |     List(
     |         StructField("L_ORDERKEY",LongType,true),
     |         StructField("L_PARTKEY",LongType,true),
     |         StructField("L_SUPPKEY",LongType,true),
     |         StructField("L_LINENUMBER",IntegerType,true),
     |         StructField("L_QUANTITY",DoubleType,true),
     |         StructField("L_EXTENDEDPRICE",DoubleType,true),
     |         StructField("L_DISCOUNT",DoubleType,true),
     |         StructField("L_TAX",DoubleType,true),
     |         StructField("L_RETURNFLAG",StringType,true),
     |         StructField("L_LINESTATUS",StringType,true),
     |         StructField("L_SHIPDATE",StringType,true),
     |         StructField("L_COMMITDATE",StringType,true),
     |         StructField("L_RECEIPTDATE",StringType,true),
     |         StructField("L_SHIPINSTRUCT",StringType,true),
     |         StructField("L_SHIPMODE",StringType,true),
     |         StructField("L_COMMENT",StringType,true)
     |     )
     | )
schema: org.apache.spark.sql.types.StructType = StructType(StructField(L_ORDERKEY,LongType,true), StructField(L_PARTKEY,LongType,true), StructField(L_SUPPKEY,LongType,true), StructField(L_LINENUMBER,IntegerType,true), StructField(L_QUANTITY,DoubleType,true), StructField(L_EXTENDEDPRICE,DoubleType,true), StructField(L_DISCOUNT,DoubleType,true), StructField(L_TAX,DoubleType,true), StructField(L_RETURNFLAG,StringType,true), StructField(L_LINESTATUS,StringType,true), StructField(L_SHIPDATE,StringType,true), StructField(L_COMMITDATE,StringType,true), StructField(L_RECEIPTDATE,StringType,true), StructField(L_SHIPINSTRUCT,StringType,true), StructField(L_SHIPMODE,StringType,true), StructField(L_COMMENT,StringType,true))

scala> val dataRowRdd = dataRdd.map(p => Row(p(0).toLong, p(1).toLong, p(2).toLong, p(3).toInt, p(4).toDouble, p(5).toDouble, p(6).toDouble, p(7).toDouble, p(8), p(9), p(10), p(11), p(12), p(13), p(14), p(15)))
dataRowRdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[3] at map at <console>:30

scala> val dataFrame = sqlContext.createDataFrame(dataRowRdd, schema)
dataFrame: org.apache.spark.sql.DataFrame = [L_ORDERKEY: bigint, L_PARTKEY: bigint ... 14 more fields]

scala> dataFrame.createOrReplaceTempView("lineitem")

scala> spark.sql("select l_returnflag, l_linestatus, sum(l_quantity) as sum_qty, sum(l_extendedprice) as sum_base_price, sum(l_extendedprice * (1 - l_discount)) as sum_disc_price, sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge, avg(l_quantity) as avg_qty, avg(l_extendedprice) as avg_price, avg(l_discount) as avg_disc, count(*) as count_order from lineitem where l_shipdate > '1997-09-16' group by l_returnflag, l_linestatus order by l_returnflag, l_linestatus").show()
+------------+------------+-----------+--------------------+--------------------+--------------------+------------------+------------------+-------------------+-----------+
|l_returnflag|l_linestatus|    sum_qty|      sum_base_price|      sum_disc_price|          sum_charge|           avg_qty|         avg_price|           avg_disc|count_order|
+------------+------------+-----------+--------------------+--------------------+--------------------+------------------+------------------+-------------------+-----------+
|           N|           O|7.5697385E7|1.135107538838699...|1.078345555027154...|1.121504616321447...|25.501957856643052|38241.036487881756|0.04999335309103123|    2968297|
+------------+------------+-----------+--------------------+--------------------+--------------------+------------------+------------------+-------------------+-----------+

scala> sqlContext.sql("CREATE TEMPORARY VIEW item USING com.aliyun.oss " +
     |   "OPTIONS (" +
     |   "oss.bucket 'select-test-sz', " +
     |   "oss.prefix 'data', " +
     |   "oss.schema 'L_ORDERKEY long, L_PARTKEY long, L_SUPPKEY long, L_LINENUMBER int, L_QUANTITY double, L_EXTENDEDPRICE double, L_DISCOUNT double, L_TAX double, L_RETURNFLAG string, L_LINESTATUS string, L_SHIPDATE string, L_COMMITDATE string, L_RECEIPTDATE string, L_SHIPINSTRUCT string, L_SHIPMODE string, L_COMMENT string'," +
     |   "oss.data.format 'csv'," + // we only support csv now
     |   "oss.input.csv.header 'None'," +
     |   "oss.input.csv.recordDelimiter '\n'," +
     |   "oss.input.csv.fieldDelimiter '|'," +
     |   "oss.input.csv.commentChar '#'," +
     |   "oss.input.csv.quoteChar '\"'," +
     |   "oss.output.csv.recordDelimiter '\n'," +
     |   "oss.output.csv.fieldDelimiter ','," +
     |   "oss.output.csv.commentChar '#'," +
     |   "oss.output.csv.quoteChar '\"'," +
     |   "oss.endpoint 'oss-cn-shenzhen.aliyuncs.com', " +
     |   "oss.accessKeyId 'Your Access Key Id', " +
     |   "oss.accessKeySecret 'Your Access Key Secret')")
res2: org.apache.spark.sql.DataFrame = []

scala> sqlContext.sql("select l_returnflag, l_linestatus, sum(l_quantity) as sum_qty, sum(l_extendedprice) as sum_base_price, sum(l_extendedprice * (1 - l_discount)) as sum_disc_price, sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge, avg(l_quantity) as avg_qty, avg(l_extendedprice) as avg_price, avg(l_discount) as avg_disc, count(*) as count_order from item where l_shipdate > '1997-09-16' group by l_returnflag, l_linestatus order by l_returnflag, l_linestatus").show()

scala> sqlContext.sql("select l_returnflag, l_linestatus, sum(l_quantity) as sum_qty, sum(l_extendedprice) as sum_base_price, sum(l_extendedprice * (1 - l_discount)) as sum_disc_price, sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge, avg(l_quantity) as avg_qty, avg(l_extendedprice) as avg_price, avg(l_discount) as avg_disc, count(*) as count_order from item where l_shipdate > '1997-09-16' group by l_returnflag, l_linestatus order by l_returnflag, l_linestatus").show()
+------------+------------+-----------+--------------------+--------------------+--------------------+------------------+-----------------+-------------------+-----------+
|l_returnflag|l_linestatus|    sum_qty|      sum_base_price|      sum_disc_price|          sum_charge|           avg_qty|        avg_price|           avg_disc|count_order|
+------------+------------+-----------+--------------------+--------------------+--------------------+------------------+-----------------+-------------------+-----------+
|           N|           O|7.5697385E7|1.135107538838701E11|1.078345555027154...|1.121504616321447...|25.501957856643052|38241.03648788181|0.04999335309103024|    2968297|
+------------+------------+-----------+--------------------+--------------------+--------------------+------------------+-----------------+-------------------+-----------+

耗时对比如下
14_32_58__10_31_2018

其中使用Spark SQL与在Spark SQL上使用OSS Select耗时分别是2.5分钟和38秒。

参考文章

https://yq.aliyun.com/articles/593910
https://yq.aliyun.com/articles/659735
https://yq.aliyun.com/articles/658473
https://mapr.com/blog/spark-data-source-api-extending-our-spark-sql-query-engine/

相关实践学习
借助OSS搭建在线教育视频课程分享网站
本教程介绍如何基于云服务器ECS和对象存储OSS,搭建一个在线教育视频课程分享网站。
目录
相关文章
|
8月前
|
敏捷开发 测试技术 持续交付
云效产品使用常见问题之账号授权就能对当前主账号下所有 OSS 进行读写权限如何解决
云效作为一款全面覆盖研发全生命周期管理的云端效能平台,致力于帮助企业实现高效协同、敏捷研发和持续交付。本合集收集整理了用户在使用云效过程中遇到的常见问题,问题涉及项目创建与管理、需求规划与迭代、代码托管与版本控制、自动化测试、持续集成与发布等方面。
|
2月前
|
分布式计算 Java 开发工具
阿里云MaxCompute-XGBoost on Spark 极限梯度提升算法的分布式训练与模型持久化oss的实现与代码浅析
本文介绍了XGBoost在MaxCompute+OSS架构下模型持久化遇到的问题及其解决方案。首先简要介绍了XGBoost的特点和应用场景,随后详细描述了客户在将XGBoost on Spark任务从HDFS迁移到OSS时遇到的异常情况。通过分析异常堆栈和源代码,发现使用的`nativeBooster.saveModel`方法不支持OSS路径,而使用`write.overwrite().save`方法则能成功保存模型。最后提供了完整的Scala代码示例、Maven配置和提交命令,帮助用户顺利迁移模型存储路径。
|
5月前
|
SQL 存储 分布式计算
|
6月前
|
SQL 分布式计算 数据处理
MaxCompute操作报错合集之使用Spark查询时函数找不到的原因是什么
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
6月前
|
分布式计算 数据处理 流计算
实时计算 Flink版产品使用问题之使用Spark ThriftServer查询同步到Hudi的数据时,如何实时查看数据变化
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
8月前
|
SQL 分布式计算 监控
基于阿里云 EMR Serverless Spark 版快速搭建OSS日志分析应用
本文演示了使用 EMR Serverless Spark 产品搭建一个日志分析应用的全流程,包括数据开发和生产调度以及交互式查询等场景。
56618 7
基于阿里云 EMR Serverless Spark 版快速搭建OSS日志分析应用
|
7月前
|
分布式计算 DataWorks MaxCompute
MaxCompute操作报错合集之在Spark访问OSS时出现证书错误的问题,该如何解决
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
7月前
|
SQL 分布式计算 大数据
MaxCompute产品使用问题之如果oss文件过大,如何在不调整oss源文件大小的情况下优化查询sql
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
SQL 监控 大数据
OSS Select使用场景及技巧
背景介绍 OSS Select是OSS近期推出的一项新功能,它可以对OSS上的类CSV文件(其他类型文件比如Json也会很快推出)在服务器端运行SQL查询,仅将查询结果返回给客户端。举例来说,假如你有一个10GB的csv日志文件,有一列数据是错误码,想把其中所有错误码等于500的日志找出来,用OSS Select仅仅返回错误码是500的日志,在从而降低用户的数据传输成本以及处理数据的成本,相比在客户端下载整个文件再处理在性能上也可以提高最多到6倍以上。
16323 0
|
6月前
|
机器学习/深度学习 人工智能 专有云
人工智能平台PAI使用问题之怎么将DLC的数据写入到另一个阿里云主账号的OSS中
阿里云人工智能平台PAI是一个功能强大、易于使用的AI开发平台,旨在降低AI开发门槛,加速创新,助力企业和开发者高效构建、部署和管理人工智能应用。其中包含了一系列相互协同的产品与服务,共同构成一个完整的人工智能开发与应用生态系统。以下是对PAI产品使用合集的概述,涵盖数据处理、模型开发、训练加速、模型部署及管理等多个环节。

相关产品

  • 对象存储
  • 下一篇
    开通oss服务