使用Hadoop MapReduce分析邮件日志提取 id、状态 和 目标邮箱

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 使用Hadoop MapReduce分析邮件日志提取 id、状态 和 目标邮箱

使用Hadoop MapReduce分析邮件日志提取 id、状态 和 目标邮箱

在大数据处理和分析的场景中,Hadoop MapReduce是一种常见且高效的工具。本文将展示如何使用Hadoop MapReduce来分析邮件日志,提取邮件的发送状态(成功、失败或退回)和目标邮箱。

项目结构

我们将创建一个Java项目,该项目包含三个主要部分:

**Mapper类:**解析邮件日志,提取ID、状态和目标邮箱。

**Reducer类:**汇总Mapper输出的数据,生成最终结果。

*Driver类:**配置和运行MapReduce作业。

数据格式

我们将处理的邮件日志示例如下:


在这些日志中,我们需要提取邮件的ID、发送状态(成功、失败或退回)和目标邮箱。

代码实现

以下是完整的Java代码,包含Mapper、Reducer和Driver类:

package org.example.mapReduce;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MailLogAnalysis {

    public static class MailLogMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();

            if (line.contains("starting delivery")) {
                String[] parts = line.split(" ");
                String id = parts[3].replace(":", "");
                String targetEmail = parts[8];
                context.write(new Text(id), new Text("email," + targetEmail));
            }

            if (line.contains("success") || line.contains("failure") || line.contains("bounce")) {
                String status = "success";
                if (line.contains("failure")) {
                    status = "failure";
                }
                if (line.contains("bounce")) {
                    status = "bounce";
                }
                String[] parts = line.split(" ");
                String id = parts[2].replace(":", "");
                context.write(new Text(id), new Text("status," + status));
            }
        }
    }

    public static class MailLogReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            String email = "";
            String status = "failure";
            for (Text val : values) {
                String[] parts = val.toString().split(",", 2);
                if (parts[0].equals("email")) {
                    email = parts[1];
                } else if (parts[0].equals("status")) {
                    status = parts[1];
                }
            }
            context.write(key, new Text(status + "," + email));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Mail Log Analysis");
        job.setJarByClass(MailLogAnalysis.class);
        job.setMapperClass(MailLogMapper.class);
        job.setReducerClass(MailLogReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

使用Hadoop MapReduce分析邮件日志

在大数据处理和分析的场景中,Hadoop MapReduce是一种常见且高效的工具。本文将展示如何使用Hadoop MapReduce来分析邮件日志,提取邮件的发送状态(成功、失败或退回)和目标邮箱。我们将通过一个具体的例子来实现这一目标。


项目结构

我们将创建一个Java项目,该项目包含三个主要部分:


Mapper类:解析邮件日志,提取ID、状态和目标邮箱。

Reducer类:汇总Mapper输出的数据,生成最终结果。

Driver类:配置和运行MapReduce作业。

数据格式

我们将处理的邮件日志示例如下:


less
复制代码
@400000004faa61e21e8e3e24 starting delivery 1820: msg 850901 to remote sunkang@189.cn
@400000004faa61e536864a44 delivery 1820: success: 121.14.53.136_accepted_message./Remote_host_said:_250_Ok:_queued_as_43A2222C006/
@400000004faa61e70a73c60c delivery 1823: deferral: 210.32.157.174_failed_after_I_sent_the_message./Remote_host_said:_450_Requested_action_not_taken:_AQAAf5CrT+qlYqpPamRUAA–.7571S2,_please_try_again/
@400000004faa61e70a73c60c bounce 1824: 550 Mailbox not found

在这些日志中,我们需要提取邮件的ID、发送状态(成功、失败或退回)和目标邮箱。


代码实现

以下是完整的Java代码,包含Mapper、Reducer和Driver类:


java
复制代码
package org.example.mapReduce;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MailLogAnalysis {
public static class MailLogMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();

        if (line.contains("starting delivery")) {
            String[] parts = line.split(" ");
            String id = parts[3].replace(":", "");
            String targetEmail = parts[8];
            context.write(new Text(id), new Text("email," + targetEmail));
        }

        if (line.contains("success") || line.contains("failure") || line.contains("bounce")) {
            String status = "success";
            if (line.contains("failure")) {
                status = "failure";
            }
            if (line.contains("bounce")) {
                status = "bounce";
            }
            String[] parts = line.split(" ");
            String id = parts[2].replace(":", "");
            context.write(new Text(id), new Text("status," + status));
        }
    }
}

public static class MailLogReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        String email = "";
        String status = "failure";
        for (Text val : values) {
            String[] parts = val.toString().split(",", 2);
            if (parts[0].equals("email")) {
                email = parts[1];
            } else if (parts[0].equals("status")) {
                status = parts[1];
            }
        }
        context.write(key, new Text(status + "," + email));
    }
}

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "Mail Log Analysis");
    job.setJarByClass(MailLogAnalysis.class);
    job.setMapperClass(MailLogMapper.class);
    job.setReducerClass(MailLogReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
}

