我有使用逗号分隔符指定的文件集合,如:
hdfs://user/cloudera/date=2018-01-15,hdfs://user/cloudera/date=2018-01-16,hdfs://user/cloudera/date=2018-01-17,hdfs://user/cloudera/date=2018-01-18,hdfs://user/cloudera/date=2018-01-19,hdfs://user/cloudera/date=2018-01-20,hdfs://user/cloudera/date=2018-01-21,hdfs://user/cloudera/date=2018-01-22
我正在使用Apache Spark加载文件,所有这些都与:
val input = sc.textFile(files)
此外,我还有与每个文件相关的其他信息 - 唯一ID,例如:
hdfs://user/cloudera/date=2018-01-15 | 12345
hdfs://user/cloudera/date=2018-01-16 | 09245
hdfs://user/cloudera/date=2018-01-17 | 345hqw4
and so on
作为输出,我需要接收带有行的DataFrame,其中每行将包含相同的ID,作为从中读取该行的文件的ID。
是否有可能以某种方式将此信息传递给Spark,以便能够与线路相关联?
核心sql方法UDF(join如果你将File - > ID映射表示为Dataframe,你可以实现同样的目的):
import org.apache.spark.sql.functions
val inputDf = sparkSession.read.text(".../src/test/resources/test")
.withColumn("fileName", functions.input_file_name())
def withId(mapping: Map[String, String]) = functions.udf(
(file: String) => mapping.get(file)
)
val mapping = Map(
"file:///.../src/test/resources/test/test1.txt" -> "id1",
"file:///.../src/test/resources/test/test2.txt" -> "id2"
)
val resutlDf = inputDf.withColumn("id", withId(mapping)(inputDf("fileName")))
resutlDf.show(false)
结果:
value | fileName | id |
---|---|---|
row1 | file:///.../src/test/resources/test/test1.txt | id1 |
row11 | file:///.../src/test/resources/test/test1.txt | id1 |
row2 | file:///.../src/test/resources/test/test2.txt | id2 |
row22 | file:///.../src/test/resources/test/test2.txt | id2 |
text1.txt:
row1
row11
text2.txt:
row2
row22
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。