hadoop_book/hdfs/dataNode启动过程.md

8.0 KiB
Raw Blame History

启动

createDataNode

DataNode核心启动类是从createDataNode开始的。

public static DataNode createDataNode(String args[], Configuration conf,
    SecureResources resources) throws IOException {
  // 初始化DataNode,核心函数在makeInstance里面的new DataNode()进行初始化。
  DataNode dn = instantiateDataNode(args, conf, resources);
  if (dn != null) {
    // 启动DataNode
    dn.runDatanodeDaemon();
  }
  return dn;
}

makeInstance

static DataNode makeInstance(Collection<StorageLocation> dataDirs,
    Configuration conf, SecureResources resources) throws IOException {
  List<StorageLocation> locations;
  StorageLocationChecker storageLocationChecker =
      new StorageLocationChecker(conf, new Timer());
  try {
    // 对存储路径进行检查。返回可用的位置。
    locations = storageLocationChecker.check(conf, dataDirs);
  } catch (InterruptedException ie) {
    throw new IOException("Failed to instantiate DataNode", ie);
  }
  DefaultMetricsSystem.initialize("DataNode");

  assert locations.size() > 0 : "number of data directories should be > 0";
  // 初始化DataNode实例。在构造函数里面startDataNode比较重要是启动DataNode的核心函数需要重点关注。
  return new DataNode(conf, locations, storageLocationChecker, resources);
}

startDataNode

void startDataNode(List<StorageLocation> dataDirectories,
                   SecureResources resources
                   ) throws IOException {

  // settings global for all BPs in the Data Node
  this.secureResources = resources;
  synchronized (this) {
    this.dataDirs = dataDirectories;
  }
  this.dnConf = new DNConf(this);
  checkSecureConfig(dnConf, getConf(), resources);
  // ... 省略 ....

  storage = new DataStorage();
  
  // global DN settings
  registerMXBean();
  // 初始化dataXceiver服务用于DN接受客户端和其他DN发送过来的数据的服务。
  initDataXceiver();
  // 初始化HTTP服务。
  startInfoServer();
  //TODO 待定。
  pauseMonitor = new JvmPauseMonitor();
  pauseMonitor.init(getConf());
  pauseMonitor.start();

  // BlockPoolTokenSecretManager is required to create ipc server.
  this.blockPoolTokenSecretManager = new BlockPoolTokenSecretManager();

  // Login is done by now. Set the DN user name.
  dnUserName = UserGroupInformation.getCurrentUser().getUserName();
  LOG.info("dnUserName = {}", dnUserName);
  LOG.info("supergroup = {}", supergroup);
  // 初始化DN的RPC调用服务。
  initIpcServer();

  metrics = DataNodeMetrics.create(getConf(), getDisplayName());
  peerMetrics = dnConf.peerStatsEnabled ?
      DataNodePeerMetrics.create(getDisplayName(), getConf()) : null;
  metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);

  ecWorker = new ErasureCodingWorker(getConf(), this);
  blockRecoveryWorker = new BlockRecoveryWorker(this);

  blockPoolManager = new BlockPoolManager(this);
  // DN向NN注册其中核心的函数为doRefreshNamenodes
  blockPoolManager.refreshNamenodes(getConf());

  // Create the ReadaheadPool from the DataNode context so we can
  // exit without having to explicitly shutdown its thread pool.
  readaheadPool = ReadaheadPool.getInstance();
  saslClient = new SaslDataTransferClient(dnConf.getConf(),
      dnConf.saslPropsResolver, dnConf.trustedChannelResolver);
  saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager);
  startMetricsLogger();

  if (dnConf.diskStatsEnabled) {
    diskMetrics = new DataNodeDiskMetrics(this,
        dnConf.outliersReportIntervalMs, getConf());
  }
}

doRefreshNamenodes

主要是向NN注册当前DN。

