开发者社区> 问答> 正文

PySpark:如何从spark数据框创建嵌套的JSON?

我试图从我的spark数据帧创建一个嵌套的json,它具有以下结构的数据。下面的代码创建了一个带键和值的简单json。

df.coalesce(1).write.format('json').save(data_output_file+"createjson.json", overwrite=True)
Update1:​​根据@MaxU的回答,我将spark数据帧转换为pandas并使用group by。它将最后两个字段放在嵌套数组中。我怎么能首先把类别和计数放在嵌套数组中,然后在那个数组里面我想要放置子类别和计数。

示例文本数据:

Vendor_Name,count,Categories,Category_Count,Subcategory,Subcategory_Count
Vendor1,10,Category 1,4,Sub Category 1,1
Vendor1,10,Category 1,4,Sub Category 2,2
Vendor1,10,Category 1,4,Sub Category 3,3
Vendor1,10,Category 1,4,Sub Category 4,4

j = (data_pd.groupby(['vendor_name','vendor_Cnt','Category','Category_cnt'], as_index=False)

         .apply(lambda x: x[['Subcategory','subcategory_cnt']].to_dict('r'))
         .reset_index()
         .rename(columns={0:'subcategories'})
         .to_json(orient='records'))

[{

    "vendor_name": "Vendor 1",
    "count": 10,
    "categories": [{
        "name": "Category 1",
        "count": 4,
        "subCategories": [{
                "name": "Sub Category 1",
                "count": 1
            },
            {
                "name": "Sub Category 2",
                "count": 1
            },
            {
                "name": "Sub Category 3",
                "count": 1
            },
            {
                "name": "Sub Category 4",
                "count": 1
            }
        ]
    }]

展开
收起
社区小助手 2018-12-06 15:17:35 3005 0
1 条回答
写回答
取消 提交回答
  • 社区小助手是spark中国社区的管理员,我会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关spark的问题及回答。

    在python / pandas中执行此操作的最简单方法是使用groupby我认为使用一系列嵌套生成器:

    def split_df(df):

    for (vendor, count), df_vendor in df.groupby(["Vendor_Name", "count"]):
        yield {
            "vendor_name": vendor,
            "count": count,
            "categories": list(split_category(df_vendor))
        }
    

    def split_category(df_vendor):

    for (category, count), df_category in df_vendor.groupby(
        ["Categories", "Category_Count"]
    ):
        yield {
            "name": category,
            "count": count,
            "subCategories": list(split_subcategory(df_category)),
        }
    

    def split_subcategory(df_category):

    for row in df.itertuples():
        yield {"name": row.Subcategory, "count": row.Subcategory_Count}
    

    list(split_df(df))
    [

    {
        "vendor_name": "Vendor1",
        "count": 10,
        "categories": [
            {
                "name": "Category 1",
                "count": 4,
                "subCategories": [
                    {"name": "Sub Category 1", "count": 1},
                    {"name": "Sub Category 2", "count": 2},
                    {"name": "Sub Category 3", "count": 3},
                    {"name": "Sub Category 4", "count": 4},
                ],
            }
        ],
    }

    ]
    要将其导出json,您需要一种导出方式np.int64

    2019-07-17 23:18:33
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Hybrid Cloud and Apache Spark 立即下载
Scalable Deep Learning on Spark 立即下载
Comparison of Spark SQL with Hive 立即下载