HDFS FileSystem 源码分析

简介:

简介

FileSystem 是一个相当通用的文件系统的抽象类,负责文件系统相关操作,如:创建目录、创建文件、删除目录或文件、读取文件内容等。
本文主要讲解 HDFS 分布式文件系统,具体实现类为DistributedFileSystem

创建 FileSystem 实例源码分析

下面以FileSystem fs = FileSystem.get(new Configuration());为例进行源码剖析。
进入get方法,源码如下:

/**
  * Returns the configured filesystem implementation.
  * @param conf the configuration to use
  */
public static FileSystem get(Configuration conf) throws IOException {
   return get(getDefaultUri(conf), conf);
}

getDefaultUri方法源码如下:

/** Get the default filesystem URI from a configuration.
 * @param conf the configuration to use
 * @return the uri of the default filesystem
 */
public static URI getDefaultUri(Configuration conf) {
  return URI.create(fixName(conf.get(FS_DEFAULT_NAME_KEY, DEFAULT_FS)));
}

此方法通过配置文件中参数fs.defaultFS指定的值来生成URI对象,如果配置文件中没有指定,则读取默认值为file:///,即本地文件系统。
继续进入get(uri, conf)方法分析源码:

/** Returns the FileSystem for this URI's scheme and authority.  The scheme
 * of the URI determines a configuration property name,
 * <tt>fs.<i>scheme</i>.class</tt> whose value names the FileSystem class.
 * The entire URI is passed to the FileSystem instance's initialize method.
 */
public static FileSystem get(URI uri, Configuration conf) throws IOException {
  String scheme = uri.getScheme();
  String authority = uri.getAuthority();

  if (scheme == null && authority == null) {     // use default FS
    return get(conf);
  }

  if (scheme != null && authority == null) {     // no authority
    URI defaultUri = getDefaultUri(conf);
    if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
        && defaultUri.getAuthority() != null) {  // & default has authority
      return get(defaultUri, conf);              // return default
    }
  }
  
  String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
  if (conf.getBoolean(disableCacheName, false)) {
    return createFileSystem(uri, conf);
  }

  return CACHE.get(uri, conf);
}

22~25行判断是否屏蔽缓存功能,默认情况是开户缓存功能,如果屏蔽缓存功能,则每次都会新创建一个连接,不推荐这样做。
27从CACHE中获取FileSystem实例,下面进入CACHE.get()方法:

FileSystem get(URI uri, Configuration conf) throws IOException{
  Key key = new Key(uri, conf);
  return getInternal(uri, conf, key);
}

进入getInternal方法:

private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException{
  FileSystem fs;
  synchronized (this) {
    fs = map.get(key);
  }
  if (fs != null) {
    return fs;
  }

  fs = createFileSystem(uri, conf);
  synchronized (this) { // refetch the lock again
    FileSystem oldfs = map.get(key);
    if (oldfs != null) { // a file system is created while lock is releasing
      fs.close(); // close the new file system
      return oldfs;  // return the old file system
    }
    
    // now insert the new file system into the map
    if (map.isEmpty()
            && !ShutdownHookManager.get().isShutdownInProgress()) {
      ShutdownHookManager.get().addShutdownHook(clientFinalizer, SHUTDOWN_HOOK_PRIORITY);
    }
    fs.key = key;
    map.put(key, fs);
    if (conf.getBoolean("fs.automatic.close", true)) {
      toAutoClose.add(key);
    }
    return fs;
  }
}

此方法的其他逻辑不做分析,核心就是从缓存中获取fs,如果获取为空则通过createFileSystem方法新创建实例,创建成功后将实例放入缓存中。
我们重点看一下createFileSystem方法看源码实现:

private static FileSystem createFileSystem(URI uri, Configuration conf
    ) throws IOException {
  Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);
  if (clazz == null) {
    throw new IOException("No FileSystem for scheme: " + uri.getScheme());
  }
  FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
  fs.initialize(uri, conf);
  return fs;
}

