开发者社区> 问答> 正文

Spark Scala - 检查嵌套案例类的字段

我有三个案例类如下:

case class Result(
result: Seq[Signal],
hop: Int)

case class Signal(
rtt: Double,
from: String)

case class Traceroute(
dst_name: String,
from: String,
prb_id: BigInt,
msm_id: BigInt,
timestamp: BigInt,
result: Seq[Result])
甲跟踪路由具有字段result,其是序列 结果。每个结果都是一系列信号。

我试着检查一个字段Result是否不是负数。我的json记录如下:

{"prb_id": 4247, "result": [{"result": [{"rtt": 1.955, "ttl": 255, "from": "89.105.200.57", "size": 28}, {"rtt": 1.7, "ttl": 255, "from": "10.10.0.5", "size": 28}, {"rtt": 1.709, "ttl": 255, "from": "89.105.200.57", "size": 28}], "hop": 1}]}
为清楚起见,我省略了json记录中的一些属性。该结果属性为跟踪路由情况下类的结果字段。

我使用过滤器来检查信号中的rtt是否通过使用过滤器而被注释为负,但我没有预期的结果。

val checkrtts = checkError.filter(x => x.result.foreach(p => p.result.foreach(f => checkSignal(f))))
checkSignal功能如下:

def checkSignal(signal: Signal): Signal = {
if (signal.rtt > 0) {

return signal

} else {

return null

}

}
给出Traceroute的两个实例的示例:

{"timestamp": 1514768409, "result": [{"result": [{"rtt": 1.955, "ttl": 255, "from": "89.105.200.57", "size": 28}], "hop": 1}]}
{"timestamp": 1514768402, "result": [{"result": [{"rtt": -2.5, "ttl": 255, "from": "89.105.200.57", "size": 28},{"rtt": 19.955, "ttl": 255, "from": "89.105.200.57", "size": 28}], "hop": 2}]}
对于第一个Traceroute,不应用任何更改。对于第二个Traceroute,result.result字段有两个元素(类型为Signal),第一个Signal有负rtt,所以我应该从result.result中删除这个Signal。但是第二个信号不应该被删除。

因此,输出应如下:

{"timestamp": 1514768409, "result": [{"result": [{"rtt": 1.955, "ttl": 255, "from": "89.105.200.57", "size": 28}], "hop": 1}]}
{"timestamp": 1514768402, "result": [{"result": [{"rtt": 19.955, "ttl": 255, "from": "89.105.200.57", "size": 28}], "hop": 2}]}

展开
收起
社区小助手 2018-12-12 14:30:00 2628 0
1 条回答
写回答
取消 提交回答
  • 社区小助手是spark中国社区的管理员,我会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关spark的问题及回答。

    看起来你对过滤器功能应该做什么有一点误解。它Traceroute从返回的数据集中过滤整个对象false。你需要做的是编写一个map函数,它将原始Traceroute对象转换为所需的对象。下面是如何执行此操作的示例Dataset[Traceroute]

    首先,您需要稍微修改您的案例类,如下所示。

    case class Result(var result: Seq[Signal],

                   hop:    Int)
    

    case class Signal(rtt: Double,

                   from: String)
    

    case class Traceroute( dst_name: String,

                       from:      String,
                       prb_id:    BigInt,
                       msm_id:    BigInt,
                       timestamp: BigInt,
                       result:    Seq[Result])

    正如你可以看到我已经添加var到result的领域Result类。这将有助于我们result稍后在自定义函数中修改字段,我们将传递给地图操作

    然后定义以下两个函数,如下所示:

    def checkSignal(signal: Signal): Boolean = {

    if (signal.rtt > 0) {
      return true
    } else {
      return false
    }
    

    }

    def removeNegative(traceroute: Traceroute): Traceroute = {

    val outerList = traceroute.result
    for( temp <- outerList){
    
      val innerList = temp.result
      //here we are filtering the list to only contain nonnegative elements
      val newinnerList = innerList.filter(checkSignal(_))
      //here we are reassigning the newlist to result
      temp.result = newinnerList
    
    }
    
    traceroute

    }
    现在,我们将从已转换的数据集中映射原始数据集,我们正确地接收过滤列表。

    val dataPath = "hdfs/path/to/traceroute.json"
    val tracerouteSchema = ScalaReflection.schemaFor[Traceroute].dataType.asInstanceOf[StructType]
    val dataset = spark.read.schema(tracerouteSchema).json(dataPath).as[Traceroute]

    println("Showing 10 rows of original Dataset")
    dataset.show(10, truncate = false)

    val maprtts = dataset.map(x => removeNegative(x))

    println("Showing 10 rows of transformed dataset")
    maprtts.show(10, truncate = false)
    以下是输出:

    Showing 10 rows of original dataset
    dst_name from prb_id msm_id timestamp result
    null null null null 1514768409 [[[[1.955, 89.105.200.57]], 1]]
    null null null null 1514768402 [[[[-2.5, 89.105.200.57], [19.955, 89.105.200.57]], 2]]
    Showing 10 rows of transformed dataset
    dst_name from prb_id msm_id timestamp result
    null null null null 1514768409 [[[[1.955, 89.105.200.57]], 1]]
    null null null null 1514768402 [[[[19.955, 89.105.200.57]], 2]]
    2019-07-17 23:20:12
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Hybrid Cloud and Apache Spark 立即下载
Scalable Deep Learning on Spark 立即下载
Comparison of Spark SQL with Hive 立即下载