Merging small files into one Avro file and computing word frequency over it
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import org.apache.avro.Schema;
import org.apache.avro.Schema.Parser;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class AvroFilemr {

    // Read the merged Avro file: each record is one original small file.
    // Run wordcount over its content and emit (word, 1) pairs to the reducer.
    public static class AvroFilemrMap extends Mapper<AvroKey<SmallFile>, NullWritable, Text, IntWritable> {

        private Text outKey = new Text();
        private final IntWritable ONE = new IntWritable(1);

        @Override
        protected void map(AvroKey<SmallFile> key, NullWritable value, Context context)
                throws IOException, InterruptedException {
            // The content field holds the raw bytes of one small file.
            ByteBuffer content = key.datum().getContent();
            byte[] bytes = new byte[content.remaining()];
            content.duplicate().get(bytes);
            // Split on runs of whitespace and emit (word, 1) for each token.
            String[] infos = new String(bytes, StandardCharsets.UTF_8).split("\\s+");
            for (String word : infos) {
                if (word.isEmpty()) {
                    continue;
                }
                outKey.set(word);
                context.write(outKey, ONE);
            }
        }
    }
    // Write the wordcount results as an Avro file using the wordCount.avsc schema.
    public static class AvroFilemrReduce extends Reducer<Text, IntWritable, AvroKey<GenericRecord>, NullWritable> {

        private Schema writeSchema;
        private GenericRecord record;
        private AvroKey<GenericRecord> outKey = new AvroKey<GenericRecord>();
        private NullWritable outValue = NullWritable.get();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Note: the schema is parsed from a local project path, so it is only
            // found when that file is available on the node running the reduce task.
            Parser parser = new Parser();
            writeSchema = parser.parse(new File("src/main/avro/wordCount.avsc"));
            record = new GenericData.Record(writeSchema);
        }
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            // Reuse a single GenericRecord; its fields are overwritten for every key.
            record.put("word", key.toString());
            record.put("count", sum);
            outKey.datum(record);
            context.write(outKey, outValue);
        }
    }
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        job.setJarByClass(AvroFilemr.class);
        job.setJobName("read avro file, count words and write the result to a new avro file");

        job.setMapperClass(AvroFilemrMap.class);
        job.setReducerClass(AvroFilemrReduce.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(AvroKey.class);
        job.setOutputValueClass(NullWritable.class);

        // Input is the Avro container file produced by the small-file merge job;
        // output is again Avro, keyed by the wordCount.avsc record schema.
        job.setInputFormatClass(AvroKeyInputFormat.class);
        job.setOutputFormatClass(AvroKeyOutputFormat.class);
        AvroJob.setInputKeySchema(job, SmallFile.getClassSchema());
        Parser parser = new Parser();
        AvroJob.setOutputKeySchema(job, parser.parse(new File("src/main/avro/wordCount.avsc")));

        FileInputFormat.addInputPath(job, new Path("/AvroMErgeSmallFile"));
        Path outputPath = new Path("/AvroFilemr");
        // Delete the output directory if it already exists so the job can be rerun.
        outputPath.getFileSystem(configuration).delete(outputPath, true);
        FileOutputFormat.setOutputPath(job, outputPath);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
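The wordCount.avsc file itself is not shown in this section. A minimal sketch of what it could contain is below; the record name and namespace are placeholders, and only the word/count field names and their string/int types are implied by the reducer's record.put calls:

{
  "namespace": "com.example.avro",
  "type": "record",
  "name": "WordCount",
  "doc": "Sketch only; the real record name and namespace in wordCount.avsc may differ.",
  "fields": [
    {"name": "word", "type": "string"},
    {"name": "count", "type": "int"}
  ]
}

SmallFile is the Avro-generated class from the earlier small-file merge step (which produced the /AvroMErgeSmallFile input), so its schema is simply reused here via SmallFile.getClassSchema().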