重读avro文件 对文件进行简单的mr计算

简介:

public class ReadAvroInput {

public static class ReadAvroInputMap extends Mapper<AvroKey<UserActionLog>, NullWritable, Text, IntWritable> {

    private Text oKey = new Text();
    private final IntWritable ONE = new IntWritable(1);
    private UserActionLog keyData;

    @Override
    protected void map(AvroKey<UserActionLog> key, NullWritable value,
            Mapper<AvroKey<UserActionLog>, NullWritable, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        keyData = key.datum();
        oKey.set(keyData.getProvience().toString());
        context.write(oKey, ONE);
    }
}

public static class ReadAvroInputReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private int sum;
    private IntWritable oValue = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
            Reducer<Text, IntWritable, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        sum=0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        oValue.set(sum);
        context.write(key, oValue);

    }
}

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    Configuration configuration =new Configuration();
    Job job =Job.getInstance(configuration);
    job.setJarByClass(ReadAvroInput.class);
    job.setJobName("重读avro文件进行mr计算");
    
    job.setMapperClass(ReadAvroInputMap.class);
    job.setCombinerClass(ReadAvroInputReducer.class);
    job.setReducerClass(ReadAvroInputReducer.class);
    
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    
    job.setInputFormatClass(AvroKeyInputFormat.class);
    AvroJob.setInputKeySchema(job, UserActionLog.getClassSchema());
    
    FileInputFormat.addInputPath(job, new Path("/ReducerJoin/part-r-00000.avro"));
    Path outputPath =new Path("/ReadAvroInput");
    outputPath.getFileSystem(configuration).delete(outputPath, true);
    FileOutputFormat.setOutputPath(job, outputPath);
    System.exit(job.waitForCompletion(true)?0:1);
}

}

UserActionLog是通过mvn 指令通过schema框架生成的

相关文章
|
消息中间件 Java Kafka
kafka 客户端使用Avro序列化
kafka 客户端使用Avro序列化
213 0
|
分布式计算 Java Hadoop
JAVA—其他—Avro序列化
Avro是hadoop的一个用于序列化的组件 理解特点: 1. 高效 2. 序列化后体积小 3. 动态 动态指的是数据的结构一旦定义,可以在多处语言生成实体类
296 0
|
存储 分布式计算 Java
深入对比Java与Hadoop大数据序列化机制Avro
Java有自己提供的序列化机制,而我们的Hadoop也提供了自己的序列化机制,二者究竟有什么差异呢?为什么Hadoop要重新设计自己的序列化体系?序列化大数据对象的过程,Writable接口底层源码实现。
2151 0
|
Java Maven 网络协议
Avro序列化和RPC实现
序列化和反序列化 Maven:Pom.xml <dependencies> <dependency> <groupId>org.apache.avro</groupId> <artifactId>avro</artifactId> <version>1.
4280 0
|
SQL 存储 分布式计算
Avro数据序列化
序列化:把结构化的对象转换成字节流,使得能够在系统中或网络中通信 需要把数据存储到hadoop的hbase 常用序列化系统 thrift   (hive,hbase) Protocol Buffer (google) avro 本文出自 “点滴积累” 博客,请务必保留此出处http://tianxingzhe.blog.51cto.com/3390077
1454 0
|
存储 Java Apache
rpc框架之 avro 学习 2 - 高效的序列化
同一类框架,后出现的总会吸收之前框架的优点,然后加以改进,avro在序列化方面相对thrift就是一个很好的例子。借用Apache Avro 与 Thrift 比较 一文中的几张图来说明一下,avro在序列化方面的改进: 1、无需强制生成目标语言代码 avro提供了二种使用方式,一种称之为Sepcific方式,这跟thrift基本一致,都是写定义IDL文件,然后用编译器(或插件)生成目标class,另一种方式是Generic,这种方式下,不用生成目标代码,而是采用动态加载定义文件的方式,将 FieldName - FieldValue,以Map的方式存储。
1260 0
|
7月前
|
SQL 消息中间件 数据处理
DataX读取Hive Orc格式表丢失数据处理记录
DataX读取Hive Orc格式表丢失数据处理记录
275 0