我只找到TextInputFormat和CsvInputFormat。那么如何使用Apache Flink在HDFS中读取parquet文件?
"我已经找到了一种通过Apache Flink在HDFS中读取parquet文件的方法。
应该在pom.xml中添加以下依赖项
org.apache.flink
flink-hadoop-compatibility_2.11
1.6.1
org.apache.flink
flink-avro
1.6.1
org.apache.parquet
parquet-avro
1.10.0
org.apache.hadoop
hadoop-mapreduce-client-core
3.1.1
org.apache.hadoop
hadoop-hdfs
3.1.1
org.apache.hadoop
hadoop-core
1.2.1
创建avsc文件以定义架构。经验:
{“namespace”:“com.flinklearn.models”,“type”:“record”,“name”:“AvroTamAlert”,“fields”:[{“name”:“raw_data”,“type”:[“string” “,“空值”]} ] }
运行“java -jar D: avro-tools-1.8.2.jar compile schema alert.avsc。” 生成Java类并将AvroTamAlert.java复制到项目中。
使用AvroParquetInputFormat读取hdfs中的parquet文件:
class Main {def startApp():Unit = {val env = ExecutionEnvironment.getExecutionEnvironment
val job = Job.getInstance()
val dIf = new HadoopInputFormatVoid, AvroTamAlert, classOf[Void], classOf[AvroTamAlert], job)
FileInputFormat.addInputPath(job, new Path(""/user/hive/warehouse/testpath""))
val dataset = env.createInput(dIf)
println(dataset.count())
env.execute(""start hdfs parquet test"")
object main {def main(args:Array [String]):Unit = {new Main()。startApp()}}"
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。