我有三个案例类如下:
case class Result(
result: Seq[Signal],
hop: Int)
case class Signal(
rtt: Double,
from: String)
case class Traceroute(
dst_name: String,
from: String,
prb_id: BigInt,
msm_id: BigInt,
timestamp: BigInt,
result: Seq[Result])
甲跟踪路由具有字段result,其是序列 结果。每个结果都是一系列信号。
我试着检查一个字段Result是否不是负数。我的json记录如下:
{"prb_id": 4247, "result": [{"result": [{"rtt": 1.955, "ttl": 255, "from": "89.105.200.57", "size": 28}, {"rtt": 1.7, "ttl": 255, "from": "10.10.0.5", "size": 28}, {"rtt": 1.709, "ttl": 255, "from": "89.105.200.57", "size": 28}], "hop": 1}]}
为清楚起见,我省略了json记录中的一些属性。该结果属性为跟踪路由情况下类的结果字段。
我使用过滤器来检查信号中的rtt是否通过使用过滤器而被注释为负,但我没有预期的结果。
val checkrtts = checkError.filter(x => x.result.foreach(p => p.result.foreach(f => checkSignal(f))))
checkSignal功能如下:
def checkSignal(signal: Signal): Signal = {
if (signal.rtt > 0) {
return signal
} else {
return null
}
}
给出Traceroute的两个实例的示例:
{"timestamp": 1514768409, "result": [{"result": [{"rtt": 1.955, "ttl": 255, "from": "89.105.200.57", "size": 28}], "hop": 1}]}
{"timestamp": 1514768402, "result": [{"result": [{"rtt": -2.5, "ttl": 255, "from": "89.105.200.57", "size": 28},{"rtt": 19.955, "ttl": 255, "from": "89.105.200.57", "size": 28}], "hop": 2}]}
对于第一个Traceroute,不应用任何更改。对于第二个Traceroute,result.result字段有两个元素(类型为Signal),第一个Signal有负rtt,所以我应该从result.result中删除这个Signal。但是第二个信号不应该被删除。
因此,输出应如下:
{"timestamp": 1514768409, "result": [{"result": [{"rtt": 1.955, "ttl": 255, "from": "89.105.200.57", "size": 28}], "hop": 1}]}
{"timestamp": 1514768402, "result": [{"result": [{"rtt": 19.955, "ttl": 255, "from": "89.105.200.57", "size": 28}], "hop": 2}]}
。
看起来你对过滤器功能应该做什么有一点误解。它Traceroute从返回的数据集中过滤整个对象false。你需要做的是编写一个map函数,它将原始Traceroute对象转换为所需的对象。下面是如何执行此操作的示例Dataset[Traceroute]
首先,您需要稍微修改您的案例类,如下所示。
case class Result(var result: Seq[Signal],
hop: Int)
case class Signal(rtt: Double,
from: String)
case class Traceroute( dst_name: String,
from: String,
prb_id: BigInt,
msm_id: BigInt,
timestamp: BigInt,
result: Seq[Result])
正如你可以看到我已经添加var到result的领域Result类。这将有助于我们result稍后在自定义函数中修改字段,我们将传递给地图操作
然后定义以下两个函数,如下所示:
def checkSignal(signal: Signal): Boolean = {
if (signal.rtt > 0) {
return true
} else {
return false
}
}
def removeNegative(traceroute: Traceroute): Traceroute = {
val outerList = traceroute.result
for( temp <- outerList){
val innerList = temp.result
//here we are filtering the list to only contain nonnegative elements
val newinnerList = innerList.filter(checkSignal(_))
//here we are reassigning the newlist to result
temp.result = newinnerList
}
traceroute
}
现在,我们将从已转换的数据集中映射原始数据集,我们正确地接收过滤列表。
val dataPath = "hdfs/path/to/traceroute.json"
val tracerouteSchema = ScalaReflection.schemaFor[Traceroute].dataType.asInstanceOf[StructType]
val dataset = spark.read.schema(tracerouteSchema).json(dataPath).as[Traceroute]
println("Showing 10 rows of original Dataset")
dataset.show(10, truncate = false)
val maprtts = dataset.map(x => removeNegative(x))
println("Showing 10 rows of transformed dataset")
maprtts.show(10, truncate = false)
以下是输出:
Showing 10 rows of original dataset | |||||
---|---|---|---|---|---|
dst_name | from | prb_id | msm_id | timestamp | result |
null | null | null | null | 1514768409 | [[[[1.955, 89.105.200.57]], 1]] |
null | null | null | null | 1514768402 | [[[[-2.5, 89.105.200.57], [19.955, 89.105.200.57]], 2]] |
Showing 10 rows of transformed dataset | |||||
---|---|---|---|---|---|
dst_name | from | prb_id | msm_id | timestamp | result |
null | null | null | null | 1514768409 | [[[[1.955, 89.105.200.57]], 1]] |
null | null | null | null | 1514768402 | [[[[19.955, 89.105.200.57]], 2]] |
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。