diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt index e2975e85cf..414b28e908 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt @@ -29,3 +29,5 @@ HDFS-2582. Scope dfs.ha.namenodes config by nameservice (todd) HDFS-2591. MiniDFSCluster support to mix and match federation with HA (todd) HDFS-1975. Support for sharing the namenode state from active to standby. (jitendra, atm, todd) + +HDFS-1971. Send block report from datanode to both active and standby namenodes. (sanjay, todd via suresh) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java index 4d098ebec2..85807f6d5a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java @@ -17,62 +17,43 @@ */ package org.apache.hadoop.hdfs.server.datanode; -import static org.apache.hadoop.hdfs.server.common.Util.now; - import java.io.IOException; import java.net.InetSocketAddress; -import java.net.SocketTimeoutException; -import java.net.URI; -import java.util.Collection; -import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; import org.apache.commons.logging.Log; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.protocol.Block; -import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.protocol.HdfsConstants; -import org.apache.hadoop.hdfs.protocol.LocatedBlock; -import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; -import org.apache.hadoop.hdfs.server.common.IncorrectVersionException; -import org.apache.hadoop.hdfs.server.common.Storage; -import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand; import org.apache.hadoop.hdfs.server.protocol.BlockCommand; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; -import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException; import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand; import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand; import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.ipc.RemoteException; -import org.apache.hadoop.util.StringUtils; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; /** - * A thread per namenode to perform: - * + * One instance per block-pool/namespace on the DN, which handles the + * heartbeats to the active and standby NNs for that namespace. + * This class manages an instance of {@link BPServiceActor} for each NN, + * and delegates calls to both NNs. + * It also maintains the state about which of the NNs is considered active. */ @InterfaceAudience.Private -class BPOfferService implements Runnable { +class BPOfferService { static final Log LOG = DataNode.LOG; - final InetSocketAddress nnAddr; - /** * Information about the namespace that this service * is registering with. This is assigned after @@ -87,27 +68,25 @@ class BPOfferService implements Runnable { */ DatanodeRegistration bpRegistration; - long lastBlockReport = 0; - long lastDeletedReport = 0; - - boolean resetBlockReportTime = true; - - Thread bpThread; - DatanodeProtocol bpNamenode; - private long lastHeartbeat = 0; - private volatile boolean initialized = false; - private final LinkedList receivedAndDeletedBlockList - = new LinkedList(); - private volatile int pendingReceivedRequests = 0; - private volatile boolean shouldServiceRun = true; UpgradeManagerDatanode upgradeManager = null; private final DataNode dn; - private final DNConf dnConf; - BPOfferService(InetSocketAddress nnAddr, DataNode dn) { + private BPServiceActor bpServiceToActive; + private List bpServices = + new CopyOnWriteArrayList(); + + BPOfferService(List nnAddrs, DataNode dn) { + Preconditions.checkArgument(!nnAddrs.isEmpty(), + "Must pass at least one NN."); this.dn = dn; - this.nnAddr = nnAddr; - this.dnConf = dn.getDnConf(); + + for (InetSocketAddress addr : nnAddrs) { + this.bpServices.add(new BPServiceActor(addr, this)); + } + // TODO(HA): currently we just make the first one the initial + // active. In reality it should start in an unknown state and then + // as we figure out which is active, designate one as such. + this.bpServiceToActive = this.bpServices.get(0); } /** @@ -115,15 +94,18 @@ class BPOfferService implements Runnable { * and has registered with the corresponding namenode * @return true if initialized */ - public boolean isInitialized() { - return initialized; + boolean isInitialized() { + // TODO(HA) is this right? + return bpServiceToActive != null && bpServiceToActive.isInitialized(); } - public boolean isAlive() { - return shouldServiceRun && bpThread.isAlive(); + boolean isAlive() { + // TODO: should || all the bp actors probably? + return bpServiceToActive != null && + bpServiceToActive.isAlive(); } - public String getBlockPoolId() { + String getBlockPoolId() { if (bpNSInfo != null) { return bpNSInfo.getBlockPoolID(); } else { @@ -133,10 +115,11 @@ public String getBlockPoolId() { } } - public NamespaceInfo getNamespaceInfo() { + NamespaceInfo getNamespaceInfo() { return bpNSInfo; } + @Override public String toString() { if (bpNSInfo == null) { // If we haven't yet connected to our NN, we don't yet know our @@ -148,519 +131,279 @@ public String toString() { storageId = "unknown"; } return "Block pool (storage id " + storageId + - ") connecting to " + nnAddr; + ")"; } else { return "Block pool " + getBlockPoolId() + " (storage id " + dn.getStorageId() + - ") registered with " + nnAddr; + ")"; } } - InetSocketAddress getNNSocketAddress() { - return nnAddr; - } - - /** - * Used to inject a spy NN in the unit tests. - */ - @VisibleForTesting - void setNameNode(DatanodeProtocol dnProtocol) { - bpNamenode = dnProtocol; - } - - /** - * Perform the first part of the handshake with the NameNode. - * This calls versionRequest to determine the NN's - * namespace and version info. It automatically retries until - * the NN responds or the DN is shutting down. - * - * @return the NamespaceInfo - * @throws IncorrectVersionException if the remote NN does not match - * this DN's version - */ - NamespaceInfo retrieveNamespaceInfo() throws IncorrectVersionException { - NamespaceInfo nsInfo = null; - while (shouldRun()) { - try { - nsInfo = bpNamenode.versionRequest(); - LOG.debug(this + " received versionRequest response: " + nsInfo); - break; - } catch(SocketTimeoutException e) { // namenode is busy - LOG.warn("Problem connecting to server: " + nnAddr); - } catch(IOException e ) { // namenode is not available - LOG.warn("Problem connecting to server: " + nnAddr); - } - - // try again in a second - sleepAndLogInterrupts(5000, "requesting version info from NN"); - } - - if (nsInfo != null) { - checkNNVersion(nsInfo); - } - return nsInfo; - } - - private void checkNNVersion(NamespaceInfo nsInfo) - throws IncorrectVersionException { - // build and layout versions should match - String nsBuildVer = nsInfo.getBuildVersion(); - String stBuildVer = Storage.getBuildVersion(); - if (!nsBuildVer.equals(stBuildVer)) { - LOG.warn("Data-node and name-node Build versions must be the same. " + - "Namenode build version: " + nsBuildVer + "Datanode " + - "build version: " + stBuildVer); - throw new IncorrectVersionException(nsBuildVer, "namenode", stBuildVer); - } - - if (HdfsConstants.LAYOUT_VERSION != nsInfo.getLayoutVersion()) { - LOG.warn("Data-node and name-node layout versions must be the same." + - " Expected: "+ HdfsConstants.LAYOUT_VERSION + - " actual "+ bpNSInfo.getLayoutVersion()); - throw new IncorrectVersionException( - bpNSInfo.getLayoutVersion(), "namenode"); - } - } - - private void connectToNNAndHandshake() throws IOException { - // get NN proxy - bpNamenode = (DatanodeProtocol)RPC.waitForProxy(DatanodeProtocol.class, - DatanodeProtocol.versionID, nnAddr, dn.getConf()); - - // First phase of the handshake with NN - get the namespace - // info. - bpNSInfo = retrieveNamespaceInfo(); - - // Now that we know the namespace ID, etc, we can pass this to the DN. - // The DN can now initialize its local storage if we are the - // first BP to handshake, etc. - dn.initBlockPool(this); - - // Second phase of the handshake with the NN. - register(); - } - - /** - * This methods arranges for the data node to send the block report at - * the next heartbeat. - */ - void scheduleBlockReport(long delay) { - if (delay > 0) { // send BR after random delay - lastBlockReport = System.currentTimeMillis() - - ( dnConf.blockReportInterval - DFSUtil.getRandom().nextInt((int)(delay))); - } else { // send at next heartbeat - lastBlockReport = lastHeartbeat - dnConf.blockReportInterval; - } - resetBlockReportTime = true; // reset future BRs for randomness - } - void reportBadBlocks(ExtendedBlock block) { - DatanodeInfo[] dnArr = { new DatanodeInfo(bpRegistration) }; - LocatedBlock[] blocks = { new LocatedBlock(block, dnArr) }; - - try { - bpNamenode.reportBadBlocks(blocks); - } catch (IOException e){ - /* One common reason is that NameNode could be in safe mode. - * Should we keep on retrying in that case? - */ - LOG.warn("Failed to report bad block " + block + " to namenode : " - + " Exception", e); + checkBlock(block); + for (BPServiceActor actor : bpServices) { + actor.reportBadBlocks(block); } - } - /** - * Report received blocks and delete hints to the Namenode - * - * @throws IOException - */ - private void reportReceivedDeletedBlocks() throws IOException { - - // check if there are newly received blocks - ReceivedDeletedBlockInfo[] receivedAndDeletedBlockArray = null; - int currentReceivedRequestsCounter; - synchronized (receivedAndDeletedBlockList) { - currentReceivedRequestsCounter = pendingReceivedRequests; - int numBlocks = receivedAndDeletedBlockList.size(); - if (numBlocks > 0) { - // - // Send newly-received and deleted blockids to namenode - // - receivedAndDeletedBlockArray = receivedAndDeletedBlockList - .toArray(new ReceivedDeletedBlockInfo[numBlocks]); - } - } - if (receivedAndDeletedBlockArray != null) { - bpNamenode.blockReceivedAndDeleted(bpRegistration, getBlockPoolId(), - receivedAndDeletedBlockArray); - synchronized (receivedAndDeletedBlockList) { - for (int i = 0; i < receivedAndDeletedBlockArray.length; i++) { - receivedAndDeletedBlockList.remove(receivedAndDeletedBlockArray[i]); - } - pendingReceivedRequests -= currentReceivedRequestsCounter; - } - } - } - /* * Informing the name node could take a long long time! Should we wait * till namenode is informed before responding with success to the * client? For now we don't. */ void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint) { - if (block == null || delHint == null) { - throw new IllegalArgumentException(block == null ? "Block is null" - : "delHint is null"); + checkBlock(block); + checkDelHint(delHint); + ReceivedDeletedBlockInfo bInfo = + new ReceivedDeletedBlockInfo(block.getLocalBlock(), delHint); + for (BPServiceActor actor : bpServices) { + actor.notifyNamenodeReceivedBlock(bInfo); } + } - if (!block.getBlockPoolId().equals(getBlockPoolId())) { - LOG.warn("BlockPool mismatch " + block.getBlockPoolId() + " vs. " - + getBlockPoolId()); - return; - } - - synchronized (receivedAndDeletedBlockList) { - receivedAndDeletedBlockList.add(new ReceivedDeletedBlockInfo(block - .getLocalBlock(), delHint)); - pendingReceivedRequests++; - receivedAndDeletedBlockList.notifyAll(); - } + private void checkBlock(ExtendedBlock block) { + Preconditions.checkArgument(block != null, + "block is null"); + Preconditions.checkArgument(block.getBlockPoolId().equals(getBlockPoolId()), + "block belongs to BP %s instead of BP %s", + block.getBlockPoolId(), getBlockPoolId()); + } + + private void checkDelHint(String delHint) { + Preconditions.checkArgument(delHint != null, + "delHint is null"); } void notifyNamenodeDeletedBlock(ExtendedBlock block) { - if (block == null) { - throw new IllegalArgumentException("Block is null"); - } - - if (!block.getBlockPoolId().equals(getBlockPoolId())) { - LOG.warn("BlockPool mismatch " + block.getBlockPoolId() + " vs. " - + getBlockPoolId()); - return; - } - - synchronized (receivedAndDeletedBlockList) { - receivedAndDeletedBlockList.add(new ReceivedDeletedBlockInfo(block - .getLocalBlock(), ReceivedDeletedBlockInfo.TODELETE_HINT)); + checkBlock(block); + ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo(block + .getLocalBlock(), ReceivedDeletedBlockInfo.TODELETE_HINT); + + for (BPServiceActor actor : bpServices) { + actor.notifyNamenodeDeletedBlock(bInfo); } } - - /** - * Report the list blocks to the Namenode - * @throws IOException - */ - DatanodeCommand blockReport() throws IOException { - // send block report if timer has expired. - DatanodeCommand cmd = null; - long startTime = now(); - if (startTime - lastBlockReport > dnConf.blockReportInterval) { - - // Create block report - long brCreateStartTime = now(); - BlockListAsLongs bReport = dn.data.getBlockReport(getBlockPoolId()); - - // Send block report - long brSendStartTime = now(); - cmd = bpNamenode.blockReport(bpRegistration, getBlockPoolId(), bReport - .getBlockListAsLongs()); - - // Log the block report processing stats from Datanode perspective - long brSendCost = now() - brSendStartTime; - long brCreateCost = brSendStartTime - brCreateStartTime; - dn.metrics.addBlockReport(brSendCost); - LOG.info("BlockReport of " + bReport.getNumberOfBlocks() - + " blocks took " + brCreateCost + " msec to generate and " - + brSendCost + " msecs for RPC and NN processing"); - - // If we have sent the first block report, then wait a random - // time before we start the periodic block reports. - if (resetBlockReportTime) { - lastBlockReport = startTime - DFSUtil.getRandom().nextInt((int)(dnConf.blockReportInterval)); - resetBlockReportTime = false; - } else { - /* say the last block report was at 8:20:14. The current report - * should have started around 9:20:14 (default 1 hour interval). - * If current time is : - * 1) normal like 9:20:18, next report should be at 10:20:14 - * 2) unexpected like 11:35:43, next report should be at 12:20:14 - */ - lastBlockReport += (now() - lastBlockReport) / - dnConf.blockReportInterval * dnConf.blockReportInterval; - } - LOG.info("sent block report, processed command:" + cmd); - } - return cmd; - } - - - DatanodeCommand [] sendHeartBeat() throws IOException { - return bpNamenode.sendHeartbeat(bpRegistration, - dn.data.getCapacity(), - dn.data.getDfsUsed(), - dn.data.getRemaining(), - dn.data.getBlockPoolUsed(getBlockPoolId()), - dn.xmitsInProgress.get(), - dn.getXceiverCount(), dn.data.getNumFailedVolumes()); - } - //This must be called only by blockPoolManager void start() { - if ((bpThread != null) && (bpThread.isAlive())) { - //Thread is started already - return; + for (BPServiceActor actor : bpServices) { + actor.start(); } - bpThread = new Thread(this, formatThreadName()); - bpThread.setDaemon(true); // needed for JUnit testing - bpThread.start(); - } - - private String formatThreadName() { - Collection dataDirs = DataNode.getStorageDirs(dn.getConf()); - return "DataNode: [" + - StringUtils.uriToString(dataDirs.toArray(new URI[0])) + "] " + - " heartbeating to " + nnAddr; } //This must be called only by blockPoolManager. void stop() { - shouldServiceRun = false; - if (bpThread != null) { - bpThread.interrupt(); + for (BPServiceActor actor : bpServices) { + actor.stop(); } } //This must be called only by blockPoolManager void join() { - try { - if (bpThread != null) { - bpThread.join(); - } - } catch (InterruptedException ie) { } + for (BPServiceActor actor : bpServices) { + actor.join(); + } + } + + synchronized UpgradeManagerDatanode getUpgradeManager() { + if(upgradeManager == null) + upgradeManager = + new UpgradeManagerDatanode(dn, getBlockPoolId()); + + return upgradeManager; } - //Cleanup method to be called by current thread before exiting. - private synchronized void cleanUp() { - - if(upgradeManager != null) - upgradeManager.shutdownUpgrade(); - shouldServiceRun = false; - RPC.stopProxy(bpNamenode); - dn.shutdownBlockPool(this); + void processDistributedUpgradeCommand(UpgradeCommand comm) + throws IOException { + UpgradeManagerDatanode upgradeManager = getUpgradeManager(); + upgradeManager.processUpgradeCommand(comm); } /** - * Main loop for each BP thread. Run until shutdown, - * forever calling remote NameNode functions. + * Start distributed upgrade if it should be initiated by the data-node. */ - private void offerService() throws Exception { - LOG.info("For namenode " + nnAddr + " using DELETEREPORT_INTERVAL of " - + dnConf.deleteReportInterval + " msec " + " BLOCKREPORT_INTERVAL of " - + dnConf.blockReportInterval + "msec" + " Initial delay: " - + dnConf.initialBlockReportDelay + "msec" + "; heartBeatInterval=" - + dnConf.heartBeatInterval); - - // - // Now loop for a long time.... - // - while (shouldRun()) { - try { - long startTime = now(); - - // - // Every so often, send heartbeat or block-report - // - if (startTime - lastHeartbeat > dnConf.heartBeatInterval) { - // - // All heartbeat messages include following info: - // -- Datanode name - // -- data transfer port - // -- Total capacity - // -- Bytes remaining - // - lastHeartbeat = startTime; - if (!dn.areHeartbeatsDisabledForTests()) { - DatanodeCommand[] cmds = sendHeartBeat(); - dn.metrics.addHeartbeat(now() - startTime); - - long startProcessCommands = now(); - if (!processCommand(cmds)) - continue; - long endProcessCommands = now(); - if (endProcessCommands - startProcessCommands > 2000) { - LOG.info("Took " + (endProcessCommands - startProcessCommands) + - "ms to process " + cmds.length + " commands from NN"); - } - } - } - if (pendingReceivedRequests > 0 - || (startTime - lastDeletedReport > dnConf.deleteReportInterval)) { - reportReceivedDeletedBlocks(); - lastDeletedReport = startTime; - } - - DatanodeCommand cmd = blockReport(); - processCommand(cmd); - - // Now safe to start scanning the block pool - if (dn.blockScanner != null) { - dn.blockScanner.addBlockPool(this.getBlockPoolId()); - } - - // - // There is no work to do; sleep until hearbeat timer elapses, - // or work arrives, and then iterate again. - // - long waitTime = dnConf.heartBeatInterval - - (System.currentTimeMillis() - lastHeartbeat); - synchronized(receivedAndDeletedBlockList) { - if (waitTime > 0 && pendingReceivedRequests == 0) { - try { - receivedAndDeletedBlockList.wait(waitTime); - } catch (InterruptedException ie) { - LOG.warn("BPOfferService for " + this + " interrupted"); - } - } - } // synchronized - } catch(RemoteException re) { - String reClass = re.getClassName(); - if (UnregisteredNodeException.class.getName().equals(reClass) || - DisallowedDatanodeException.class.getName().equals(reClass) || - IncorrectVersionException.class.getName().equals(reClass)) { - LOG.warn(this + " is shutting down", re); - shouldServiceRun = false; - return; - } - LOG.warn("RemoteException in offerService", re); - try { - long sleepTime = Math.min(1000, dnConf.heartBeatInterval); - Thread.sleep(sleepTime); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - } - } catch (IOException e) { - LOG.warn("IOException in offerService", e); - } - } // while (shouldRun()) - } // offerService - - /** - * Register one bp with the corresponding NameNode - *

- * The bpDatanode needs to register with the namenode on startup in order - * 1) to report which storage it is serving now and - * 2) to receive a registrationID - * - * issued by the namenode to recognize registered datanodes. - * - * @see FSNamesystem#registerDatanode(DatanodeRegistration) - * @throws IOException - */ - void register() throws IOException { - Preconditions.checkState(bpNSInfo != null, - "register() should be called after handshake()"); + synchronized void startDistributedUpgradeIfNeeded() throws IOException { + UpgradeManagerDatanode um = getUpgradeManager(); - // The handshake() phase loaded the block pool storage - // off disk - so update the bpRegistration object from that info - bpRegistration = dn.createBPRegistration(bpNSInfo); - - LOG.info(this + " beginning handshake with NN"); - - while (shouldRun()) { - try { - // Use returned registration from namenode with updated machine name. - bpRegistration = bpNamenode.registerDatanode(bpRegistration); - break; - } catch(SocketTimeoutException e) { // namenode is busy - LOG.info("Problem connecting to server: " + nnAddr); - sleepAndLogInterrupts(1000, "connecting to server"); - } - } - - LOG.info("Block pool " + this + " successfully registered with NN"); - dn.bpRegistrationSucceeded(bpRegistration, getBlockPoolId()); - - // random short delay - helps scatter the BR from all DNs - scheduleBlockReport(dnConf.initialBlockReportDelay); + if(!um.getUpgradeState()) + return; + um.setUpgradeState(false, um.getUpgradeVersion()); + um.startUpgrade(); + return; } - - - private void sleepAndLogInterrupts(int millis, - String stateString) { - try { - Thread.sleep(millis); - } catch (InterruptedException ie) { - LOG.info("BPOfferService " + this + - " interrupted while " + stateString); - } + + DataNode getDataNode() { + return dn; } /** - * No matter what kind of exception we get, keep retrying to offerService(). - * That's the loop that connects to the NameNode and provides basic DataNode - * functionality. - * - * Only stop when "shouldRun" or "shouldServiceRun" is turned off, which can - * happen either at shutdown or due to refreshNamenodes. + * Called by the BPServiceActors when they handshake to a NN. + * If this is the first NN connection, this sets the namespace info + * for this BPOfferService. If it's a connection to a new NN, it + * verifies that this namespace matches (eg to prevent a misconfiguration + * where a StandbyNode from a different cluster is specified) */ - @Override - public void run() { - LOG.info(this + " starting to offer service"); - - try { - // init stuff - try { - // setup storage - connectToNNAndHandshake(); - } catch (IOException ioe) { - // Initial handshake, storage recovery or registration failed - // End BPOfferService thread - LOG.fatal("Initialization failed for block pool " + this, ioe); - return; - } - - initialized = true; // bp is initialized; + void verifyAndSetNamespaceInfo(NamespaceInfo nsInfo) throws IOException { + if (this.bpNSInfo == null) { + this.bpNSInfo = nsInfo; - while (shouldRun()) { - try { - startDistributedUpgradeIfNeeded(); - offerService(); - } catch (Exception ex) { - LOG.error("Exception in BPOfferService for " + this, ex); - sleepAndLogInterrupts(5000, "offering service"); - } - } - } catch (Throwable ex) { - LOG.warn("Unexpected exception in block pool " + this, ex); - } finally { - LOG.warn("Ending block pool service for: " + this); - cleanUp(); + // Now that we know the namespace ID, etc, we can pass this to the DN. + // The DN can now initialize its local storage if we are the + // first BP to handshake, etc. + dn.initBlockPool(this); + return; + } else { + checkNSEquality(bpNSInfo.getBlockPoolID(), nsInfo.getBlockPoolID(), + "Blockpool ID"); + checkNSEquality(bpNSInfo.getNamespaceID(), nsInfo.getNamespaceID(), + "Namespace ID"); + checkNSEquality(bpNSInfo.getClusterID(), nsInfo.getClusterID(), + "Cluster ID"); } } - private boolean shouldRun() { - return shouldServiceRun && dn.shouldRun(); - } - /** - * Process an array of datanode commands - * - * @param cmds an array of datanode commands - * @return true if further processing may be required or false otherwise. + * After one of the BPServiceActors registers successfully with the + * NN, it calls this function to verify that the NN it connected to + * is consistent with other NNs serving the block-pool. */ - private boolean processCommand(DatanodeCommand[] cmds) { - if (cmds != null) { - for (DatanodeCommand cmd : cmds) { - try { - if (processCommand(cmd) == false) { - return false; - } - } catch (IOException ioe) { - LOG.warn("Error processing datanode Command", ioe); - } + void registrationSucceeded(BPServiceActor bpServiceActor, + DatanodeRegistration reg) throws IOException { + if (bpRegistration != null) { + checkNSEquality(bpRegistration.storageInfo.getNamespaceID(), + reg.storageInfo.getNamespaceID(), "namespace ID"); + checkNSEquality(bpRegistration.storageInfo.getClusterID(), + reg.storageInfo.getClusterID(), "cluster ID"); + } else { + bpRegistration = reg; + } + } + + /** + * Verify equality of two namespace-related fields, throwing + * an exception if they are unequal. + */ + private static void checkNSEquality( + Object ourID, Object theirID, + String idHelpText) throws IOException { + if (!ourID.equals(theirID)) { + throw new IOException(idHelpText + " mismatch: " + + "previously connected to " + idHelpText + " " + ourID + + " but now connected to " + idHelpText + " " + theirID); + } + } + + DatanodeRegistration createRegistration() { + Preconditions.checkState(bpNSInfo != null, + "getRegistration() can only be called after initial handshake"); + return dn.createBPRegistration(bpNSInfo); + } + + /** + * Called when an actor shuts down. If this is the last actor + * to shut down, shuts down the whole blockpool in the DN. + */ + void shutdownActor(BPServiceActor actor) { + if (bpServiceToActive == actor) { + bpServiceToActive = null; + } + + bpServices.remove(actor); + + // TODO: synchronization should be a little better here + if (bpServices.isEmpty()) { + dn.shutdownBlockPool(this); + + if(upgradeManager != null) + upgradeManager.shutdownUpgrade(); + } + } + + @Deprecated + InetSocketAddress getNNSocketAddress() { + // TODO(HA) this doesn't make sense anymore + return bpServiceToActive.getNNSocketAddress(); + } + + /** + * Called by the DN to report an error to the NNs. + */ + void trySendErrorReport(int errCode, String errMsg) { + for (BPServiceActor actor : bpServices) { + actor.trySendErrorReport(errCode, errMsg); + } + } + + /** + * Ask each of the actors to schedule a block report after + * the specified delay. + */ + void scheduleBlockReport(long delay) { + for (BPServiceActor actor : bpServices) { + actor.scheduleBlockReport(delay); + } + } + + /** + * Ask each of the actors to report a bad block hosted on another DN. + */ + void reportRemoteBadBlock(DatanodeInfo dnInfo, ExtendedBlock block) { + for (BPServiceActor actor : bpServices) { + try { + actor.reportRemoteBadBlock(dnInfo, block); + } catch (IOException e) { + LOG.warn("Couldn't report bad block " + block + " to " + actor, + e); } } - return true; + } + + /** + * TODO: this is still used in a few places where we need to sort out + * what to do in HA! + * @return a proxy to the active NN + */ + @Deprecated + DatanodeProtocol getActiveNN() { + return bpServiceToActive.bpNamenode; + } + + /** + * @return true if the given NN address is one of the NNs for this + * block pool + */ + boolean containsNN(InetSocketAddress addr) { + for (BPServiceActor actor : bpServices) { + if (actor.getNNSocketAddress().equals(addr)) { + return true; + } + } + return false; + } + + @VisibleForTesting + int countNameNodes() { + return bpServices.size(); + } + + /** + * Run an immediate block report on this thread. Used by tests. + */ + @VisibleForTesting + void triggerBlockReportForTests() throws IOException { + for (BPServiceActor actor : bpServices) { + actor.triggerBlockReportForTests(); + } + } + + boolean processCommandFromActor(DatanodeCommand cmd, + BPServiceActor actor) throws IOException { + assert bpServices.contains(actor); + if (actor == bpServiceToActive) { + return processCommandFromActive(cmd, actor); + } else { + return processCommandFromStandby(cmd, actor); + } } /** @@ -669,7 +412,8 @@ private boolean processCommand(DatanodeCommand[] cmds) { * @return true if further processing may be required or false otherwise. * @throws IOException */ - private boolean processCommand(DatanodeCommand cmd) throws IOException { + private boolean processCommandFromActive(DatanodeCommand cmd, + BPServiceActor actor) throws IOException { if (cmd == null) return true; final BlockCommand bcmd = @@ -700,19 +444,12 @@ private boolean processCommand(DatanodeCommand cmd) throws IOException { dn.metrics.incrBlocksRemoved(toDelete.length); break; case DatanodeProtocol.DNA_SHUTDOWN: - // shut down the data node - shouldServiceRun = false; - return false; + // TODO: DNA_SHUTDOWN appears to be unused - the NN never sends this command + throw new UnsupportedOperationException("Received unimplemented DNA_SHUTDOWN"); case DatanodeProtocol.DNA_REGISTER: // namenode requested a registration - at start or if NN lost contact LOG.info("DatanodeCommand action: DNA_REGISTER"); - if (shouldRun()) { - // re-retrieve namespace info to make sure that, if the NN - // was restarted, we still match its version (HDFS-2120) - retrieveNamespaceInfo(); - // and re-register - register(); - } + actor.reRegister(); break; case DatanodeProtocol.DNA_FINALIZE: String bp = ((FinalizeCommand) cmd).getBlockPoolId(); @@ -732,7 +469,8 @@ assert getBlockPoolId().equals(bp) : case DatanodeProtocol.DNA_ACCESSKEYUPDATE: LOG.info("DatanodeCommand action: DNA_ACCESSKEYUPDATE"); if (dn.isBlockTokenEnabled) { - dn.blockPoolTokenSecretManager.setKeys(getBlockPoolId(), + dn.blockPoolTokenSecretManager.setKeys( + getBlockPoolId(), ((KeyUpdateCommand) cmd).getExportedKeys()); } break; @@ -751,32 +489,39 @@ assert getBlockPoolId().equals(bp) : } return true; } - - private void processDistributedUpgradeCommand(UpgradeCommand comm) - throws IOException { - UpgradeManagerDatanode upgradeManager = getUpgradeManager(); - upgradeManager.processUpgradeCommand(comm); + + private boolean processCommandFromStandby(DatanodeCommand cmd, + BPServiceActor actor) throws IOException { + if (cmd == null) + return true; + switch(cmd.getAction()) { + case DatanodeProtocol.DNA_REGISTER: + // namenode requested a registration - at start or if NN lost contact + LOG.info("DatanodeCommand action: DNA_REGISTER"); + actor.reRegister(); + return true; + case DatanodeProtocol.DNA_TRANSFER: + case DatanodeProtocol.DNA_INVALIDATE: + case DatanodeProtocol.DNA_SHUTDOWN: + case DatanodeProtocol.DNA_RECOVERBLOCK: + case DatanodeProtocol.DNA_ACCESSKEYUPDATE: + case DatanodeProtocol.DNA_BALANCERBANDWIDTHUPDATE: + LOG.warn("Got a command from standby NN - ignoring command:" + cmd.getAction()); + return true; + default: + LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction()); + } + return true; } - synchronized UpgradeManagerDatanode getUpgradeManager() { - if(upgradeManager == null) - upgradeManager = - new UpgradeManagerDatanode(dn, getBlockPoolId()); - - return upgradeManager; - } - /** - * Start distributed upgrade if it should be initiated by the data-node. + * Connect to the NN at the given address. This is separated out for ease + * of testing. */ - private void startDistributedUpgradeIfNeeded() throws IOException { - UpgradeManagerDatanode um = getUpgradeManager(); - - if(!um.getUpgradeState()) - return; - um.setUpgradeState(false, um.getUpgradeVersion()); - um.startUpgrade(); - return; + DatanodeProtocol connectToNN(InetSocketAddress nnAddr) + throws IOException { + return (DatanodeProtocol)RPC.waitForProxy(DatanodeProtocol.class, + DatanodeProtocol.versionID, nnAddr, dn.getConf()); } -} \ No newline at end of file +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java new file mode 100644 index 0000000000..2c4a15bf81 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java @@ -0,0 +1,633 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode; + +import static org.apache.hadoop.hdfs.server.common.Util.now; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketTimeoutException; +import java.net.URI; +import java.util.Collection; +import java.util.LinkedList; + +import org.apache.commons.logging.Log; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; +import org.apache.hadoop.hdfs.server.common.IncorrectVersionException; +import org.apache.hadoop.hdfs.server.common.Storage; +import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; +import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; +import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; +import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; +import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException; +import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; +import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.util.StringUtils; + +import com.google.common.annotations.VisibleForTesting; + +/** + * A thread per active or standby namenode to perform: + *

    + *
  • Pre-registration handshake with namenode
  • + *
  • Registration with namenode
  • + *
  • Send periodic heartbeats to the namenode
  • + *
  • Handle commands received from the namenode
  • + *
+ */ +@InterfaceAudience.Private +class BPServiceActor implements Runnable { + + static final Log LOG = DataNode.LOG; + final InetSocketAddress nnAddr; + + BPOfferService bpos; + + long lastBlockReport = 0; + long lastDeletedReport = 0; + + boolean resetBlockReportTime = true; + + Thread bpThread; + DatanodeProtocol bpNamenode; + private long lastHeartbeat = 0; + private volatile boolean initialized = false; + private final LinkedList receivedAndDeletedBlockList + = new LinkedList(); + private volatile int pendingReceivedRequests = 0; + private volatile boolean shouldServiceRun = true; + private final DataNode dn; + private final DNConf dnConf; + + private DatanodeRegistration bpRegistration; + + BPServiceActor(InetSocketAddress nnAddr, BPOfferService bpos) { + this.bpos = bpos; + this.dn = bpos.getDataNode(); + this.nnAddr = nnAddr; + this.dnConf = dn.getDnConf(); + } + + /** + * returns true if BP thread has completed initialization of storage + * and has registered with the corresponding namenode + * @return true if initialized + */ + boolean isInitialized() { + return initialized; + } + + boolean isAlive() { + return shouldServiceRun && bpThread.isAlive(); + } + + @Override + public String toString() { + return bpos.toString() + " service to " + nnAddr; + } + + InetSocketAddress getNNSocketAddress() { + return nnAddr; + } + + /** + * Used to inject a spy NN in the unit tests. + */ + @VisibleForTesting + void setNameNode(DatanodeProtocol dnProtocol) { + bpNamenode = dnProtocol; + } + + /** + * Perform the first part of the handshake with the NameNode. + * This calls versionRequest to determine the NN's + * namespace and version info. It automatically retries until + * the NN responds or the DN is shutting down. + * + * @return the NamespaceInfo + */ + @VisibleForTesting + NamespaceInfo retrieveNamespaceInfo() throws IOException { + NamespaceInfo nsInfo = null; + while (shouldRun()) { + try { + nsInfo = bpNamenode.versionRequest(); + LOG.debug(this + " received versionRequest response: " + nsInfo); + break; + } catch(SocketTimeoutException e) { // namenode is busy + LOG.warn("Problem connecting to server: " + nnAddr); + } catch(IOException e ) { // namenode is not available + LOG.warn("Problem connecting to server: " + nnAddr); + } + + // try again in a second + sleepAndLogInterrupts(5000, "requesting version info from NN"); + } + + if (nsInfo != null) { + checkNNVersion(nsInfo); + } else { + throw new IOException("DN shut down before block pool connected"); + } + return nsInfo; + } + + private void checkNNVersion(NamespaceInfo nsInfo) + throws IncorrectVersionException { + // build and layout versions should match + String nsBuildVer = nsInfo.getBuildVersion(); + String stBuildVer = Storage.getBuildVersion(); + if (!nsBuildVer.equals(stBuildVer)) { + LOG.warn("Data-node and name-node Build versions must be the same. " + + "Namenode build version: " + nsBuildVer + "Datanode " + + "build version: " + stBuildVer); + throw new IncorrectVersionException(nsBuildVer, "namenode", stBuildVer); + } + + if (HdfsConstants.LAYOUT_VERSION != nsInfo.getLayoutVersion()) { + LOG.warn("Data-node and name-node layout versions must be the same." + + " Expected: "+ HdfsConstants.LAYOUT_VERSION + + " actual "+ nsInfo.getLayoutVersion()); + throw new IncorrectVersionException( + nsInfo.getLayoutVersion(), "namenode"); + } + } + + private void connectToNNAndHandshake() throws IOException { + // get NN proxy + bpNamenode = bpos.connectToNN(nnAddr); + + // First phase of the handshake with NN - get the namespace + // info. + NamespaceInfo nsInfo = retrieveNamespaceInfo(); + + // Verify that this matches the other NN in this HA pair. + // This also initializes our block pool in the DN if we are + // the first NN connection for this BP. + bpos.verifyAndSetNamespaceInfo(nsInfo); + + // Second phase of the handshake with the NN. + register(); + } + + /** + * This methods arranges for the data node to send the block report at + * the next heartbeat. + */ + void scheduleBlockReport(long delay) { + if (delay > 0) { // send BR after random delay + lastBlockReport = System.currentTimeMillis() + - ( dnConf.blockReportInterval - DFSUtil.getRandom().nextInt((int)(delay))); + } else { // send at next heartbeat + lastBlockReport = lastHeartbeat - dnConf.blockReportInterval; + } + resetBlockReportTime = true; // reset future BRs for randomness + } + + void reportBadBlocks(ExtendedBlock block) { + DatanodeInfo[] dnArr = { new DatanodeInfo(bpRegistration) }; + LocatedBlock[] blocks = { new LocatedBlock(block, dnArr) }; + + try { + bpNamenode.reportBadBlocks(blocks); + } catch (IOException e){ + /* One common reason is that NameNode could be in safe mode. + * Should we keep on retrying in that case? + */ + LOG.warn("Failed to report bad block " + block + " to namenode : " + + " Exception", e); + } + } + + /** + * Report received blocks and delete hints to the Namenode + * + * @throws IOException + */ + private void reportReceivedDeletedBlocks() throws IOException { + + // check if there are newly received blocks + ReceivedDeletedBlockInfo[] receivedAndDeletedBlockArray = null; + int currentReceivedRequestsCounter; + synchronized (receivedAndDeletedBlockList) { + currentReceivedRequestsCounter = pendingReceivedRequests; + int numBlocks = receivedAndDeletedBlockList.size(); + if (numBlocks > 0) { + // + // Send newly-received and deleted blockids to namenode + // + receivedAndDeletedBlockArray = receivedAndDeletedBlockList + .toArray(new ReceivedDeletedBlockInfo[numBlocks]); + } + } + if (receivedAndDeletedBlockArray != null) { + bpNamenode.blockReceivedAndDeleted(bpRegistration, bpos.getBlockPoolId(), + receivedAndDeletedBlockArray); + synchronized (receivedAndDeletedBlockList) { + for (int i = 0; i < receivedAndDeletedBlockArray.length; i++) { + receivedAndDeletedBlockList.remove(receivedAndDeletedBlockArray[i]); + } + pendingReceivedRequests -= currentReceivedRequestsCounter; + } + } + } + + /* + * Informing the name node could take a long long time! Should we wait + * till namenode is informed before responding with success to the + * client? For now we don't. + */ + void notifyNamenodeReceivedBlock(ReceivedDeletedBlockInfo bInfo) { + synchronized (receivedAndDeletedBlockList) { + receivedAndDeletedBlockList.add(bInfo); + pendingReceivedRequests++; + receivedAndDeletedBlockList.notifyAll(); + } + } + + void notifyNamenodeDeletedBlock(ReceivedDeletedBlockInfo bInfo) { + synchronized (receivedAndDeletedBlockList) { + receivedAndDeletedBlockList.add(bInfo); + } + } + + /** + * Run an immediate block report on this thread. Used by tests. + */ + @VisibleForTesting + void triggerBlockReportForTests() throws IOException { + lastBlockReport = 0; + blockReport(); + } + + /** + * Report the list blocks to the Namenode + * @throws IOException + */ + DatanodeCommand blockReport() throws IOException { + // send block report if timer has expired. + DatanodeCommand cmd = null; + long startTime = now(); + if (startTime - lastBlockReport > dnConf.blockReportInterval) { + + // Create block report + long brCreateStartTime = now(); + BlockListAsLongs bReport = dn.getFSDataset().getBlockReport( + bpos.getBlockPoolId()); + + // Send block report + long brSendStartTime = now(); + cmd = bpNamenode.blockReport(bpRegistration, bpos.getBlockPoolId(), bReport + .getBlockListAsLongs()); + + // Log the block report processing stats from Datanode perspective + long brSendCost = now() - brSendStartTime; + long brCreateCost = brSendStartTime - brCreateStartTime; + dn.getMetrics().addBlockReport(brSendCost); + LOG.info("BlockReport of " + bReport.getNumberOfBlocks() + + " blocks took " + brCreateCost + " msec to generate and " + + brSendCost + " msecs for RPC and NN processing"); + + // If we have sent the first block report, then wait a random + // time before we start the periodic block reports. + if (resetBlockReportTime) { + lastBlockReport = startTime - DFSUtil.getRandom().nextInt((int)(dnConf.blockReportInterval)); + resetBlockReportTime = false; + } else { + /* say the last block report was at 8:20:14. The current report + * should have started around 9:20:14 (default 1 hour interval). + * If current time is : + * 1) normal like 9:20:18, next report should be at 10:20:14 + * 2) unexpected like 11:35:43, next report should be at 12:20:14 + */ + lastBlockReport += (now() - lastBlockReport) / + dnConf.blockReportInterval * dnConf.blockReportInterval; + } + LOG.info("sent block report, processed command:" + cmd); + } + return cmd; + } + + + DatanodeCommand [] sendHeartBeat() throws IOException { + LOG.info("heartbeat: " + this); + // TODO: saw an NPE here - maybe if the two BPOS register at + // same time, this one won't block on the other one? + return bpNamenode.sendHeartbeat(bpRegistration, + dn.getFSDataset().getCapacity(), + dn.getFSDataset().getDfsUsed(), + dn.getFSDataset().getRemaining(), + dn.getFSDataset().getBlockPoolUsed(bpos.getBlockPoolId()), + dn.getXmitsInProgress(), + dn.getXceiverCount(), dn.getFSDataset().getNumFailedVolumes()); + } + + //This must be called only by BPOfferService + void start() { + if ((bpThread != null) && (bpThread.isAlive())) { + //Thread is started already + return; + } + bpThread = new Thread(this, formatThreadName()); + bpThread.setDaemon(true); // needed for JUnit testing + bpThread.start(); + } + + private String formatThreadName() { + Collection dataDirs = DataNode.getStorageDirs(dn.getConf()); + return "DataNode: [" + + StringUtils.uriToString(dataDirs.toArray(new URI[0])) + "] " + + " heartbeating to " + nnAddr; + } + + //This must be called only by blockPoolManager. + void stop() { + shouldServiceRun = false; + if (bpThread != null) { + bpThread.interrupt(); + } + } + + //This must be called only by blockPoolManager + void join() { + try { + if (bpThread != null) { + bpThread.join(); + } + } catch (InterruptedException ie) { } + } + + //Cleanup method to be called by current thread before exiting. + private synchronized void cleanUp() { + + shouldServiceRun = false; + RPC.stopProxy(bpNamenode); + bpos.shutdownActor(this); + } + + /** + * Main loop for each BP thread. Run until shutdown, + * forever calling remote NameNode functions. + */ + private void offerService() throws Exception { + LOG.info("For namenode " + nnAddr + " using DELETEREPORT_INTERVAL of " + + dnConf.deleteReportInterval + " msec " + " BLOCKREPORT_INTERVAL of " + + dnConf.blockReportInterval + "msec" + " Initial delay: " + + dnConf.initialBlockReportDelay + "msec" + "; heartBeatInterval=" + + dnConf.heartBeatInterval); + + // + // Now loop for a long time.... + // + while (shouldRun()) { + try { + long startTime = now(); + + // + // Every so often, send heartbeat or block-report + // + if (startTime - lastHeartbeat > dnConf.heartBeatInterval) { + // + // All heartbeat messages include following info: + // -- Datanode name + // -- data transfer port + // -- Total capacity + // -- Bytes remaining + // + lastHeartbeat = startTime; + if (!dn.areHeartbeatsDisabledForTests()) { + DatanodeCommand[] cmds = sendHeartBeat(); + dn.getMetrics().addHeartbeat(now() - startTime); + + long startProcessCommands = now(); + if (!processCommand(cmds)) + continue; + long endProcessCommands = now(); + if (endProcessCommands - startProcessCommands > 2000) { + LOG.info("Took " + (endProcessCommands - startProcessCommands) + + "ms to process " + cmds.length + " commands from NN"); + } + } + } + if (pendingReceivedRequests > 0 + || (startTime - lastDeletedReport > dnConf.deleteReportInterval)) { + reportReceivedDeletedBlocks(); + lastDeletedReport = startTime; + } + + DatanodeCommand cmd = blockReport(); + processCommand(new DatanodeCommand[]{ cmd }); + + // Now safe to start scanning the block pool + // TODO(HA): this doesn't seem quite right + if (dn.blockScanner != null) { + dn.blockScanner.addBlockPool(bpos.getBlockPoolId()); + } + + // + // There is no work to do; sleep until hearbeat timer elapses, + // or work arrives, and then iterate again. + // + long waitTime = dnConf.heartBeatInterval - + (System.currentTimeMillis() - lastHeartbeat); + synchronized(receivedAndDeletedBlockList) { + if (waitTime > 0 && pendingReceivedRequests == 0) { + try { + receivedAndDeletedBlockList.wait(waitTime); + } catch (InterruptedException ie) { + LOG.warn("BPOfferService for " + this + " interrupted"); + } + } + } // synchronized + } catch(RemoteException re) { + String reClass = re.getClassName(); + if (UnregisteredNodeException.class.getName().equals(reClass) || + DisallowedDatanodeException.class.getName().equals(reClass) || + IncorrectVersionException.class.getName().equals(reClass)) { + LOG.warn(this + " is shutting down", re); + shouldServiceRun = false; + return; + } + LOG.warn("RemoteException in offerService", re); + try { + long sleepTime = Math.min(1000, dnConf.heartBeatInterval); + Thread.sleep(sleepTime); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + } catch (IOException e) { + LOG.warn("IOException in offerService", e); + } + } // while (shouldRun()) + } // offerService + + /** + * Register one bp with the corresponding NameNode + *

+ * The bpDatanode needs to register with the namenode on startup in order + * 1) to report which storage it is serving now and + * 2) to receive a registrationID + * + * issued by the namenode to recognize registered datanodes. + * + * @see FSNamesystem#registerDatanode(DatanodeRegistration) + * @throws IOException + */ + void register() throws IOException { + // The handshake() phase loaded the block pool storage + // off disk - so update the bpRegistration object from that info + bpRegistration = bpos.createRegistration(); + + LOG.info(this + " beginning handshake with NN"); + + while (shouldRun()) { + try { + // Use returned registration from namenode with updated machine name. + bpRegistration = bpNamenode.registerDatanode(bpRegistration); + break; + } catch(SocketTimeoutException e) { // namenode is busy + LOG.info("Problem connecting to server: " + nnAddr); + sleepAndLogInterrupts(1000, "connecting to server"); + } + } + + LOG.info("Block pool " + this + " successfully registered with NN"); + bpos.registrationSucceeded(this, bpRegistration); + + // random short delay - helps scatter the BR from all DNs + scheduleBlockReport(dnConf.initialBlockReportDelay); + } + + + private void sleepAndLogInterrupts(int millis, + String stateString) { + try { + Thread.sleep(millis); + } catch (InterruptedException ie) { + LOG.info("BPOfferService " + this + + " interrupted while " + stateString); + } + } + + /** + * No matter what kind of exception we get, keep retrying to offerService(). + * That's the loop that connects to the NameNode and provides basic DataNode + * functionality. + * + * Only stop when "shouldRun" or "shouldServiceRun" is turned off, which can + * happen either at shutdown or due to refreshNamenodes. + */ + @Override + public void run() { + LOG.info(this + " starting to offer service"); + + try { + // init stuff + try { + // setup storage + connectToNNAndHandshake(); + } catch (IOException ioe) { + // Initial handshake, storage recovery or registration failed + // End BPOfferService thread + LOG.fatal("Initialization failed for block pool " + this, ioe); + return; + } + + initialized = true; // bp is initialized; + + while (shouldRun()) { + try { + bpos.startDistributedUpgradeIfNeeded(); + offerService(); + } catch (Exception ex) { + LOG.error("Exception in BPOfferService for " + this, ex); + sleepAndLogInterrupts(5000, "offering service"); + } + } + } catch (Throwable ex) { + LOG.warn("Unexpected exception in block pool " + this, ex); + } finally { + LOG.warn("Ending block pool service for: " + this); + cleanUp(); + } + } + + private boolean shouldRun() { + return shouldServiceRun && dn.shouldRun(); + } + + /** + * Process an array of datanode commands + * + * @param cmds an array of datanode commands + * @return true if further processing may be required or false otherwise. + */ + boolean processCommand(DatanodeCommand[] cmds) { + if (cmds != null) { + for (DatanodeCommand cmd : cmds) { + try { + if (bpos.processCommandFromActor(cmd, this) == false) { + return false; + } + } catch (IOException ioe) { + LOG.warn("Error processing datanode Command", ioe); + } + } + } + return true; + } + + void trySendErrorReport(int errCode, String errMsg) { + try { + bpNamenode.errorReport(bpRegistration, errCode, errMsg); + } catch(IOException e) { + LOG.warn("Error reporting an error to NameNode " + nnAddr, + e); + } + } + + /** + * Report a bad block from another DN in this cluster. + */ + void reportRemoteBadBlock(DatanodeInfo dnInfo, ExtendedBlock block) + throws IOException { + LocatedBlock lb = new LocatedBlock(block, + new DatanodeInfo[] {dnInfo}); + bpNamenode.reportBadBlocks(new LocatedBlock[] {lb}); + } + + void reRegister() throws IOException { + if (shouldRun()) { + // re-retrieve namespace info to make sure that, if the NN + // was restarted, we still match its version (HDFS-2120) + retrieveNamespaceInfo(); + // and re-register + register(); + } + } + +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index 61bc29acf4..c8aac296a7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -320,7 +320,6 @@ private void handleMirrorOutError(IOException ioe) throws IOException { private void verifyChunks( byte[] dataBuf, int dataOff, int len, byte[] checksumBuf, int checksumOff ) throws IOException { - DatanodeProtocol nn = datanode.getBPNamenode(block.getBlockPoolId()); while (len > 0) { int chunkLen = Math.min(len, bytesPerChecksum); @@ -331,9 +330,7 @@ private void verifyChunks( byte[] dataBuf, int dataOff, int len, try { LOG.info("report corrupt block " + block + " from datanode " + srcDataNode + " to namenode"); - LocatedBlock lb = new LocatedBlock(block, - new DatanodeInfo[] {srcDataNode}); - nn.reportBadBlocks(new LocatedBlock[] {lb}); + datanode.reportRemoteBadBlock(srcDataNode, block); } catch (IOException e) { LOG.warn("Failed to report bad block " + block + " from datanode " + srcDataNode + " to namenode"); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index aba55f8c6a..dc3a18163b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -71,6 +71,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; import java.util.List; @@ -243,7 +244,7 @@ public static InetSocketAddress createSocketAddr(String target) { @InterfaceAudience.Private class BlockPoolManager { private final Map bpMapping; - private final Map nameNodeThreads; + private final List offerServices; //This lock is used only to ensure exclusion of refreshNamenodes private final Object refreshNamenodesLock = new Object(); @@ -251,31 +252,26 @@ class BlockPoolManager { BlockPoolManager(Configuration conf) throws IOException { bpMapping = new HashMap(); - nameNodeThreads = new HashMap(); + offerServices = new ArrayList(); Map> map = DFSUtil.getNNServiceRpcAddresses(conf); for (Entry> entry : map.entrySet()) { List nnList = Lists.newArrayList(entry.getValue().values()); - // TODO(HA) when HDFS-1971 (dual BRs) is done, pass all of the NNs - // to BPOS - InetSocketAddress isa = nnList.get(0); - BPOfferService bpos = new BPOfferService(isa, DataNode.this); - nameNodeThreads.put(bpos.getNNSocketAddress(), bpos); + BPOfferService bpos = new BPOfferService(nnList, DataNode.this); + offerServices.add(bpos); } } - synchronized void addBlockPool(BPOfferService t) { - if (nameNodeThreads.get(t.getNNSocketAddress()) == null) { - throw new IllegalArgumentException( - "Unknown BPOfferService thread for namenode address:" - + t.getNNSocketAddress()); - } - if (t.getBlockPoolId() == null) { + synchronized void addBlockPool(BPOfferService bpos) { + Preconditions.checkArgument(offerServices.contains(bpos), + "Unknown BPOS: %s", bpos); + if (bpos.getBlockPoolId() == null) { throw new IllegalArgumentException("Null blockpool id"); } - bpMapping.put(t.getBlockPoolId(), t); + LOG.info("===> registering in bpmapping: " + bpos); + bpMapping.put(bpos.getBlockPoolId(), bpos); } /** @@ -283,21 +279,26 @@ synchronized void addBlockPool(BPOfferService t) { * Caution: The BPOfferService returned could be shutdown any time. */ synchronized BPOfferService[] getAllNamenodeThreads() { - BPOfferService[] bposArray = new BPOfferService[nameNodeThreads.values() - .size()]; - return nameNodeThreads.values().toArray(bposArray); + BPOfferService[] bposArray = new BPOfferService[offerServices.size()]; + return offerServices.toArray(bposArray); } - - synchronized BPOfferService get(InetSocketAddress addr) { - return nameNodeThreads.get(addr); - } - + synchronized BPOfferService get(String bpid) { return bpMapping.get(bpid); } + // TODO(HA) would be good to kill this + synchronized BPOfferService get(InetSocketAddress addr) { + for (BPOfferService bpos : offerServices) { + if (bpos.containsNN(addr)) { + return bpos; + } + } + return null; + } + synchronized void remove(BPOfferService t) { - nameNodeThreads.remove(t.getNNSocketAddress()); + offerServices.remove(t); bpMapping.remove(t.getBlockPoolId()); } @@ -318,7 +319,7 @@ synchronized void startAll() throws IOException { UserGroupInformation.getLoginUser().doAs( new PrivilegedExceptionAction() { public Object run() throws Exception { - for (BPOfferService bpos : nameNodeThreads.values()) { + for (BPOfferService bpos : offerServices) { bpos.start(); } return null; @@ -339,6 +340,10 @@ void joinAll() { void refreshNamenodes(Configuration conf) throws IOException { + throw new UnsupportedOperationException("TODO(HA)"); +/* + * TODO(HA) + LOG.info("Refresh request received for nameservices: " + conf.get(DFS_FEDERATION_NAMESERVICES)); @@ -355,20 +360,20 @@ void refreshNamenodes(Configuration conf) List toStart = new ArrayList(); synchronized (refreshNamenodesLock) { synchronized (this) { - for (InetSocketAddress nnaddr : nameNodeThreads.keySet()) { + for (InetSocketAddress nnaddr : offerServices.keySet()) { if (!(newAddresses.contains(nnaddr))) { - toShutdown.add(nameNodeThreads.get(nnaddr)); + toShutdown.add(offerServices.get(nnaddr)); } } for (InetSocketAddress nnaddr : newAddresses) { - if (!(nameNodeThreads.containsKey(nnaddr))) { + if (!(offerServices.containsKey(nnaddr))) { toStart.add(nnaddr); } } for (InetSocketAddress nnaddr : toStart) { BPOfferService bpos = new BPOfferService(nnaddr, DataNode.this); - nameNodeThreads.put(bpos.getNNSocketAddress(), bpos); + offerServices.put(bpos.getNNSocketAddress(), bpos); } } @@ -383,7 +388,9 @@ void refreshNamenodes(Configuration conf) // Now start the threads that are not already running. startAll(); } + */ } + } volatile boolean shouldRun = true; @@ -685,13 +692,44 @@ protected void notifyNamenodeDeletedBlock(ExtendedBlock block) { } } + /** + * Report a bad block which is hosted on the local DN. + */ public void reportBadBlocks(ExtendedBlock block) throws IOException{ - BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId()); - if(bpos == null || bpos.bpNamenode == null) { - throw new IOException("cannot locate OfferService thread for bp="+block.getBlockPoolId()); - } + BPOfferService bpos = getBPOSForBlock(block); bpos.reportBadBlocks(block); } + + /** + * Report a bad block on another DN (eg if we received a corrupt replica + * from a remote host). + * @param srcDataNode the DN hosting the bad block + * @param block the block itself + */ + public void reportRemoteBadBlock(DatanodeInfo srcDataNode, ExtendedBlock block) + throws IOException { + BPOfferService bpos = getBPOSForBlock(block); + bpos.reportRemoteBadBlock(srcDataNode, block); + } + + /** + * Return the BPOfferService instance corresponding to the given block. + * @param block + * @return the BPOS + * @throws IOException if no such BPOS can be found + */ + private BPOfferService getBPOSForBlock(ExtendedBlock block) + throws IOException { + Preconditions.checkNotNull(block); + BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId()); + if (bpos == null) { + throw new IOException("cannot locate OfferService thread for bp="+ + block.getBlockPoolId()); + } + return bpos; + } + + // used only for testing void setHeartbeatsDisabledForTests( @@ -1006,11 +1044,15 @@ DatanodeRegistration getDNRegistrationForBP(String bpid) /** * get BP registration by machine and port name (host:port) - * @param mName + * @param mName - the name that the NN used * @return BP registration * @throws IOException */ DatanodeRegistration getDNRegistrationByMachineName(String mName) { + // TODO: all the BPs should have the same name as each other, they all come + // from getName() here! and the use cases only are in tests where they just + // call with getName(). So we could probably just make this method return + // the first BPOS's registration BPOfferService [] bposArray = blockPoolManager.getAllNamenodeThreads(); for (BPOfferService bpos : bposArray) { if(bpos.bpRegistration.getName().equals(mName)) @@ -1055,6 +1097,8 @@ public InterDatanodeProtocol run() throws IOException { * @return namenode address corresponding to the bpid */ public InetSocketAddress getNameNodeAddr(String bpid) { + // TODO(HA) this function doesn't make sense! used by upgrade code + // Should it return just the active one or simply return the BPService. BPOfferService bp = blockPoolManager.get(bpid); if (bp != null) { return bp.getNNSocketAddress(); @@ -1288,12 +1332,7 @@ private void handleDiskError(String errMsgr) { //inform NameNodes for(BPOfferService bpos: blockPoolManager.getAllNamenodeThreads()) { - DatanodeProtocol nn = bpos.bpNamenode; - try { - nn.errorReport(bpos.bpRegistration, dpError, errMsgr); - } catch(IOException e) { - LOG.warn("Error reporting disk failure to NameNode", e); - } + bpos.trySendErrorReport(dpError, errMsgr); } if(hasEnoughResources) { @@ -1309,6 +1348,10 @@ private void handleDiskError(String errMsgr) { int getXceiverCount() { return threadGroup == null ? 0 : threadGroup.activeCount(); } + + int getXmitsInProgress() { + return xmitsInProgress.get(); + } UpgradeManagerDatanode getUpgradeManagerDatanode(String bpid) { BPOfferService bpos = blockPoolManager.get(bpid); @@ -1321,14 +1364,15 @@ UpgradeManagerDatanode getUpgradeManagerDatanode(String bpid) { private void transferBlock( ExtendedBlock block, DatanodeInfo xferTargets[] ) throws IOException { - DatanodeProtocol nn = getBPNamenode(block.getBlockPoolId()); + BPOfferService bpos = getBPOSForBlock(block); DatanodeRegistration bpReg = getDNRegistrationForBP(block.getBlockPoolId()); if (!data.isValidBlock(block)) { // block does not exist or is under-construction String errStr = "Can't send invalid block " + block; LOG.info(errStr); - nn.errorReport(bpReg, DatanodeProtocol.INVALID_BLOCK, errStr); + + bpos.trySendErrorReport(DatanodeProtocol.INVALID_BLOCK, errStr); return; } @@ -1336,9 +1380,7 @@ private void transferBlock( ExtendedBlock block, long onDiskLength = data.getLength(block); if (block.getNumBytes() > onDiskLength) { // Shorter on-disk len indicates corruption so report NN the corrupt block - nn.reportBadBlocks(new LocatedBlock[]{ - new LocatedBlock(block, new DatanodeInfo[] { - new DatanodeInfo(bpReg)})}); + bpos.reportBadBlocks(block); LOG.warn("Can't replicate block " + block + " because on-disk length " + onDiskLength + " is shorter than NameNode recorded length " + block.getNumBytes()); @@ -1991,10 +2033,10 @@ private void recoverBlock(RecoveringBlock rBlock) throws IOException { */ public DatanodeProtocol getBPNamenode(String bpid) throws IOException { BPOfferService bpos = blockPoolManager.get(bpid); - if(bpos == null || bpos.bpNamenode == null) { + if (bpos == null) { throw new IOException("cannot find a namnode proxy for bpid=" + bpid); } - return bpos.bpNamenode; + return bpos.getActiveNN(); } /** Block synchronization */ @@ -2013,6 +2055,7 @@ void syncBlock(RecoveringBlock rBlock, // or their replicas have 0 length. // The block can be deleted. if (syncList.isEmpty()) { + // TODO: how does this work in HA?? nn.commitBlockSynchronization(block, recoveryId, 0, true, true, DatanodeID.EMPTY_ARRAY); return; @@ -2229,7 +2272,7 @@ public int getInfoPort(){ public String getNamenodeAddresses() { final Map info = new HashMap(); for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) { - if (bpos != null && bpos.bpThread != null) { + if (bpos != null) { info.put(bpos.getNNSocketAddress().getHostName(), bpos.getBlockPoolId()); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java new file mode 100644 index 0000000000..33b0e64aed --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java @@ -0,0 +1,282 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.impl.Log4JLogger; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics; +import org.apache.hadoop.hdfs.server.protocol.BlockCommand; +import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; +import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; +import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; +import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.log4j.Level; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +import com.google.common.base.Supplier; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +public class TestBPOfferService { + + private static final String FAKE_BPID = "fake bpid"; + private static final String FAKE_CLUSTERID = "fake cluster"; + protected static final Log LOG = LogFactory.getLog( + TestBPOfferService.class); + private static final ExtendedBlock FAKE_BLOCK = + new ExtendedBlock(FAKE_BPID, 12345L); + + static { + ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL); + } + + private DatanodeProtocol mockNN1; + private DatanodeProtocol mockNN2; + private DataNode mockDn; + private FSDatasetInterface mockFSDataset; + + @Before + public void setupMocks() throws Exception { + mockNN1 = setupNNMock(); + mockNN2 = setupNNMock(); + + // Set up a mock DN with the bare-bones configuration + // objects, etc. + mockDn = Mockito.mock(DataNode.class); + Mockito.doReturn(true).when(mockDn).shouldRun(); + Configuration conf = new Configuration(); + Mockito.doReturn(conf).when(mockDn).getConf(); + Mockito.doReturn(new DNConf(conf)).when(mockDn).getDnConf(); + Mockito.doReturn(DataNodeMetrics.create(conf, "fake dn")) + .when(mockDn).getMetrics(); + + // Set up a simulated dataset with our fake BP + mockFSDataset = Mockito.spy(new SimulatedFSDataset(conf)); + mockFSDataset.addBlockPool(FAKE_BPID, conf); + + // Wire the dataset to the DN. + Mockito.doReturn(mockFSDataset).when(mockDn).getFSDataset(); + } + + /** + * Set up a mock NN with the bare minimum for a DN to register to it. + */ + private DatanodeProtocol setupNNMock() throws Exception { + DatanodeProtocol mock = Mockito.mock(DatanodeProtocol.class); + Mockito.doReturn( + new NamespaceInfo(1, FAKE_CLUSTERID, FAKE_BPID, + 0, HdfsConstants.LAYOUT_VERSION)) + .when(mock).versionRequest(); + return mock; + } + + /** + * Test that the BPOS can register to talk to two different NNs, + * sends block reports to both, etc. + */ + @Test + public void testBasicFunctionality() throws Exception { + BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2); + bpos.start(); + try { + waitForInitialization(bpos); + + // The DN should have register to both NNs. + Mockito.verify(mockNN1).registerDatanode( + (DatanodeRegistration) Mockito.anyObject()); + Mockito.verify(mockNN2).registerDatanode( + (DatanodeRegistration) Mockito.anyObject()); + + // Should get block reports from both NNs + waitForBlockReport(mockNN1); + waitForBlockReport(mockNN2); + + // When we receive a block, it should report it to both NNs + bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, ""); + + ReceivedDeletedBlockInfo[] ret = waitForBlockReceived(FAKE_BLOCK, mockNN1); + assertEquals(1, ret.length); + assertEquals(FAKE_BLOCK.getLocalBlock(), ret[0].getBlock()); + + ret = waitForBlockReceived(FAKE_BLOCK, mockNN2); + assertEquals(1, ret.length); + assertEquals(FAKE_BLOCK.getLocalBlock(), ret[0].getBlock()); + + } finally { + bpos.stop(); + } + } + + /** + * Test that DNA_INVALIDATE commands from the standby are ignored. + */ + @Test + public void testIgnoreDeletionsFromNonActive() throws Exception { + BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2); + + // Ask to invalidate FAKE_BLOCK when block report hits the + // standby + Mockito.doReturn(new BlockCommand(DatanodeProtocol.DNA_INVALIDATE, + FAKE_BPID, new Block[] { FAKE_BLOCK.getLocalBlock() })) + .when(mockNN2).blockReport( + Mockito.anyObject(), + Mockito.eq(FAKE_BPID), + Mockito.anyObject()); + + bpos.start(); + try { + waitForInitialization(bpos); + + // Should get block reports from both NNs + waitForBlockReport(mockNN1); + waitForBlockReport(mockNN2); + + } finally { + bpos.stop(); + } + + // Should ignore the delete command from the standby + Mockito.verify(mockFSDataset, Mockito.never()) + .invalidate(Mockito.eq(FAKE_BPID), + (Block[]) Mockito.anyObject()); + } + + /** + * Ensure that, if the two NNs configured for a block pool + * have different block pool IDs, they will refuse to both + * register. + */ + @Test + public void testNNsFromDifferentClusters() throws Exception { + Mockito.doReturn( + new NamespaceInfo(1, "fake foreign cluster", FAKE_BPID, + 0, HdfsConstants.LAYOUT_VERSION)) + .when(mockNN1).versionRequest(); + + BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2); + bpos.start(); + try { + waitForOneToFail(bpos); + } finally { + bpos.stop(); + } + } + + private void waitForOneToFail(final BPOfferService bpos) + throws Exception { + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + return bpos.countNameNodes() == 1; + } + }, 100, 10000); + } + + /** + * Create a BPOfferService which registers with and heartbeats with the + * specified namenode proxy objects. + */ + private BPOfferService setupBPOSForNNs(DatanodeProtocol ... nns) { + // Set up some fake InetAddresses, then override the connectToNN + // function to return the corresponding proxies. + + final Map nnMap = Maps.newLinkedHashMap(); + for (int port = 0; port < nns.length; port++) { + nnMap.put(new InetSocketAddress(port), nns[port]); + } + + return new BPOfferService(Lists.newArrayList(nnMap.keySet()), mockDn) { + @Override + DatanodeProtocol connectToNN(InetSocketAddress nnAddr) throws IOException { + DatanodeProtocol nn = nnMap.get(nnAddr); + if (nn == null) { + throw new AssertionError("bad NN addr: " + nnAddr); + } + return nn; + } + }; + } + + private void waitForInitialization(final BPOfferService bpos) + throws Exception { + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + return bpos.isAlive() && bpos.isInitialized(); + } + }, 100, 10000); + } + + private void waitForBlockReport(final DatanodeProtocol mockNN) + throws Exception { + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + try { + Mockito.verify(mockNN).blockReport( + Mockito.anyObject(), + Mockito.eq(FAKE_BPID), + Mockito.anyObject()); + return true; + } catch (Throwable t) { + LOG.info("waiting on block report: " + t.getMessage()); + return false; + } + } + }, 500, 10000); + } + + private ReceivedDeletedBlockInfo[] waitForBlockReceived( + ExtendedBlock fakeBlock, + DatanodeProtocol mockNN) throws Exception { + final ArgumentCaptor captor = + ArgumentCaptor.forClass(ReceivedDeletedBlockInfo[].class); + GenericTestUtils.waitFor(new Supplier() { + + @Override + public Boolean get() { + try { + Mockito.verify(mockNN1).blockReceivedAndDeleted( + Mockito.anyObject(), + Mockito.eq(FAKE_BPID), + captor.capture()); + return true; + } catch (Throwable t) { + return false; + } + } + }, 100, 10000); + return captor.getValue(); + } + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMultipleRegistrations.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMultipleRegistrations.java index cc82682ec4..760eb08ba8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMultipleRegistrations.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMultipleRegistrations.java @@ -93,23 +93,22 @@ public void test2NNRegistration() throws IOException { assertEquals("number of volumes is wrong", 2, volInfos.size()); for (BPOfferService bpos : dn.getAllBpOs()) { - LOG.info("reg: bpid=" + "; name=" + bpos.bpRegistration.name + "; sid=" - + bpos.bpRegistration.storageID + "; nna=" + bpos.nnAddr); + LOG.info("BP: " + bpos); } BPOfferService bpos1 = dn.getAllBpOs()[0]; BPOfferService bpos2 = dn.getAllBpOs()[1]; // The order of bpos is not guaranteed, so fix the order - if (bpos1.nnAddr.equals(nn2.getNameNodeAddress())) { + if (bpos1.getNNSocketAddress().equals(nn2.getNameNodeAddress())) { BPOfferService tmp = bpos1; bpos1 = bpos2; bpos2 = tmp; } - assertEquals("wrong nn address", bpos1.nnAddr, + assertEquals("wrong nn address", bpos1.getNNSocketAddress(), nn1.getNameNodeAddress()); - assertEquals("wrong nn address", bpos2.nnAddr, + assertEquals("wrong nn address", bpos2.getNNSocketAddress(), nn2.getNameNodeAddress()); assertEquals("wrong bpid", bpos1.getBlockPoolId(), bpid1); assertEquals("wrong bpid", bpos2.getBlockPoolId(), bpid2); @@ -156,15 +155,14 @@ public void testFedSingleNN() throws IOException { for (BPOfferService bpos : dn.getAllBpOs()) { LOG.info("reg: bpid=" + "; name=" + bpos.bpRegistration.name + "; sid=" - + bpos.bpRegistration.storageID + "; nna=" + bpos.nnAddr); + + bpos.bpRegistration.storageID + "; nna=" + bpos.getNNSocketAddress()); } // try block report BPOfferService bpos1 = dn.getAllBpOs()[0]; - bpos1.lastBlockReport = 0; - bpos1.blockReport(); + bpos1.triggerBlockReportForTests(); - assertEquals("wrong nn address", bpos1.nnAddr, + assertEquals("wrong nn address", bpos1.getNNSocketAddress(), nn1.getNameNodeAddress()); assertEquals("wrong bpid", bpos1.getBlockPoolId(), bpid1); assertEquals("wrong cid", dn.getClusterId(), cid1); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRegister.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRegister.java index 97554e7a80..ba36b27764 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRegister.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRegister.java @@ -42,17 +42,19 @@ public void testDataNodeRegister() throws Exception { DataNode mockDN = mock(DataNode.class); Mockito.doReturn(true).when(mockDN).shouldRun(); - BPOfferService bpos = new BPOfferService(INVALID_ADDR, mockDN); + BPOfferService mockBPOS = Mockito.mock(BPOfferService.class); + Mockito.doReturn(mockDN).when(mockBPOS).getDataNode(); + + BPServiceActor actor = new BPServiceActor(INVALID_ADDR, mockBPOS); NamespaceInfo fakeNSInfo = mock(NamespaceInfo.class); when(fakeNSInfo.getBuildVersion()).thenReturn("NSBuildVersion"); DatanodeProtocol fakeDNProt = mock(DatanodeProtocol.class); when(fakeDNProt.versionRequest()).thenReturn(fakeNSInfo); - bpos.setNameNode( fakeDNProt ); - bpos.bpNSInfo = fakeNSInfo; + actor.setNameNode( fakeDNProt ); try { - bpos.retrieveNamespaceInfo(); + actor.retrieveNamespaceInfo(); fail("register() did not throw exception! " + "Expected: IncorrectVersionException"); } catch (IncorrectVersionException ie) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestRefreshNamenodes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestRefreshNamenodes.java index 1360cad5ca..cfa1d64c90 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestRefreshNamenodes.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestRefreshNamenodes.java @@ -72,7 +72,7 @@ public void testRefreshNamenodes() throws IOException { InetSocketAddress addr = cluster.getNameNode(i).getNameNodeAddress(); boolean found = false; for (int j = 0; j < bpoList.length; j++) { - if (bpoList[j] != null && addr.equals(bpoList[j].nnAddr)) { + if (bpoList[j] != null && addr.equals(bpoList[j].getNNSocketAddress())) { found = true; bpoList[j] = null; // Erase the address that matched break;