开发者社区> 问答> 正文

如何使用Apache Flink读取HDFS中的parquet文件?

我只找到TextInputFormat和CsvInputFormat。那么如何使用Apache Flink在HDFS中读取parquet文件?

展开
收起
flink小助手 2018-11-28 16:35:08 10725 0
2 条回答
写回答
取消 提交回答
  • 手动指定schema 可以读 见 项目 https://gitee.com/jsqf/flinklearn.git

    2019-10-14 22:47:14
    赞同 1 展开评论 打赏
  • flink小助手会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关flink的问题及回答。

    "我已经找到了一种通过Apache Flink在HDFS中读取parquet文件的方法。

    应该在pom.xml中添加以下依赖项


    org.apache.flink
    flink-hadoop-compatibility_2.11
    1.6.1


    org.apache.flink
    flink-avro
    1.6.1


    org.apache.parquet
    parquet-avro
    1.10.0


    org.apache.hadoop
    hadoop-mapreduce-client-core
    3.1.1


    org.apache.hadoop
    hadoop-hdfs
    3.1.1


    org.apache.hadoop
    hadoop-core
    1.2.1

    创建avsc文件以定义架构。经验:

    {“namespace”:“com.flinklearn.models”,“type”:“record”,“name”:“AvroTamAlert”,“fields”:[{“name”:“raw_data”,“type”:[“string” “,“空值”]} ] }

    运行“java -jar D: avro-tools-1.8.2.jar compile schema alert.avsc。” 生成Java类并将AvroTamAlert.java复制到项目中。

    使用AvroParquetInputFormat读取hdfs中的parquet文件:

    class Main {def startApp():Unit = {val env = ExecutionEnvironment.getExecutionEnvironment

    val job = Job.getInstance()

    val dIf = new HadoopInputFormatVoid, AvroTamAlert, classOf[Void], classOf[AvroTamAlert], job)
    FileInputFormat.addInputPath(job, new Path(""/user/hive/warehouse/testpath""))

    val dataset = env.createInput(dIf)

    println(dataset.count())

    env.execute(""start hdfs parquet test"")
    object main {def main(args:Array [String]):Unit = {new Main()。startApp()}}"

    2019-07-17 23:16:53
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Apache Flink技术进阶 立即下载
Apache Spark: Cloud and On-Prem 立即下载
海量数据分布式存储——Apache HDFS之最新进展 立即下载

相关镜像