我正在从S3读取大量的CSV(一切都在一个键前缀下)并创建一个强类型的Dataset。
val events: DataFrame = cdcFs.getStream()
events
.withColumn("event", lit("I"))
.withColumn("source", lit(sourceName))
.as[TradeRecord]
其中TradeRecord是一个案例类,通常可以通过SparkSession implicits反序列化。但是,对于某个批处理,记录无法反序列化。这是错误(省略了堆栈跟踪)
Caused by: java.lang.NullPointerException: Null value appeared in non-nullable field:
deal作为一个字段TradeRecord永远不应该在源数据(S3对象)中为空,所以它不是一个Option。
不幸的是,错误消息并没有给我任何关于CSV数据的样子,甚至是它来自哪个CSV文件的线索。该批处理包含数百个文件,因此我需要一种方法将其缩小到最多几个文件以调查该问题。
这是我提出的解决方案(我正在使用Spark Structured Streaming):
val stream = spark.readStream
.format("csv")
.schema(schema) // a StructType defined elsewhere
.option("mode", "PERMISSIVE")
.option("columnNameOfCorruptRecord", "corruptRecord")
.load(path)
// If debugging, check for any corrupted CSVs
if (log.isDebugEnabled) { // org.apache.spark.internal.Logging trait
import spark.implicits._
stream
.filter($"corruptRecord".isNotNull)
.withColumn("input_file", input_file_name)
.select($"input_file", $"corruptRecord")
.writeStream
.format("console")
.option("truncate", false)
.start()
}
val events = stream
.withColumn("event", lit("I"))
.withColumn("source", lit(sourceName))
.as[TradeRecord]
基本上,如果将Spark日志级别设置为Debug或更低,则会检查DataFrame是否存在损坏的记录,并将所有此类记录与其文件名一起打印出来。最终程序尝试将此DataFrame强制转换为强类型Dataset[TradeRecord]并失败。
根据user10465355的建议,您可以加载数据:
val events: DataFrame = ???
过滤
val mismatched = events.where($"deal".isNull)
添加文件名
import org.apache.spark.sql.functions.input_file_name
val tagged = mismatched.withColumn("_file_name", input_file_name)
(可选)添加块和块以及偏移量:
import org.apache.spark.sql.functions.{spark_partition_id, monotonically_increasing_id, shiftLeft, shiftRight
df
.withColumn("chunk", spark_partition_id())
.withColumn(
"offset",
monotonically_increasing_id - shiftLeft(shiftRight(monotonically_increasing_id, 33), 33))
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。