开发者社区> 问答> 正文

如何将Spark Dataframe列的每个值作为字符串传递给python UDF?

我正在尝试GPG加密spark数据帧列 FName

df = spark.createDataFrame([('Andy', 'NY'), ('Bob', 'PA'), ('Cindy', 'DC')], ("FName", "City"))
我创建了一个udf,它将字符串值作为输入,并将加密字符串作为输出。

gpg = gnupg.GPG(gnupghome='/home/testgpguser/gpghome')
encrypt_str = udf(lambda string_value: gpg.encrypt(string_value, 'myrecepeintemailid', passphrase='mypassphrase'))
我正在申请我的udf如下:

df = df.withColumn('Encrypted_FName', encrypt_str(col('FName')))
但是,我认为整个列都已通过,并且它没有正确加密值。

如何遍历数据帧的每个值并将其传递string_value给udf?

展开
收起
社区小助手 2018-12-12 14:09:31 3476 0
1 条回答
写回答
取消 提交回答
  • 社区小助手是spark中国社区的管理员,我会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关spark的问题及回答。

    您可以创建一个新的数据帧。

    对于必须进行哈希处理的列,我有类似的问题。python函数定义如下:

    def make_hash(txt):

    import hashlib
    m = hashlib.sha256()
    m.update(txt.encode())
    print ("hashed ", m)
    return m.hexdigest()  

    定义了一个udf:

    from pyspark.sql.functions import udf
    u_make_hash = udf(make_hash)
    并创建了一个新的DataFrame,除了哈希列之外的所有列:

    streamingOutputDF = streamingInputDF.select(u_make_hash(streamingInputDF['connectionDeviceId']).alias("Id"), streamingInputDF['*']) \

                                    .drop("connectionDeviceId")   

    我没有检查你的udf,假设没有问题,以下声明应该这样做:

    dfnew = df.select((encrypt_str['FName']).alias("Encrypted_FName"))

    2019-07-17 23:20:10
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Hybrid Cloud and Apache Spark 立即下载
Scalable Deep Learning on Spark 立即下载
Comparison of Spark SQL with Hive 立即下载