开发者社区> 问答> 正文

Spark在创建数据集时无法反序列化记录

我正在从S3读取大量的CSV(一切都在一个键前缀下)并创建一个强类型的Dataset。

val events: DataFrame = cdcFs.getStream()
events
.withColumn("event", lit("I"))
.withColumn("source", lit(sourceName))
.as[TradeRecord]
其中TradeRecord是一个案例类,通常可以通过SparkSession implicits反序列化。但是,对于某个批处理,记录无法反序列化。这是错误(省略了堆栈跟踪)

Caused by: java.lang.NullPointerException: Null value appeared in non-nullable field:

  • field (class: "scala.Long", name: "deal")
  • root class: "com.company.trades.TradeRecord"
    If the schema is inferred from a Scala tuple/case class, or a Java bean, please try to use scala.Option[_] or other nullable types (e.g. java.lang.Integer instead of int/scala.Int).

deal作为一个字段TradeRecord永远不应该在源数据(S3对象)中为空,所以它不是一个Option。

不幸的是,错误消息并没有给我任何关于CSV数据的样子,甚至是它来自哪个CSV文件的线索。该批处理包含数百个文件,因此我需要一种方法将其缩小到最多几个文件以调查该问题。

展开
收起
社区小助手 2018-12-19 17:21:02 3915 0
1 条回答
写回答
取消 提交回答
  • 社区小助手是spark中国社区的管理员,我会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关spark的问题及回答。

    这是我提出的解决方案(我正在使用Spark Structured Streaming):

    val stream = spark.readStream
    .format("csv")
    .schema(schema) // a StructType defined elsewhere
    .option("mode", "PERMISSIVE")
    .option("columnNameOfCorruptRecord", "corruptRecord")
    .load(path)

    // If debugging, check for any corrupted CSVs
    if (log.isDebugEnabled) { // org.apache.spark.internal.Logging trait
    import spark.implicits._
    stream

    .filter($"corruptRecord".isNotNull)
    .withColumn("input_file", input_file_name)
    .select($"input_file", $"corruptRecord")
    .writeStream
    .format("console")
    .option("truncate", false)
    .start()

    }

    val events = stream
    .withColumn("event", lit("I"))
    .withColumn("source", lit(sourceName))
    .as[TradeRecord]
    基本上,如果将Spark日志级别设置为Debug或更低,则会检查DataFrame是否存在损坏的记录,并将所有此类记录与其文件名一起打印出来。最终程序尝试将此DataFrame强制转换为强类型Dataset[TradeRecord]并失败。


    根据user10465355的建议,您可以加载数据:

    val events: DataFrame = ???
    过滤

    val mismatched = events.where($"deal".isNull)
    添加文件名

    import org.apache.spark.sql.functions.input_file_name

    val tagged = mismatched.withColumn("_file_name", input_file_name)
    (可选)添加块和块以及偏移量:

    import org.apache.spark.sql.functions.{spark_partition_id, monotonically_increasing_id, shiftLeft, shiftRight

    df
    .withColumn("chunk", spark_partition_id())
    .withColumn(

    "offset",
    monotonically_increasing_id - shiftLeft(shiftRight(monotonically_increasing_id, 33), 33))
    
    2019-07-17 23:23:06
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Hybrid Cloud and Apache Spark 立即下载
Scalable Deep Learning on Spark 立即下载
Comparison of Spark SQL with Hive 立即下载