我们使用Azure-sqldb-spark库,而不是 Spark 的默认内置导出功能。此库为您提供了一个bulkCopyToSqlDB方法,它是一个真正的批处理插入,并且速度要快得多。与内置功能相比,使用起来不太实用,但根据我的经验,它仍然是值得的。 我们或多或少地使用它像这样:
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._
import com.microsoft.azure.sqldb.spark.query._
val options = Map(
"url" -> "***",
"databaseName" -> "***",
"user" -> "***",
"password" -> "***",
"driver" -> "com.microsoft.sqlserver.jdbc.SQLServerDriver"
)
// first make sure the table exists, with the correct column types
// and is properly cleaned up if necessary
val query = dropAndCreateQuery(df, "myTable")
val createConfig = Config(options ++ Map("QueryCustom" -> query))
spark.sqlContext.sqlDBQuery(createConfig)
val bulkConfig = Config(options ++ Map(
"dbTable" -> "myTable",
"bulkCopyBatchSize" -> "20000",
"bulkCopyTableLock" -> "true",
"bulkCopyTimeout" -> "600"
))
df.bulkCopyToSqlDB(bulkConfig)
如您所见,我们自行生成查询。您可以让库创建表,但它只是执行TABLEdataFrame.limit(0).write.sqlDB(config),可能需要缓存 DataFrame, 并且它不允许您选择SaveMode 可能还有可能很有趣:当将此库添加到 sbt 生成时,我们必须使用ExclusionRule ,否则assembly任务将失败。
libraryDependencies += "com.microsoft.azure" % "azure-sqldb-spark" % "1.0.2" excludeAll(
ExclusionRule(organization = "org.apache.spark")
)
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。