diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index f9ed841355..0e5147011e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -107,6 +107,7 @@ Release 0.23.1 - UNRELEASED NEW FEATURES IMPROVEMENTS + HDFS-2560. Refactor BPOfferService to be a static inner class (todd) OPTIMIZATIONS diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java index 2cb8b41ffd..05fba79fe6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java @@ -55,6 +55,10 @@ synchronized BlockTokenSecretManager get(String bpid) { } return secretMgr; } + + public synchronized boolean isBlockPoolRegistered(String bpid) { + return map.containsKey(bpid); + } /** Return an empty BlockTokenIdentifer */ @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index d496c6a2cc..32fe56c63f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -136,6 +136,7 @@ import org.apache.hadoop.hdfs.server.common.IncorrectVersionException; import org.apache.hadoop.hdfs.server.common.JspHelper; import org.apache.hadoop.hdfs.server.common.Storage; +import org.apache.hadoop.hdfs.server.common.StorageInfo; import org.apache.hadoop.hdfs.server.common.Util; import 
org.apache.hadoop.hdfs.server.datanode.FSDataset.VolumeInfo; import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources; @@ -275,7 +276,7 @@ class BlockPoolManager { List isas = DFSUtil.getNNServiceRpcAddresses(conf); for(InetSocketAddress isa : isas) { - BPOfferService bpos = new BPOfferService(isa); + BPOfferService bpos = new BPOfferService(isa, DataNode.this); nameNodeThreads.put(bpos.getNNSocketAddress(), bpos); } } @@ -373,19 +374,19 @@ void refreshNamenodes(Configuration conf) } for (InetSocketAddress nnaddr : toStart) { - BPOfferService bpos = new BPOfferService(nnaddr); + BPOfferService bpos = new BPOfferService(nnaddr, DataNode.this); nameNodeThreads.put(bpos.getNNSocketAddress(), bpos); } - - for (BPOfferService bpos : toShutdown) { - remove(bpos); - } } for (BPOfferService bpos : toShutdown) { bpos.stop(); bpos.join(); } + + // stopping the BPOSes causes them to call remove() on their own when they + // clean up. + + // Now start the threads that are not already running. 
startAll(); } @@ -402,9 +403,7 @@ void refreshNamenodes(Configuration conf) Daemon dataXceiverServer = null; ThreadGroup threadGroup = null; long blockReportInterval; - boolean resetBlockReportTime = true; long deleteReportInterval; - long lastDeletedReport = 0; long initialBlockReportDelay = DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT * 1000L; long heartBeatInterval; private boolean heartbeatsDisabledForTests = false; @@ -653,6 +652,7 @@ private synchronized void initDataBlockScanner(Configuration conf) { return; } String reason = null; + assert data != null; if (conf.getInt(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT) < 0) { reason = "verification is turned off by configuration"; @@ -774,11 +774,15 @@ void setHeartbeatsDisabledForTests( * */ @InterfaceAudience.Private - class BPOfferService implements Runnable { + static class BPOfferService implements Runnable { final InetSocketAddress nnAddr; DatanodeRegistration bpRegistration; NamespaceInfo bpNSInfo; long lastBlockReport = 0; + long lastDeletedReport = 0; + + boolean resetBlockReportTime = true; + private Thread bpThread; private DatanodeProtocol bpNamenode; private String blockPoolId; @@ -788,14 +792,13 @@ class BPOfferService implements Runnable { = new LinkedList(); private volatile int pendingReceivedRequests = 0; private volatile boolean shouldServiceRun = true; - private boolean isBlockTokenInitialized = false; UpgradeManagerDatanode upgradeManager = null; + private final DataNode dn; - BPOfferService(InetSocketAddress isa) { - this.bpRegistration = new DatanodeRegistration(getMachineName()); - bpRegistration.setInfoPort(infoServer.getPort()); - bpRegistration.setIpcPort(getIpcPort()); - this.nnAddr = isa; + BPOfferService(InetSocketAddress nnAddr, DataNode dn) { + this.dn = dn; + this.bpRegistration = dn.createRegistration(); + this.nnAddr = nnAddr; } /** @@ -822,7 +825,6 @@ private InetSocketAddress getNNSocketAddress() { void setNamespaceInfo(NamespaceInfo nsinfo) { bpNSInfo 
= nsinfo; this.blockPoolId = nsinfo.getBlockPoolID(); - blockPoolManager.addBlockPool(this); } void setNameNode(DatanodeProtocol dnProtocol) { @@ -831,7 +833,7 @@ void setNameNode(DatanodeProtocol dnProtocol) { private NamespaceInfo handshake() throws IOException { NamespaceInfo nsInfo = new NamespaceInfo(); - while (shouldRun && shouldServiceRun) { + while (dn.shouldRun && shouldServiceRun) { try { nsInfo = bpNamenode.versionRequest(); // verify build version @@ -867,7 +869,7 @@ private NamespaceInfo handshake() throws IOException { return nsInfo; } - void setupBP(Configuration conf, AbstractList dataDirs) + void setupBP(Configuration conf) throws IOException { // get NN proxy DatanodeProtocol dnp = @@ -878,52 +880,19 @@ void setupBP(Configuration conf, AbstractList dataDirs) // handshake with NN NamespaceInfo nsInfo = handshake(); setNamespaceInfo(nsInfo); - synchronized(DataNode.this) { - // we do not allow namenode from different cluster to register - if(clusterId != null && !clusterId.equals(nsInfo.clusterID)) { - throw new IOException( - "cannot register with the namenode because clusterid do not match:" - + " nn=" + nsInfo.getBlockPoolID() + "; nn cid=" + nsInfo.clusterID + - ";dn cid=" + clusterId); - } - - setupBPStorage(); - - setClusterId(nsInfo.clusterID); - } - - initPeriodicScanners(conf); - } - - void setupBPStorage() throws IOException { - StartupOption startOpt = getStartupOption(conf); - assert startOpt != null : "Startup option must be set."; - - boolean simulatedFSDataset = conf.getBoolean( - DFS_DATANODE_SIMULATEDDATASTORAGE_KEY, - DFS_DATANODE_SIMULATEDDATASTORAGE_DEFAULT); + dn.initBlockPool(this, nsInfo); - if (simulatedFSDataset) { - initFsDataSet(conf, dataDirs); - bpRegistration.setStorageID(getStorageId()); //same as DN + bpRegistration.setStorageID(dn.getStorageId()); + StorageInfo storageInfo = dn.storage.getBPStorage(blockPoolId); + if (storageInfo == null) { + // it's null in the case of SimulatedDataSet 
bpRegistration.storageInfo.layoutVersion = HdfsConstants.LAYOUT_VERSION; - bpRegistration.storageInfo.namespaceID = bpNSInfo.namespaceID; - bpRegistration.storageInfo.clusterID = bpNSInfo.clusterID; + bpRegistration.setStorageInfo(nsInfo); } else { - // read storage info, lock data dirs and transition fs state if necessary - storage.recoverTransitionRead(DataNode.this, blockPoolId, bpNSInfo, - dataDirs, startOpt); - LOG.info("setting up storage: nsid=" + storage.namespaceID + ";bpid=" - + blockPoolId + ";lv=" + storage.layoutVersion + ";nsInfo=" - + bpNSInfo); - - bpRegistration.setStorageID(getStorageId()); - bpRegistration.setStorageInfo(storage.getBPStorage(blockPoolId)); - initFsDataSet(conf, dataDirs); + bpRegistration.setStorageInfo(storageInfo); } - data.addBlockPool(blockPoolId, conf); } - + /** * This methods arranges for the data node to send the block report at * the next heartbeat. @@ -931,9 +900,9 @@ void setupBPStorage() throws IOException { void scheduleBlockReport(long delay) { if (delay > 0) { // send BR after random delay lastBlockReport = System.currentTimeMillis() - - ( blockReportInterval - DFSUtil.getRandom().nextInt((int)(delay))); + - ( dn.blockReportInterval - DFSUtil.getRandom().nextInt((int)(delay))); } else { // send at next heartbeat - lastBlockReport = lastHeartbeat - blockReportInterval; + lastBlockReport = lastHeartbeat - dn.blockReportInterval; } resetBlockReportTime = true; // reset future BRs for randomness } @@ -1038,11 +1007,11 @@ DatanodeCommand blockReport() throws IOException { // send block report if timer has expired. 
DatanodeCommand cmd = null; long startTime = now(); - if (startTime - lastBlockReport > blockReportInterval) { + if (startTime - lastBlockReport > dn.blockReportInterval) { // Create block report long brCreateStartTime = now(); - BlockListAsLongs bReport = data.getBlockReport(blockPoolId); + BlockListAsLongs bReport = dn.data.getBlockReport(blockPoolId); // Send block report long brSendStartTime = now(); @@ -1052,7 +1021,7 @@ DatanodeCommand blockReport() throws IOException { // Log the block report processing stats from Datanode perspective long brSendCost = now() - brSendStartTime; long brCreateCost = brSendStartTime - brCreateStartTime; - metrics.addBlockReport(brSendCost); + dn.metrics.addBlockReport(brSendCost); LOG.info("BlockReport of " + bReport.getNumberOfBlocks() + " blocks took " + brCreateCost + " msec to generate and " + brSendCost + " msecs for RPC and NN processing"); @@ -1060,7 +1029,7 @@ DatanodeCommand blockReport() throws IOException { // If we have sent the first block report, then wait a random // time before we start the periodic block reports. if (resetBlockReportTime) { - lastBlockReport = startTime - DFSUtil.getRandom().nextInt((int)(blockReportInterval)); + lastBlockReport = startTime - DFSUtil.getRandom().nextInt((int)(dn.blockReportInterval)); resetBlockReportTime = false; } else { /* say the last block report was at 8:20:14. 
The current report @@ -1070,7 +1039,7 @@ DatanodeCommand blockReport() throws IOException { * 2) unexpected like 11:35:43, next report should be at 12:20:14 */ lastBlockReport += (now() - lastBlockReport) / - blockReportInterval * blockReportInterval; + dn.blockReportInterval * dn.blockReportInterval; } LOG.info("sent block report, processed command:" + cmd); } @@ -1080,12 +1049,12 @@ DatanodeCommand blockReport() throws IOException { DatanodeCommand [] sendHeartBeat() throws IOException { return bpNamenode.sendHeartbeat(bpRegistration, - data.getCapacity(), - data.getDfsUsed(), - data.getRemaining(), - data.getBlockPoolUsed(blockPoolId), - xmitsInProgress.get(), - getXceiverCount(), data.getNumFailedVolumes()); + dn.data.getCapacity(), + dn.data.getDfsUsed(), + dn.data.getRemaining(), + dn.data.getBlockPoolUsed(blockPoolId), + dn.xmitsInProgress.get(), + dn.getXceiverCount(), dn.data.getNumFailedVolumes()); } //This must be called only by blockPoolManager @@ -1121,21 +1090,9 @@ private synchronized void cleanUp() { if(upgradeManager != null) upgradeManager.shutdownUpgrade(); - - blockPoolManager.remove(this); shouldServiceRun = false; RPC.stopProxy(bpNamenode); - if (blockScanner != null) { - blockScanner.removeBlockPool(this.getBlockPoolId()); - } - - if (data != null) { - data.shutdownBlockPool(this.getBlockPoolId()); - } - - if (storage != null) { - storage.removeBlockPoolStorage(this.getBlockPoolId()); - } + dn.shutdownBlockPool(this); } /** @@ -1144,22 +1101,22 @@ private synchronized void cleanUp() { */ private void offerService() throws Exception { LOG.info("For namenode " + nnAddr + " using DELETEREPORT_INTERVAL of " - + deleteReportInterval + " msec " + " BLOCKREPORT_INTERVAL of " - + blockReportInterval + "msec" + " Initial delay: " - + initialBlockReportDelay + "msec" + "; heartBeatInterval=" - + heartBeatInterval); + + dn.deleteReportInterval + " msec " + " BLOCKREPORT_INTERVAL of " + + dn.blockReportInterval + "msec" + " Initial delay: " + + 
dn.initialBlockReportDelay + "msec" + "; heartBeatInterval=" + + dn.heartBeatInterval); // // Now loop for a long time.... // - while (shouldRun && shouldServiceRun) { + while (dn.shouldRun && shouldServiceRun) { try { long startTime = now(); // // Every so often, send heartbeat or block-report // - if (startTime - lastHeartbeat > heartBeatInterval) { + if (startTime - lastHeartbeat > dn.heartBeatInterval) { // // All heartbeat messages include following info: // -- Datanode name @@ -1168,9 +1125,9 @@ private void offerService() throws Exception { // -- Bytes remaining // lastHeartbeat = startTime; - if (!heartbeatsDisabledForTests) { + if (!dn.heartbeatsDisabledForTests) { DatanodeCommand[] cmds = sendHeartBeat(); - metrics.addHeartbeat(now() - startTime); + dn.metrics.addHeartbeat(now() - startTime); long startProcessCommands = now(); if (!processCommand(cmds)) @@ -1183,7 +1140,7 @@ private void offerService() throws Exception { } } if (pendingReceivedRequests > 0 - || (startTime - lastDeletedReport > deleteReportInterval)) { + || (startTime - lastDeletedReport > dn.deleteReportInterval)) { reportReceivedDeletedBlocks(); lastDeletedReport = startTime; } @@ -1192,15 +1149,15 @@ private void offerService() throws Exception { processCommand(cmd); // Now safe to start scanning the block pool - if (blockScanner != null) { - blockScanner.addBlockPool(this.blockPoolId); + if (dn.blockScanner != null) { + dn.blockScanner.addBlockPool(this.blockPoolId); } // // There is no work to do; sleep until hearbeat timer elapses, // or work arrives, and then iterate again. 
// - long waitTime = heartBeatInterval - + long waitTime = dn.heartBeatInterval - (System.currentTimeMillis() - lastHeartbeat); synchronized(receivedAndDeletedBlockList) { if (waitTime > 0 && pendingReceivedRequests == 0) { @@ -1223,7 +1180,7 @@ private void offerService() throws Exception { } LOG.warn("RemoteException in offerService", re); try { - long sleepTime = Math.min(1000, heartBeatInterval); + long sleepTime = Math.min(1000, dn.heartBeatInterval); Thread.sleep(sleepTime); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); @@ -1269,7 +1226,7 @@ void register() throws IOException { (bpNSInfo.getLayoutVersion(), "namenode"); } - while(shouldRun && shouldServiceRun) { + while(dn.shouldRun && shouldServiceRun) { try { // Use returned registration from namenode with updated machine name. bpRegistration = bpNamenode.registerDatanode(bpRegistration); @@ -1277,8 +1234,6 @@ void register() throws IOException { LOG.info("bpReg after =" + bpRegistration.storageInfo + ";sid=" + bpRegistration.storageID + ";name="+bpRegistration.getName()); - NetUtils.getHostname(); - hostName = bpRegistration.getHost(); break; } catch(SocketTimeoutException e) { // namenode is busy LOG.info("Problem connecting to server: " + nnAddr); @@ -1287,47 +1242,13 @@ void register() throws IOException { } catch (InterruptedException ie) {} } } - - if (storage.getStorageID().equals("")) { - storage.setStorageID(bpRegistration.getStorageID()); - storage.writeAll(); - LOG.info("New storage id " + bpRegistration.getStorageID() - + " is assigned to data-node " + bpRegistration.getName()); - } else if(!storage.getStorageID().equals(bpRegistration.getStorageID())) { - throw new IOException("Inconsistent storage IDs. Name-node returned " - + bpRegistration.getStorageID() - + ". 
Expecting " + storage.getStorageID()); - } - - if (!isBlockTokenInitialized) { - /* first time registering with NN */ - ExportedBlockKeys keys = bpRegistration.exportedKeys; - isBlockTokenEnabled = keys.isBlockTokenEnabled(); - if (isBlockTokenEnabled) { - long blockKeyUpdateInterval = keys.getKeyUpdateInterval(); - long blockTokenLifetime = keys.getTokenLifetime(); - LOG.info("Block token params received from NN: for block pool " + - blockPoolId + " keyUpdateInterval=" - + blockKeyUpdateInterval / (60 * 1000) - + " min(s), tokenLifetime=" + blockTokenLifetime / (60 * 1000) - + " min(s)"); - final BlockTokenSecretManager secretMgr = - new BlockTokenSecretManager(false, 0, blockTokenLifetime); - blockPoolTokenSecretManager.addBlockPool(blockPoolId, secretMgr); - } - isBlockTokenInitialized = true; - } - - if (isBlockTokenEnabled) { - blockPoolTokenSecretManager.setKeys(blockPoolId, - bpRegistration.exportedKeys); - bpRegistration.exportedKeys = ExportedBlockKeys.DUMMY_KEYS; - } + + dn.bpRegistrationSucceeded(bpRegistration, blockPoolId); LOG.info("in register:" + ";bpDNR="+bpRegistration.storageInfo); // random short delay - helps scatter the BR from all DNs - scheduleBlockReport(initialBlockReportDelay); + scheduleBlockReport(dn.initialBlockReportDelay); } @@ -1341,14 +1262,14 @@ void register() throws IOException { */ @Override public void run() { - LOG.info(bpRegistration + "In BPOfferService.run, data = " + data + LOG.info(bpRegistration + "In BPOfferService.run, data = " + dn.data + ";bp=" + blockPoolId); try { // init stuff try { // setup storage - setupBP(conf, dataDirs); + setupBP(dn.conf); register(); } catch (IOException ioe) { // Initial handshake, storage recovery or registration failed @@ -1360,13 +1281,13 @@ public void run() { initialized = true; // bp is initialized; - while (shouldRun && shouldServiceRun) { + while (dn.shouldRun && shouldServiceRun) { try { startDistributedUpgradeIfNeeded(); offerService(); } catch (Exception ex) { 
LOG.error("Exception in BPOfferService", ex); - if (shouldRun && shouldServiceRun) { + if (dn.shouldRun && shouldServiceRun) { try { Thread.sleep(5000); } catch (InterruptedException ie) { @@ -1379,7 +1300,7 @@ public void run() { LOG.warn("Unexpected exception", ex); } finally { LOG.warn(bpRegistration + " ending block pool service for: " - + blockPoolId); + + blockPoolId + " thread " + Thread.currentThread().getId()); cleanUp(); } } @@ -1420,8 +1341,8 @@ private boolean processCommand(DatanodeCommand cmd) throws IOException { switch(cmd.getAction()) { case DatanodeProtocol.DNA_TRANSFER: // Send a copy of a block to another datanode - transferBlocks(bcmd.getBlockPoolId(), bcmd.getBlocks(), bcmd.getTargets()); - metrics.incrBlocksReplicated(bcmd.getBlocks().length); + dn.transferBlocks(bcmd.getBlockPoolId(), bcmd.getBlocks(), bcmd.getTargets()); + dn.metrics.incrBlocksReplicated(bcmd.getBlocks().length); break; case DatanodeProtocol.DNA_INVALIDATE: // @@ -1430,16 +1351,16 @@ private boolean processCommand(DatanodeCommand cmd) throws IOException { // Block toDelete[] = bcmd.getBlocks(); try { - if (blockScanner != null) { - blockScanner.deleteBlocks(bcmd.getBlockPoolId(), toDelete); + if (dn.blockScanner != null) { + dn.blockScanner.deleteBlocks(bcmd.getBlockPoolId(), toDelete); } // using global fsdataset - data.invalidate(bcmd.getBlockPoolId(), toDelete); + dn.data.invalidate(bcmd.getBlockPoolId(), toDelete); } catch(IOException e) { - checkDiskError(); + dn.checkDiskError(); throw e; } - metrics.incrBlocksRemoved(toDelete.length); + dn.metrics.incrBlocksRemoved(toDelete.length); break; case DatanodeProtocol.DNA_SHUTDOWN: // shut down the data node @@ -1448,12 +1369,12 @@ private boolean processCommand(DatanodeCommand cmd) throws IOException { case DatanodeProtocol.DNA_REGISTER: // namenode requested a registration - at start or if NN lost contact LOG.info("DatanodeCommand action: DNA_REGISTER"); - if (shouldRun && shouldServiceRun) { + if (dn.shouldRun && 
shouldServiceRun) { register(); } break; case DatanodeProtocol.DNA_FINALIZE: - storage.finalizeUpgrade(((FinalizeCommand) cmd) + dn.storage.finalizeUpgrade(((FinalizeCommand) cmd) .getBlockPoolId()); break; case UpgradeCommand.UC_ACTION_START_UPGRADE: @@ -1461,12 +1382,12 @@ private boolean processCommand(DatanodeCommand cmd) throws IOException { processDistributedUpgradeCommand((UpgradeCommand)cmd); break; case DatanodeProtocol.DNA_RECOVERBLOCK: - recoverBlocks(((BlockRecoveryCommand)cmd).getRecoveringBlocks()); + dn.recoverBlocks(((BlockRecoveryCommand)cmd).getRecoveringBlocks()); break; case DatanodeProtocol.DNA_ACCESSKEYUPDATE: LOG.info("DatanodeCommand action: DNA_ACCESSKEYUPDATE"); - if (isBlockTokenEnabled) { - blockPoolTokenSecretManager.setKeys(blockPoolId, + if (dn.isBlockTokenEnabled) { + dn.blockPoolTokenSecretManager.setKeys(blockPoolId, ((KeyUpdateCommand) cmd).getExportedKeys()); } break; @@ -1476,7 +1397,7 @@ private boolean processCommand(DatanodeCommand cmd) throws IOException { ((BalancerBandwidthCommand) cmd).getBalancerBandwidthValue(); if (bandwidth > 0) { DataXceiverServer dxcs = - (DataXceiverServer) dataXceiverServer.getRunnable(); + (DataXceiverServer) dn.dataXceiverServer.getRunnable(); dxcs.balanceThrottler.setBandwidth(bandwidth); } break; @@ -1495,7 +1416,7 @@ private void processDistributedUpgradeCommand(UpgradeCommand comm) synchronized UpgradeManagerDatanode getUpgradeManager() { if(upgradeManager == null) upgradeManager = - new UpgradeManagerDatanode(DataNode.this, blockPoolId); + new UpgradeManagerDatanode(dn, blockPoolId); return upgradeManager; } @@ -1555,6 +1476,133 @@ void startDataNode(Configuration conf, blockPoolManager = new BlockPoolManager(conf); } + /** + * Check that the registration returned from a NameNode is consistent + * with the information in the storage. If the storage is fresh/unformatted, + * sets the storage ID based on this registration. + * Also updates the block pool's state in the secret manager. 
+ */ + private synchronized void bpRegistrationSucceeded(DatanodeRegistration bpRegistration, + String blockPoolId) + throws IOException { + hostName = bpRegistration.getHost(); + + if (storage.getStorageID().equals("")) { + // This is a fresh datanode -- take the storage ID provided by the + // NN and persist it. + storage.setStorageID(bpRegistration.getStorageID()); + storage.writeAll(); + LOG.info("New storage id " + bpRegistration.getStorageID() + + " is assigned to data-node " + bpRegistration.getName()); + } else if(!storage.getStorageID().equals(bpRegistration.getStorageID())) { + throw new IOException("Inconsistent storage IDs. Name-node returned " + + bpRegistration.getStorageID() + + ". Expecting " + storage.getStorageID()); + } + + registerBlockPoolWithSecretManager(bpRegistration, blockPoolId); + } + + /** + * After the block pool has contacted the NN, registers that block pool + * with the secret manager, updating it with the secrets provided by the NN. + * @param bpRegistration + * @param blockPoolId + * @throws IOException + */ + private void registerBlockPoolWithSecretManager(DatanodeRegistration bpRegistration, + String blockPoolId) throws IOException { + ExportedBlockKeys keys = bpRegistration.exportedKeys; + isBlockTokenEnabled = keys.isBlockTokenEnabled(); + // TODO should we check that all federated nns are either enabled or + // disabled? 
+ if (!isBlockTokenEnabled) return; + + if (!blockPoolTokenSecretManager.isBlockPoolRegistered(blockPoolId)) { + long blockKeyUpdateInterval = keys.getKeyUpdateInterval(); + long blockTokenLifetime = keys.getTokenLifetime(); + LOG.info("Block token params received from NN: for block pool " + + blockPoolId + " keyUpdateInterval=" + + blockKeyUpdateInterval / (60 * 1000) + + " min(s), tokenLifetime=" + blockTokenLifetime / (60 * 1000) + + " min(s)"); + final BlockTokenSecretManager secretMgr = + new BlockTokenSecretManager(false, 0, blockTokenLifetime); + blockPoolTokenSecretManager.addBlockPool(blockPoolId, secretMgr); + } + + blockPoolTokenSecretManager.setKeys(blockPoolId, + bpRegistration.exportedKeys); + bpRegistration.exportedKeys = ExportedBlockKeys.DUMMY_KEYS; + } + + /** + * Remove the given block pool from the block scanner, dataset, and storage. + */ + private void shutdownBlockPool(BPOfferService bpos) { + blockPoolManager.remove(bpos); + + String bpId = bpos.getBlockPoolId(); + if (blockScanner != null) { + blockScanner.removeBlockPool(bpId); + } + + if (data != null) { + data.shutdownBlockPool(bpId); + } + + if (storage != null) { + storage.removeBlockPoolStorage(bpId); + } + } + + void initBlockPool(BPOfferService bpOfferService, + NamespaceInfo nsInfo) throws IOException { + String blockPoolId = nsInfo.getBlockPoolID(); + + blockPoolManager.addBlockPool(bpOfferService); + + synchronized (this) { + // we do not allow namenode from different cluster to register + if(clusterId != null && !clusterId.equals(nsInfo.clusterID)) { + throw new IOException( + "cannot register with the namenode because clusterid do not match:" + + " nn=" + nsInfo.getBlockPoolID() + "; nn cid=" + nsInfo.clusterID + + ";dn cid=" + clusterId); + } + + setClusterId(nsInfo.clusterID); + } + + StartupOption startOpt = getStartupOption(conf); + assert startOpt != null : "Startup option must be set."; + + boolean simulatedFSDataset = conf.getBoolean( + 
DFS_DATANODE_SIMULATEDDATASTORAGE_KEY, + DFS_DATANODE_SIMULATEDDATASTORAGE_DEFAULT); + + if (!simulatedFSDataset) { + // read storage info, lock data dirs and transition fs state if necessary + storage.recoverTransitionRead(DataNode.this, blockPoolId, nsInfo, + dataDirs, startOpt); + StorageInfo bpStorage = storage.getBPStorage(blockPoolId); + LOG.info("setting up storage: nsid=" + + bpStorage.getNamespaceID() + ";bpid=" + + blockPoolId + ";lv=" + storage.getLayoutVersion() + + ";nsInfo=" + nsInfo); + } + initFsDataSet(); + initPeriodicScanners(conf); + data.addBlockPool(nsInfo.getBlockPoolID(), conf); + } + + private DatanodeRegistration createRegistration() { + DatanodeRegistration reg = new DatanodeRegistration(getMachineName()); + reg.setInfoPort(infoServer.getPort()); + reg.setIpcPort(getIpcPort()); + return reg; + } + BPOfferService[] getAllBpOs() { return blockPoolManager.getAllNamenodeThreads(); } @@ -1567,8 +1615,7 @@ int getBpOsCount() { * Initializes the {@link #data}. The initialization is done only once, when * handshake with the the first namenode is completed. 
*/ - private synchronized void initFsDataSet(Configuration conf, - AbstractList dataDirs) throws IOException { + private synchronized void initFsDataSet() throws IOException { if (data != null) { // Already initialized return; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java index 4d7740455f..a3d47b623e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java @@ -61,7 +61,7 @@ public static void setBPNamenodeByIndex(DataNode dn, bpos.setNamespaceInfo(nsifno); dn.setBPNamenode(bpid, nn); - bpos.setupBPStorage(); + dn.initBlockPool(bpos, nsifno); } } }