I am reading JSON and have a dictionary (dictn) whose keys tell me which columns to select from the JSON DataFrame.
I am trying to create a new DataFrame and then append the columns whose keys from dictn exist in the JSON, but I get the error below. Since I am really new to this, any help would be greatly appreciated.
'Resolved attribute(s) ip#238 missing from in operator !Project [ip#238 AS ip#267].;;
!Project [ip#238 AS ip#267]
+- LogicalRDD false'
from pyspark.sql.functions import lit
from pyspark.sql.types import StructType
import json
from pyspark.sql.functions import explode
jsn={"body":[{"ip":"177.284.10.91","sg_message_id":"YcbG1IBnQ1-626TaUVg2bQ.filter1049p1las1-18982-5C868E5A-20.0","hostname":"d2214390ce89","useragent":"Mozilla/5.0 (Linux; Android 7.1.2; E6810) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.105 Mobile Safari/537.36","method_name":"mass_gifting","email":"test@aol.com","timestamp":1554076768,"url":"https://engagement.test.com/b/genghisgrill","object_id":42813,"category":["42813-3-11-19-bottomless-is-back","713","mass_gifting"],"sg_event_id":"Krfn-yDfTG-CQ-o8zhTb0w","event":"click","klass":"3-11-19-bottomless-is-back","url_offset":{"index":3,"type":"html"},"rails_env":"production","user_id":78003906,"business_id":713}],"account":"testaccount"}
dictn={'ip':'string',
'sg_message_id':'string',
'hostname':'string',
'method_name':'string',
'email':'string',
'timestamp':'bigint',
'smtp-id':'string',
'object_id':'bigint',
'response':'string',
'sg_event_id':'string',
'tls':'string',
'event':'string',
'klass':'string',
'user_id':'string',
'rails_env':'string',
'business_id':'bigint'}
schema = StructType([])
new_df = sqlContext.createDataFrame(sc.emptyRDD(), schema)
a=[json.dumps(jsn)]
jsonRDD = sc.parallelize(a)
df = spark.read.json(jsonRDD)
x=df.select("body")
df1=df.withColumn("foo",explode("body")).select("foo.*")
for k1, v1 in dictn.items():
    if k1 in df1.columns:
        new_df = new_df.withColumn(k1, df1[k1])
    else:
        new_df = new_df.withColumn(k1, lit(10))
new_df.show()
The error occurs because you are trying to add a new column by referencing a column from another DataFrame, which Spark does not support. This has already been asked and answered here: Add a column from a different DataFrame.
To achieve what you want, you just need to use select on df1 with a list of column expressions built from the dictionary keys, which gives you a new DataFrame with the columns you derived from the dictionary.
This should work for you:
from pyspark.sql.functions import col, lit

select_expr = [col(c).alias(c) if c in df1.columns else lit(10).alias(c) for c in dictn.keys()]
new_df = df1.select(select_expr)
new_df.show()
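For readers without a running Spark session, the core of the fix, building one expression per dictionary key and falling back to a literal default for keys missing from the DataFrame, can be sketched in plain Python. The names `df1_columns` and `dictn_keys` below are illustrative stand-ins for `df1.columns` and `dictn.keys()`, and plain strings stand in for Spark Column objects:

```python
# Spark-free sketch of the branching logic behind select_expr.
# df1_columns stands in for df1.columns; dictn_keys mirrors dictn.keys().
df1_columns = ["ip", "email", "event"]
dictn_keys = ["ip", "email", "smtp-id", "event"]

# Keys present in the DataFrame keep their column; missing keys
# get a literal default, aliased to the key name.
select_expr = [
    c if c in df1_columns else f"lit(10) AS {c}"
    for c in dictn_keys
]
print(select_expr)
```

Here `smtp-id` is absent from the simulated columns, so it falls through to the literal branch, exactly as it would in the Spark version above.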