private void doRefreshNamenodes(
    Map<String, Map<String, InetSocketAddress>> addrMap,
    Map<String, Map<String, InetSocketAddress>> lifelineAddrMap)
    throws IOException {

  Set<String> toRefresh = Sets.newLinkedHashSet();
  Set<String> toAdd = Sets.newLinkedHashSet();
  Set<String> toRemove;
  
  synchronized (this) {
    // Step 1. For each of the new nameservices, figure out whether
    // it's an update of the set of NNs for an existing NS,
    // or an entirely new nameservice.
    for (String nameserviceId : addrMap.keySet()) {
      if (bpByNameserviceId.containsKey(nameserviceId)) {
        toRefresh.add(nameserviceId);
      } else {
        toAdd.add(nameserviceId);
      }
    }
    
    // Step 2. Any nameservices we currently have but are no longer present
    // need to be removed.
    toRemove = Sets.newHashSet(Sets.difference(
        bpByNameserviceId.keySet(), addrMap.keySet()));
    
    assert toRefresh.size() + toAdd.size() ==
      addrMap.size() :
        "toAdd: " + Joiner.on(",").useForNull("<default>").join(toAdd) +
        "  toRemove: " + Joiner.on(",").useForNull("<default>").join(toRemove) +
        "  toRefresh: " + Joiner.on(",").useForNull("<default>").join(toRefresh);

    
    // Step 3. Start new nameservices
    if (!toAdd.isEmpty()) {
      LOG.info("Starting BPOfferServices for nameservices: " +
          Joiner.on(",").useForNull("<default>").join(toAdd));
    
      for (String nsToAdd : toAdd) {
        Map<String, InetSocketAddress> nnIdToAddr = addrMap.get(nsToAdd);
        Map<String, InetSocketAddress> nnIdToLifelineAddr =
            lifelineAddrMap.get(nsToAdd);
        ArrayList<InetSocketAddress> addrs =
            Lists.newArrayListWithCapacity(nnIdToAddr.size());
        ArrayList<String> nnIds =
            Lists.newArrayListWithCapacity(nnIdToAddr.size());
        ArrayList<InetSocketAddress> lifelineAddrs =
            Lists.newArrayListWithCapacity(nnIdToAddr.size());
        for (String nnId : nnIdToAddr.keySet()) {
          addrs.add(nnIdToAddr.get(nnId));
          nnIds.add(nnId);
          lifelineAddrs.add(nnIdToLifelineAddr != null ?
              nnIdToLifelineAddr.get(nnId) : null);
        }
        BPOfferService bpos = createBPOS(nsToAdd, nnIds, addrs,
            lifelineAddrs);
        bpByNameserviceId.put(nsToAdd, bpos);
        offerServices.add(bpos);
      }
    }
    startAll();
  }

  // Step 4. Shut down old nameservices. This happens outside
  // of the synchronized(this) lock since they need to call
  // back to .remove() from another thread
  if (!toRemove.isEmpty()) {
    LOG.info("Stopping BPOfferServices for nameservices: " +
        Joiner.on(",").useForNull("<default>").join(toRemove));
    
    for (String nsToRemove : toRemove) {
      BPOfferService bpos = bpByNameserviceId.get(nsToRemove);
      bpos.stop();
      bpos.join();
      // they will call remove on their own
    }
  }
  
  // Step 5. Update nameservices whose NN list has changed
  if (!toRefresh.isEmpty()) {
    LOG.info("Refreshing list of NNs for nameservices: " +
        Joiner.on(",").useForNull("<default>").join(toRefresh));
    
    for (String nsToRefresh : toRefresh) {
      BPOfferService bpos = bpByNameserviceId.get(nsToRefresh);
      Map<String, InetSocketAddress> nnIdToAddr = addrMap.get(nsToRefresh);
      Map<String, InetSocketAddress> nnIdToLifelineAddr =
          lifelineAddrMap.get(nsToRefresh);
      ArrayList<InetSocketAddress> addrs =
          Lists.newArrayListWithCapacity(nnIdToAddr.size());
      ArrayList<InetSocketAddress> lifelineAddrs =
          Lists.newArrayListWithCapacity(nnIdToAddr.size());
      ArrayList<String> nnIds = Lists.newArrayListWithCapacity(
          nnIdToAddr.size());
      for (String nnId : nnIdToAddr.keySet()) {
        addrs.add(nnIdToAddr.get(nnId));
        lifelineAddrs.add(nnIdToLifelineAddr != null ?
            nnIdToLifelineAddr.get(nnId) : null);
        nnIds.add(nnId);
      }
      try {
        UserGroupInformation.getLoginUser()
            .doAs(new PrivilegedExceptionAction<Object>() {
              @Override
              public Object run() throws Exception {
                bpos.refreshNNList(nsToRefresh, nnIds, addrs, lifelineAddrs);
                return null;
              }
            });
      } catch (InterruptedException ex) {
        IOException ioe = new IOException();
        ioe.initCause(ex.getCause());
        throw ioe;
      }
    }
  }
}