开发者社区> 问答> 正文

Spark Scala:如何同时过滤RDD和更新计数器

我的初始RDD是记录类型,记录的布局是:

a_key, b_key,c_key,f_name,l_name,address
现在我必须:

删除具有a_key或b_key或c_key为空/空的记录
我必须同时更新无效记录的计数器。
我这样试过:

sc.register( recordStatsAccumulator, "Stat accumulator for " + filename )

val nullFilteredRecords = records.map{ record =>

if( record.A_KEY.isEmpty ||

record.B_KEY.isEmpty ||
record.C_KEY.isEmpty )

{

recordStatsAccumulator.add( ValidationLoggingUtil.INVALID )

}

record

}
.filter( record =>

!record.A_KEY.isEmpty &&
  !record.B_KEY.isEmpty &&
  !record.C_KEY.isEmpty

)
但是,这段代码效率不高,因为整个RDD都是两次。首先,更新无效记录的计数器,然后再次删除无效记录。

有没有更好/更有效的方法来做到这一点?

展开
收起
社区小助手 2018-12-19 16:46:33 3426 0
1 条回答
写回答
取消 提交回答
  • 社区小助手是spark中国社区的管理员,我会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关spark的问题及回答。

    我想你可以一步完成这两项操作。像这样:

    val nullFilteredRecords = records.filter { record =>
    if( record.A_KEY.isEmpty ||

    record.B_KEY.isEmpty ||
    record.C_KEY.isEmpty ) {
    recordStatsAccumulator.add( ValidationLoggingUtil.INVALID )

    }
    !record.A_KEY.isEmpty && !record.B_KEY.isEmpty && !record.C_KEY.isEmpty
    }

    2019-07-17 23:23:02
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Hybrid Cloud and Apache Spark 立即下载
Scalable Deep Learning on Spark 立即下载
Comparison of Spark SQL with Hive 立即下载