代码解释

Mapper类

MailLogMapper类从日志中提取邮件的ID、目标邮箱和发送状态,并将这些信息作为键值对输出:


如果行包含"starting delivery",则提取邮件的ID和目标邮箱,并输出键值对<ID, email, 目标邮箱>。

如果行包含"success"、“failure"或"bounce”,则提取邮件的ID和发送状态,并输出键值对<ID, status, 发送状态>。

Reducer类

MailLogReducer类汇总Mapper输出的数据,生成最终的结果:


对于每个邮件ID,汇总对应的目标邮箱和发送状态。

输出包含ID、发送状态和目标邮箱的最终结果。

Driver类

MailLogAnalysis类配置和运行MapReduce作业:


设置作业名称、Mapper类和Reducer类。

设置输入路径和输出路径。

提交作业并等待完成。

MapReduce运行结果

总结

通过本文的示例,我们展示了如何使用Hadoop MapReduce来分析邮件日志,提取邮件的发送状态和目标邮箱。希望本文能为您的大数据处理和分析工作提供一些帮助。

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
1天前
|
缓存 监控 算法
分析慢日志文件来优化 PHP 脚本的性能
分析慢日志文件来优化 PHP 脚本的性能
|
18天前
|
存储 分布式计算 Hadoop
Hadoop日志纪录篇
关于Hadoop日志记录的详细解析,涵盖了日志类型、存储位置、如何查看和管理日志,以及日志聚合等。
15 0
Hadoop日志纪录篇
|
27天前
|
存储 SQL 分布式计算
Hadoop生态系统概述:构建大数据处理与分析的基石
【8月更文挑战第25天】Hadoop生态系统为大数据处理和分析提供了强大的基础设施和工具集。通过不断扩展和优化其组件和功能,Hadoop将继续在大数据时代发挥重要作用。
|
28天前
|
存储 分布式计算 大数据
【Flume的大数据之旅】探索Flume如何成为大数据分析的得力助手,从日志收集到实时处理一网打尽!
【8月更文挑战第24天】Apache Flume是一款高效可靠的数据收集系统,专为Hadoop环境设计。它能在数据产生端与分析/存储端间搭建桥梁,适用于日志收集、数据集成、实时处理及数据备份等多种场景。通过监控不同来源的日志文件并将数据标准化后传输至Hadoop等平台,Flume支持了性能监控、数据分析等多种需求。此外,它还能与Apache Storm或Flink等实时处理框架集成,实现数据的即时分析。下面展示了一个简单的Flume配置示例,说明如何将日志数据导入HDFS进行存储。总之,Flume凭借其灵活性和强大的集成能力,在大数据处理流程中占据了重要地位。
33 3
|
29天前
|
应用服务中间件 Linux nginx
在Linux中,如何统计ip访问情况?分析 nginx 访问日志?如何找出访问页面数量在前十位的ip?
在Linux中,如何统计ip访问情况?分析 nginx 访问日志?如何找出访问页面数量在前十位的ip?
|
1月前
|
监控 安全 关系型数据库
在Linux中,什么是系统日志和应用程序日志?如何分析它们?
在Linux中,什么是系统日志和应用程序日志?如何分析它们?
|
23天前
|
存储 消息中间件 监控
Java日志详解:日志级别,优先级、配置文件、常见日志管理系统ELK、日志收集分析
Java日志详解:日志级别,优先级、配置文件、常见日志管理系统、日志收集分析。日志级别从小到大的关系(优先级从低到高): ALL < TRACE < DEBUG < INFO < WARN < ERROR < FATAL < OFF 低级别的会输出高级别的信息,高级别的不会输出低级别的信息
|
24天前
|
缓存 分布式计算 算法
优化Hadoop MapReduce性能的最佳实践
【8月更文第28天】Hadoop MapReduce是一个用于处理大规模数据集的软件框架,适用于分布式计算环境。虽然MapReduce框架本身具有很好的可扩展性和容错性,但在某些情况下,任务执行可能会因为各种原因导致性能瓶颈。本文将探讨如何通过调整配置参数和优化算法逻辑来提高MapReduce任务的效率。
100 0
|
24天前
|
算法 关系型数据库 程序员
第一周算法设计与分析:A : log2(N)
这篇文章介绍了解决算法问题"输入一个数N,输出log2N(向下取整)"的三种编程思路,包括使用对数函数和幂函数的转换方法,以及避免浮点数精度问题的整数逼近方法。
|
1月前
|
存储 数据可视化 Linux
在Linux中,如何使用ELK进行日志管理和分析?
在Linux中,如何使用ELK进行日志管理和分析?