1.前言
DistributedCache是hadoop框架提供的一种机制,可以将job指定的文件,在job执行前,先行分发到task执行的机器上,并有相关机制对cache文件进行管理。
DistributedCache 可将具体应用相关的、大尺寸的、只读的文件有效地分布放置。DistributedCache 是Map/Reduce框架提供的功能,能够缓存应用程序所需的文件 (包括文本,档案文件,jar文件等)。
Map-Redcue框架在作业所有任务执行之前会把必要的文件拷贝到slave节点上。 它运行高效是因为每个作业的文件只拷贝一次并且为那些没有文档的slave节点缓存文档。
DistributedCache 根据缓存文档修改的时间戳进行追踪。 在作业执行期间,当前应用程序或者外部程序不能修改缓存文件。
distributedCache可以分发简单的只读数据或文本文件,也可以分发复杂类型的文件例如归档文件和jar文件。归档文件(zip,tar,tgz和tar.gz文件)在slave节点上会被解档(un-archived)。 这些文件可以设置执行权限。
用户可以通过设置mapred.cache.{files|archives}来分发文件。 如果要分发多个文件,可以使用逗号分隔文件所在路径。
DistributedCache可在map/reduce任务中作为 一种基础软件分发机制使用。它可以被用于分发jar包和本地库(native libraries)。 DistributedCache.addArchiveToClassPath(Path, Configuration)和 DistributedCache.addFileToClassPath(Path, Configuration) API能够被用于 缓存文件和jar包,并把它们加入子jvm的classpath。也可以通过设置配置文档里的属性 mapred.job.classpath.{files|archives}达到相同的效果。缓存文件可用于分发和装载本地库。
添加缓存文件:
DistributedCache.addCacheFile(URI,conf)
DistributedCache.addCacheArchive(URI,conf) DistributedCache.setCacheFiles(URIs,conf)
DistributedCache.setCacheArchives(URIs,conf)
其中URI的形式是 hdfs://host:port/absolute-path#link-name
缓存Jar:
DistributedCache.addArchiveToClassPath(Path, Configuration)和 DistributedCache.addFileToClassPath(Path, Configuration) API能够被用于 缓存文件和jar包,并把它们加入子jvm的classpath。
DistributedCache.createSymlink(Configuration)方法让DistributedCache 在当前工作目录下创建到缓存文件的符号链接。或者通过设置配置文件属性mapred.create.symlink为yes。 分布式缓存会截取URI的片段作为链接的名字。 例如,URI是 hdfs://namenode:port/lib.so.1#lib.so,则在task当前工作目录会有名为lib.so的链接,它会链接分布式缓存中的lib.so.1。
2.常见应用场景
(1)分发第三方库(jar,so等);
(2)分发算法需要的词典文件;
(3)分发程序运行需要的配置;
(4)分发多表数据join时小表数据简便处理等
3.注意事项
1.DistributedCache只能应用于分布式的情况,包括伪分布式,完全分布式.有些api在这2种情况下有移植性问题.
2.需要分发的文件,必须提前放到hdfs上.默认的路径前缀是hdfs://的,不是file://
3.需要分发的文件,最好在运行期间是只读的.
4.不建议分发较大的文件,比如压缩文件,可能会影响task的启动速度.
5.注意在Driver中创建Job实例时一定要把Configuration类型的参数传递进去,否则在Mapper或Reducer中调用DistributedCache.getLocalCacheFiles(conf);返回值就为null。因为空构造函数的Job采用的Configuration是从hadoop的配置文件中读出来的(使用new Configuration()创建的Configuration就是从hadoop的配置文件中读出来的),请注意在main()函数中有一句:DistributedCache.addCacheFile(dataFile.toUri(), conf);即此时的Configuration中多了一个DistributedCacheFile,所以你需要把这个Configuration传递给Job构造函数,如果传递默认的Configuration,那在Job中当然不知道DistributedCacheFile的存在了。
4.基本流程
1.每个tasktracker启动时,都会产生一个TrackerDistributedCacheManager对象,用来管理该tt机器上所有的task的cache文件.
2.在客户端提交job时,在JobClient内,对即将cache的文件,进行校验
以确定文件是否存在,文件的大小,文件的修改时间,以及文件的权限是否是private or public.
3.当task在tt初始化job时,会由TrackerDistributedCacheManager产生一个TaskDistributedCacheManager对象,来管理本task的cache文件.
4.和本task相关联的TaskDistributedCacheManager,获取并解压相关cache文件到本地相应目录如果本tt机器上已经有了本job的其他task,并已经完成了相应cache文件的获取和解压工作,则游戏出售平台不会重复进行。如果本地已经有了cache文件,则比较修改时间和hdfs上的文件是否一致,如果一致则可以使用.
5.当task结束时,会对该cache进行ref减一操作.
6.TrackerDistributedCacheManager有一个clearup线程,每隔1min会去处理那些无人使用的,目录大小大于local.cache.size或者子目录个数大于mapreduce.tasktracker.cache.local.numberdirectories的cache目录.
5.应用实例
public class MapJoinByCache {
public static class MapJoiner extends Mapper<LongWritable,Text,Text,Text>
{
static Map<String,String> movies=new HashMap<String,String>();
public void setup(Context context) {
try {
FileReader reader = new FileReader("movies.dat");
BufferedReader br = new BufferedReader(reader);
String s1 = null;
while ((s1 = br.readLine()) != null)
{
System.out.println(s1);
String[] splits= s1.split("::");
String movieId=splits[];
String movieName =splits[1];
movies.put(movieId, movieName);
}
br.close();
reader.close();
} catch (Exception e) {
e.printStackTrace();
}
}
private Text outKey=new Text();
private Text outVal=new Text();
public void map(LongWritable key,Text value,Context context)throws IOException, InterruptedException
{
if(value!=null||value.toString()!=null)
{
String[] splits = value.toString().split("::");
String movieId =splits[1];
String movieName= movies.get(movieId);
outKey.set(movieId);
outVal.set(movieName+"::"+value.toString());
context.write(outKey, outVal);
}
}
}
public static class DirectReducer extends Reducer<Text,Text,NullWritable,Text>
{
NullWritable outKey=NullWritable.get();
public void reduce(Text key,Iterable<Text> values,Context context)throws IOException, InterruptedException
{
for(Text value :values)
{
context.write(outKey, value);
}
}
}
public static void main(String[] args) throws URISyntaxException, IOException, InterruptedException, ClassNotFoundException {
Configuration conf =new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
DistributedCache.createSymlink(conf);
DistributedCache.addCacheFile(new URI("hdfs://mylinux:9000/data/exam/movie/movies.dat#movies.dat"), conf);
Job job=new Job(conf);
job.setJobName("Join on Map Side");
job.setJarByClass(MapJoinByCache.class);
job.setMapperClass(MapJoiner.class);
job.setReducerClass(DirectReducer.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[]));
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? : 1);
}