通过getFileSystemClass方法获取文件系统Class对象,获取Class对象后通过反射机制创建FileSystem实例,下面看一下getFileSystemClass方法源码实现:

public static Class<? extends FileSystem> getFileSystemClass(String scheme,
    Configuration conf) throws IOException {
  if (!FILE_SYSTEMS_LOADED) {
    loadFileSystems();
  }
  Class<? extends FileSystem> clazz = null;
  if (conf != null) {
    clazz = (Class<? extends FileSystem>) conf.getClass("fs." + scheme + ".impl", null);
  }
  if (clazz == null) {
    clazz = SERVICE_FILE_SYSTEMS.get(scheme);
  }
  if (clazz == null) {
    throw new IOException("No FileSystem for scheme: " + scheme);
  }
  return clazz;
}

1、首先判断是否已经初始化加载过,如果没有,则调用loadFileSystems方法初始所有文件系统,并缓存。
2、优先从配置文件中指定的文件系统,通过fs.[scheme].impl参数指定。
3、如果配置文件中没有指定,则通过 scheme 直接从缓存中获取。

回到createFileSystem方法源码,通过反射实例化fs对象后,调用initialize方法做初始化工作,下面看一下DistributedFileSystem.initialize方法的实现:

@Override
public void initialize(URI uri, Configuration conf) throws IOException {
  super.initialize(uri, conf);
  setConf(conf);

  String host = uri.getHost();
  if (host == null) {
    throw new IOException("Incomplete HDFS URI, no host: "+ uri);
  }
  homeDirPrefix = conf.get(
      DFSConfigKeys.DFS_USER_HOME_DIR_PREFIX_KEY,
      DFSConfigKeys.DFS_USER_HOME_DIR_PREFIX_DEFAULT);
  
  this.dfs = new DFSClient(uri, conf, statistics);
  this.uri = URI.create(uri.getScheme()+"://"+uri.getAuthority());
  this.workingDir = getHomeDirectory();
}

此方法最核心的代码是第14行初始化DFSClient类,所有文件系统相关操作全部在此类中实现,到此整个FileSystem实例初始化完成。

目录
相关文章
|
IDE 大数据 Java
大数据问题排查系列 - HDFS FileSystem API 的正确打开方式,你 GET 了吗?
大数据问题排查系列 - HDFS FileSystem API 的正确打开方式,你 GET 了吗?
|
5月前
|
存储 分布式计算 Hadoop
Hadoop Distributed File System (HDFS): 概念、功能点及实战
【6月更文挑战第12天】Hadoop Distributed File System (HDFS) 是 Hadoop 生态系统中的核心组件之一。它设计用于在大规模集群环境中存储和管理海量数据,提供高吞吐量的数据访问和容错能力。
615 4
|
存储 缓存 分布式计算
|
存储 机器学习/深度学习 分布式计算
|
监控
HDFS源码分析EditLog之获取编辑日志输入流
        在《HDFS源码分析之EditLogTailer》一文中,我们详细了解了编辑日志跟踪器EditLogTailer的实现,介绍了其内部编辑日志追踪线程EditLogTailerThread的实现,及其线程完成编辑日志跟踪所依赖的最重要的方法,执行日志追踪的doTailEdits()方法。
1084 0
|
缓存
HDFS源码分析EditLog之读取操作符
        在《HDFS源码分析EditLog之获取编辑日志输入流》一文中,我们详细了解了如何获取编辑日志输入流EditLogInputStream。在我们得到编辑日志输入流后,是不是就该从输入流中获取数据来处理呢?答案是显而易见的!在《HDFS源码分析之EditLogTailer》一文...
1436 0
HDFS源码分析之编辑日志编辑相关双缓冲区EditsDoubleBuffer
        EditsDoubleBuffer是为edits准备的双缓冲区。新的编辑被写入第一个缓冲区,同时第二个缓冲区可以被flush。为edits准备的双缓冲区。新的编辑被写入第一个缓冲区,同时第二个缓冲区可以被flush。
1026 0