diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java index 069841b1c9..eee364ccde 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java @@ -580,6 +580,12 @@ public static ProtocolProxy getProtocolProxy(Class protocol, * @param proxy the RPC proxy object to be stopped */ public static void stopProxy(Object proxy) { + if (proxy instanceof ProtocolTranslator) { + RPC.stopProxy(((ProtocolTranslator)proxy) + .getUnderlyingProxyObject()); + return; + } + InvocationHandler invocationHandler = null; try { invocationHandler = Proxy.getInvocationHandler(proxy); diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt index 18de5f6334..42bdcf8a86 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt @@ -238,3 +238,5 @@ HDFS-3013. HA: NameNode format doesn't pick up dfs.namenode.name.dir.NameService HDFS-3019. Fix silent failure of TestEditLogJournalFailures (todd) HDFS-2958. Sweep for remaining proxy construction which doesn't go through failover path. (atm) + +HDFS-2920. Fix remaining TODO items. (atm and todd) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index f0dc8ceff2..83cd9a8a3c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -418,22 +418,9 @@ boolean renewLease() throws IOException { /** * Close connections the Namenode. 
- * The namenode variable is either a rpcProxy passed by a test or - * created using the protocolTranslator which is closeable. - * If closeable then call close, else close using RPC.stopProxy(). */ void closeConnectionToNamenode() { - if (namenode instanceof Closeable) { - try { - ((Closeable) namenode).close(); - return; - } catch (IOException e) { - // fall through - lets try the stopProxy - LOG.warn("Exception closing namenode, stopping the proxy"); - } - } else { - RPC.stopProxy(namenode); - } + RPC.stopProxy(namenode); } /** Abort and release resources held. Ignore all errors. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java index 5ef00bfe9c..641dfc11bb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java @@ -694,7 +694,6 @@ public boolean restoreFailedStorage(String arg) * * @throws IOException */ - //TODO(HA): Should this be @Idempotent? public void finalizeUpgrade() throws IOException; /** @@ -704,7 +703,6 @@ public boolean restoreFailedStorage(String arg) * @return upgrade status information or null if no upgrades are in progress * @throws IOException */ - //TODO(HA): Should this be @Idempotent? public UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action) throws IOException; @@ -737,7 +735,7 @@ public CorruptFileBlocks listCorruptFileBlocks(String path, String cookie) * @param bandwidth Blanacer bandwidth in bytes per second for this datanode. * @throws IOException */ - //TODO(HA): Should this be @Idempotent? 
+ @Idempotent public void setBalancerBandwidth(long bandwidth) throws IOException; /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java index d03f27060b..bc32ab00a5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java @@ -45,6 +45,7 @@ import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.ProtocolMetaInterface; import org.apache.hadoop.ipc.ProtocolSignature; +import org.apache.hadoop.ipc.ProtocolTranslator; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RpcClientUtil; import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; @@ -63,7 +64,8 @@ @InterfaceAudience.Private @InterfaceStability.Stable public class ClientDatanodeProtocolTranslatorPB implements - ProtocolMetaInterface, ClientDatanodeProtocol, Closeable { + ProtocolMetaInterface, ClientDatanodeProtocol, + ProtocolTranslator, Closeable { public static final Log LOG = LogFactory .getLog(ClientDatanodeProtocolTranslatorPB.class); @@ -211,4 +213,9 @@ public boolean isMethodSupported(String methodName) throws IOException { ClientDatanodeProtocolPB.class, RpcKind.RPC_PROTOCOL_BUFFER, RPC.getProtocolVersion(ClientDatanodeProtocolPB.class), methodName); } + + @Override + public Object getUnderlyingProxyObject() { + return rpcProxy; + } } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 9a76f1e43d..cbae6f2246 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -19,7 +19,6 @@ import java.io.IOException; import java.io.PrintWriter; -import java.io.StringWriter; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java index c35b35f064..aaba4fff2a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java @@ -383,7 +383,6 @@ synchronized void shutdownActor(BPServiceActor actor) { bpServices.remove(actor); - // TODO: synchronization should be a little better here if (bpServices.isEmpty()) { dn.shutdownBlockPool(this); @@ -392,12 +391,6 @@ synchronized void shutdownActor(BPServiceActor actor) { } } - @Deprecated - synchronized InetSocketAddress getNNSocketAddress() { - // TODO(HA) this doesn't make sense anymore - return bpServiceToActive.getNNSocketAddress(); - } - /** * Called by the DN to report an error to the NNs. */ @@ -432,11 +425,9 @@ void reportRemoteBadBlock(DatanodeInfo dnInfo, ExtendedBlock block) { } /** - * TODO: this is still used in a few places where we need to sort out - * what to do in HA! - * @return a proxy to the active NN + * @return a proxy to the active NN, or null if the BPOS has not + * acknowledged any NN as active yet. 
*/ - @Deprecated synchronized DatanodeProtocolClientSideTranslatorPB getActiveNN() { if (bpServiceToActive != null) { return bpServiceToActive.bpNamenode; @@ -596,6 +587,7 @@ private boolean processCommandFromActive(DatanodeCommand cmd, break; case DatanodeProtocol.DNA_SHUTDOWN: // TODO: DNA_SHUTDOWN appears to be unused - the NN never sends this command + // See HDFS-2987. throw new UnsupportedOperationException("Received unimplemented DNA_SHUTDOWN"); case DatanodeProtocol.DNA_REGISTER: // namenode requested a registration - at start or if NN lost contact diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java index 982a568503..75f32cbc04 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java @@ -538,8 +538,8 @@ private void offerService() throws Exception { DatanodeCommand cmd = blockReport(); processCommand(new DatanodeCommand[]{ cmd }); - // Now safe to start scanning the block pool - // TODO(HA): this doesn't seem quite right + // Now safe to start scanning the block pool. + // If it has already been started, this is a no-op. 
if (dn.blockScanner != null) { dn.blockScanner.addBlockPool(bpos.getBlockPoolId()); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java index 3176be2078..3355ee269a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java @@ -86,16 +86,6 @@ synchronized BPOfferService get(String bpid) { return bpByBlockPoolId.get(bpid); } - // TODO(HA) would be good to kill this - synchronized BPOfferService get(InetSocketAddress addr) { - for (BPOfferService bpos : offerServices) { - if (bpos.containsNN(addr)) { - return bpos; - } - } - return null; - } - synchronized void remove(BPOfferService t) { offerServices.remove(t); bpByBlockPoolId.remove(t.getBlockPoolId()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index acbcb032a8..f13466a4ab 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -565,6 +565,23 @@ public void reportRemoteBadBlock(DatanodeInfo srcDataNode, ExtendedBlock block) bpos.reportRemoteBadBlock(srcDataNode, block); } + /** + * Try to send an error report to the NNs associated with the given + * block pool. 
+ * @param bpid the block pool ID + * @param errCode error code to send + * @param errMsg textual message to send + */ + void trySendErrorReport(String bpid, int errCode, String errMsg) { + BPOfferService bpos = blockPoolManager.get(bpid); + if (bpos == null) { + throw new IllegalArgumentException("Bad block pool: " + bpid); + } + bpos.trySendErrorReport(errCode, errMsg); + } + + + /** * Return the BPOfferService instance corresponding to the given block. * @param block @@ -874,7 +891,7 @@ DatanodeRegistration getDNRegistrationByMachineName(String mName) { // TODO: all the BPs should have the same name as each other, they all come // from getName() here! and the use cases only are in tests where they just // call with getName(). So we could probably just make this method return - // the first BPOS's registration + // the first BPOS's registration. See HDFS-2609. BPOfferService [] bposArray = blockPoolManager.getAllNamenodeThreads(); for (BPOfferService bpos : bposArray) { if(bpos.bpRegistration.getName().equals(mName)) @@ -920,22 +937,6 @@ public InterDatanodeProtocol run() throws IOException { throw new IOException(ie.getMessage()); } } - - /** - * get the name node address based on the block pool id - * @param bpid block pool ID - * @return namenode address corresponding to the bpid - */ - public InetSocketAddress getNameNodeAddr(String bpid) { - // TODO(HA) this function doesn't make sense! used by upgrade code - // Should it return just the active one or simply return the BPService. 
- BPOfferService bp = blockPoolManager.get(bpid); - if (bp != null) { - return bp.getNNSocketAddress(); - } - LOG.warn("No name node address found for block pool ID " + bpid); - return null; - } public InetSocketAddress getSelfAddr() { return selfAddr; @@ -1869,7 +1870,7 @@ private void recoverBlock(RecoveringBlock rBlock) throws IOException { * @return Namenode corresponding to the bpid * @throws IOException */ - public DatanodeProtocolClientSideTranslatorPB getBPNamenode(String bpid) + public DatanodeProtocolClientSideTranslatorPB getActiveNamenodeForBP(String bpid) throws IOException { BPOfferService bpos = blockPoolManager.get(bpid); if (bpos == null) { @@ -1888,9 +1889,13 @@ public DatanodeProtocolClientSideTranslatorPB getBPNamenode(String bpid) void syncBlock(RecoveringBlock rBlock, List syncList) throws IOException { ExtendedBlock block = rBlock.getBlock(); - DatanodeProtocolClientSideTranslatorPB nn = getBPNamenode(block - .getBlockPoolId()); - assert nn != null; + DatanodeProtocolClientSideTranslatorPB nn = + getActiveNamenodeForBP(block.getBlockPoolId()); + if (nn == null) { + throw new IOException( + "Unable to synchronize block " + rBlock + ", since this DN " + + " has not acknowledged any NN as active."); + } long recoveryId = rBlock.getNewGenerationStamp(); if (LOG.isDebugEnabled()) { @@ -2111,14 +2116,19 @@ public int getInfoPort(){ /** * Returned information is a JSON representation of a map with - * name node host name as the key and block pool Id as the value + * name node host name as the key and block pool Id as the value. + * Note that, if there are multiple NNs in an HA nameservice, + * a given block pool may be represented twice. 
*/ @Override // DataNodeMXBean public String getNamenodeAddresses() { final Map info = new HashMap(); for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) { if (bpos != null) { - info.put(bpos.getNNSocketAddress().getHostName(), bpos.getBlockPoolId()); + for (BPServiceActor actor : bpos.getBPServiceActors()) { + info.put(actor.getNNSocketAddress().getHostName(), + bpos.getBlockPoolId()); + } } } return JSON.toString(info); @@ -2167,11 +2177,18 @@ public void deleteBlockPool(String blockPoolId, boolean force) /** * @param addr rpc address of the namenode - * @return true - if BPOfferService corresponding to the namenode is alive + * @return true if the datanode is connected to a NameNode at the + * given address */ - public boolean isBPServiceAlive(InetSocketAddress addr) { - BPOfferService bp = blockPoolManager.get(addr); - return bp != null ? bp.isAlive() : false; + public boolean isConnectedToNN(InetSocketAddress addr) { + for (BPOfferService bpos : getAllBpOs()) { + for (BPServiceActor bpsa : bpos.getBPServiceActors()) { + if (addr.equals(bpsa.getNNSocketAddress())) { + return bpsa.isAlive(); + } + } + } + return false; } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/UpgradeManagerDatanode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/UpgradeManagerDatanode.java index 478fb5660d..9ada40fd5f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/UpgradeManagerDatanode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/UpgradeManagerDatanode.java @@ -92,7 +92,7 @@ public synchronized boolean startUpgrade() throws IOException { "UpgradeManagerDatanode.currentUpgrades is not null."; assert upgradeDaemon == null : "UpgradeManagerDatanode.upgradeDaemon is not null."; - DatanodeProtocol nn = dataNode.getBPNamenode(bpid); + DatanodeProtocol nn = 
dataNode.getActiveNamenodeForBP(bpid); nn.processUpgradeCommand(broadcastCommand); return true; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/UpgradeObjectDatanode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/UpgradeObjectDatanode.java index ddb1d6029f..49d26212d0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/UpgradeObjectDatanode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/UpgradeObjectDatanode.java @@ -45,7 +45,7 @@ protected DataNode getDatanode() { } protected DatanodeProtocol getNamenode() throws IOException { - return dataNode.getBPNamenode(bpid); + return dataNode.getActiveNamenodeForBP(bpid); } void setDatanode(DataNode dataNode, String bpid) { @@ -92,14 +92,7 @@ boolean preUpgradeAction(NamespaceInfo nsInfo) throws IOException { + " Name-node version = " + nsInfo.getLayoutVersion() + "."; DataNode.LOG.fatal( errorMsg ); String bpid = nsInfo.getBlockPoolID(); - DatanodeProtocol nn = dataNode.getBPNamenode(bpid); - try { - nn.errorReport(dataNode.getDNRegistrationForBP(bpid), - DatanodeProtocol.NOTIFY, errorMsg); - } catch(SocketTimeoutException e) { // namenode is busy - DataNode.LOG.info("Problem connecting to server: " - + dataNode.getNameNodeAddr(nsInfo.getBlockPoolID())); - } + dataNode.trySendErrorReport(bpid, DatanodeProtocol.NOTIFY, errorMsg); throw new IOException(errorMsg); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java index 9c1eb25807..f922c11bc9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java @@ 
-282,18 +282,13 @@ INode unprotectedAddFile( String path, newNode = new INodeFile(permissions, 0, replication, modificationTime, atime, preferredBlockSize); } - writeLock(); // TODO: this is silly, considering the assert above! - try { - try { - newNode = addNode(path, newNode, UNKNOWN_DISK_SPACE); - } catch (IOException e) { - return null; - } - return newNode; - } finally { - writeUnlock(); - } + try { + newNode = addNode(path, newNode, UNKNOWN_DISK_SPACE); + } catch (IOException e) { + return null; + } + return newNode; } INodeDirectory addToParent(byte[] src, INodeDirectory parentINode, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java index bf1ec992c4..84d4ace283 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java @@ -266,8 +266,8 @@ private void applyEditLogOp(FSEditLogOp op, FSDirectory fsDir, // Now close the file INodeFileUnderConstruction ucFile = (INodeFileUnderConstruction) oldFile; - // TODO: we could use removeLease(holder, path) here, but OP_CLOSE - // doesn't seem to serialize the holder... unclear why! + // One might expect that you could use removeLease(holder, path) here, + // but OP_CLOSE doesn't serialize the holder. So, remove by path. 
fsNamesys.leaseManager.removeLeaseWithPrefixPath(addCloseOp.path); INodeFile newFile = ucFile.convertToInodeFile(); fsDir.replaceNode(addCloseOp.path, ucFile, newFile); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java index adc3b46b7f..7fb3d4bdfc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java @@ -226,7 +226,6 @@ boolean recoverTransitionRead(StartupOption startOpt, FSNamesystem target) } } - // TODO(HA): Have to figure out a story for the first 3 of these. // 3. Do transitions switch(startOpt) { case UPGRADE: @@ -261,7 +260,6 @@ private boolean recoverStorageDirs(StartupOption startOpt, StorageState curState; try { curState = sd.analyzeStorage(startOpt, storage); - // TODO(HA): Fix this. String nameserviceId = DFSUtil.getNamenodeNameServiceId(conf); if (curState != StorageState.NORMAL && HAUtil.isHAEnabled(conf, nameserviceId)) { throw new IOException("Cannot start an HA namenode with name dirs " + @@ -637,8 +635,6 @@ boolean loadFSImage(FSNamesystem target) throws IOException { // update the txid for the edit log editLog.setNextTxId(storage.getMostRecentCheckpointTxId() + numLoaded + 1); - // TODO(HA): This should probably always return false when HA is enabled and - // we're coming up in standby state. return needToSave; } @@ -697,8 +693,6 @@ public long loadEdits(Iterable editStreams, } finally { FSEditLog.closeAllStreams(editStreams); // update the counts - // TODO(HA): this may be very slow -- we probably want to - // update them as we go for HA. 
target.dir.updateCountForINodeWithQuota(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 4d54701bbb..bc40864a4e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -533,7 +533,6 @@ void startActiveServices() throws IOException { if (!editLog.isOpenForWrite()) { // During startup, we're already open for write during initialization. - // TODO(HA): consider adding a startup state? editLog.initJournalsForWrite(); // May need to recover editLog.recoverUnclosedStreams(); @@ -912,7 +911,6 @@ void close() { } finally { // using finally to ensure we also wait for lease daemon try { - // TODO: these lines spew lots of warnings about "already stopped" logs, etc stopActiveServices(); stopStandbyServices(); if (dir != null) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java index 5dc6256023..d07ed860d2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java @@ -920,7 +920,7 @@ synchronized void monitorHealth() if (!haEnabled) { return; // no-op, if HA is not enabled } - // TODO:HA implement health check + // TODO(HA): implement health check return; } @@ -963,7 +963,7 @@ synchronized boolean readyToBecomeActive() /** * Class used as expose {@link NameNode} as context to {@link HAState} * - * TODO:HA + * TODO(HA): * When entering and exiting state, on failing to start services, * appropriate action is needed 
todo either shutdown the node or recover * from failure. @@ -1005,7 +1005,6 @@ public void prepareToStopStandbyServices() throws ServiceFailedException { @Override public void stopStandbyServices() throws IOException { - // TODO(HA): Are we guaranteed to be the only active here? if (namesystem != null) { namesystem.stopStandbyServices(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java index edfc53fb12..036dd431ad 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java @@ -176,7 +176,7 @@ private void doCheckpoint() throws InterruptedException, IOException { public void cancelAndPreventCheckpoints() throws ServiceFailedException { try { thread.preventCheckpointsFor(PREVENT_AFTER_CANCEL_MS); - // TODO: there is a really narrow race here if we are just + // TODO(HA): there is a really narrow race here if we are just // about to start a checkpoint - this won't cancel it! 
namesystem.getFSImage().cancelSaveNamespace( "About to exit standby state"); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java index 770d0f1066..658282a0e7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java @@ -1696,9 +1696,9 @@ private synchronized boolean shouldWait(DatanodeInfo[] dnInfo, // If a datanode failed to start, then do not wait for (DataNodeProperties dn : dataNodes) { // the datanode thread communicating with the namenode should be alive - if (!dn.datanode.isBPServiceAlive(addr)) { - LOG.warn("BPOfferService failed to start in datanode " + dn.datanode - + " for namenode at " + addr); + if (!dn.datanode.isConnectedToNN(addr)) { + LOG.warn("BPOfferService in datanode " + dn.datanode + + " failed to connect to namenode at " + addr); return false; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java index 2a75998d79..59a61cf2ea 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java @@ -461,7 +461,7 @@ public void testZeroLenReplicas() throws IOException, InterruptedException { initReplicaRecovery(any(RecoveringBlock.class)); Daemon d = spyDN.recoverBlocks(initRecoveringBlocks()); d.join(); - DatanodeProtocol dnP = dn.getBPNamenode(POOL_ID); + DatanodeProtocol dnP = dn.getActiveNamenodeForBP(POOL_ID); verify(dnP).commitBlockSynchronization( block, RECOVERY_ID, 0, true, true, DatanodeID.EMPTY_ARRAY); } @@ -518,7 +518,7 
@@ public void testNoReplicaUnderRecovery() throws IOException { } catch (IOException e) { e.getMessage().startsWith("Cannot recover "); } - DatanodeProtocol namenode = dn.getBPNamenode(POOL_ID); + DatanodeProtocol namenode = dn.getActiveNamenodeForBP(POOL_ID); verify(namenode, never()).commitBlockSynchronization( any(ExtendedBlock.class), anyLong(), anyLong(), anyBoolean(), anyBoolean(), any(DatanodeID[].class)); @@ -547,7 +547,7 @@ public void testNotMatchedReplicaID() throws IOException { } catch (IOException e) { e.getMessage().startsWith("Cannot recover "); } - DatanodeProtocol namenode = dn.getBPNamenode(POOL_ID); + DatanodeProtocol namenode = dn.getActiveNamenodeForBP(POOL_ID); verify(namenode, never()).commitBlockSynchronization( any(ExtendedBlock.class), anyLong(), anyLong(), anyBoolean(), anyBoolean(), any(DatanodeID[].class)); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMultipleRegistrations.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMultipleRegistrations.java index a3e8ceb90f..20a16c3166 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMultipleRegistrations.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMultipleRegistrations.java @@ -23,6 +23,8 @@ import static org.junit.Assert.assertNotSame; import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.List; import java.util.Map; import org.apache.commons.logging.Log; @@ -99,15 +101,15 @@ public void test2NNRegistration() throws IOException { BPOfferService bpos2 = dn.getAllBpOs()[1]; // The order of bpos is not guaranteed, so fix the order - if (bpos1.getNNSocketAddress().equals(nn2.getNameNodeAddress())) { + if (getNNSocketAddress(bpos1).equals(nn2.getNameNodeAddress())) { BPOfferService tmp = bpos1; bpos1 = bpos2; bpos2 = tmp; } - 
assertEquals("wrong nn address", bpos1.getNNSocketAddress(), + assertEquals("wrong nn address", getNNSocketAddress(bpos1), nn1.getNameNodeAddress()); - assertEquals("wrong nn address", bpos2.getNNSocketAddress(), + assertEquals("wrong nn address", getNNSocketAddress(bpos2), nn2.getNameNodeAddress()); assertEquals("wrong bpid", bpos1.getBlockPoolId(), bpid1); assertEquals("wrong bpid", bpos2.getBlockPoolId(), bpid2); @@ -121,6 +123,12 @@ public void test2NNRegistration() throws IOException { cluster.shutdown(); } } + + private static InetSocketAddress getNNSocketAddress(BPOfferService bpos) { + List actors = bpos.getBPServiceActors(); + assertEquals(1, actors.size()); + return actors.get(0).getNNSocketAddress(); + } /** * starts single nn and single dn and verifies registration and handshake @@ -154,14 +162,16 @@ public void testFedSingleNN() throws IOException { for (BPOfferService bpos : dn.getAllBpOs()) { LOG.info("reg: bpid=" + "; name=" + bpos.bpRegistration.name + "; sid=" - + bpos.bpRegistration.storageID + "; nna=" + bpos.getNNSocketAddress()); + + bpos.bpRegistration.storageID + "; nna=" + + getNNSocketAddress(bpos)); } // try block report BPOfferService bpos1 = dn.getAllBpOs()[0]; bpos1.triggerBlockReportForTests(); - assertEquals("wrong nn address", bpos1.getNNSocketAddress(), + assertEquals("wrong nn address", + getNNSocketAddress(bpos1), nn1.getNameNodeAddress()); assertEquals("wrong bpid", bpos1.getBlockPoolId(), bpid1); assertEquals("wrong cid", dn.getClusterId(), cid1); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestRefreshNamenodes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestRefreshNamenodes.java index cfa1d64c90..2d6f210379 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestRefreshNamenodes.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestRefreshNamenodes.java @@ -22,15 +22,18 @@ import java.io.IOException; import java.net.InetSocketAddress; +import java.util.Set; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSNNTopology; import org.apache.hadoop.hdfs.MiniDFSNNTopology.NNConf; import org.apache.hadoop.hdfs.MiniDFSNNTopology.NSConf; import org.junit.Test; +import com.google.common.base.Joiner; +import com.google.common.collect.Sets; + /** * Tests datanode refresh namenode list functionality. */ @@ -65,21 +68,24 @@ public void testRefreshNamenodes() throws IOException { cluster.addNameNode(conf, nnPort4); - BPOfferService[] bpoList = dn.getAllBpOs(); // Ensure a BPOfferService in the datanodes corresponds to // a namenode in the cluster + Set nnAddrsFromCluster = Sets.newHashSet(); for (int i = 0; i < 4; i++) { - InetSocketAddress addr = cluster.getNameNode(i).getNameNodeAddress(); - boolean found = false; - for (int j = 0; j < bpoList.length; j++) { - if (bpoList[j] != null && addr.equals(bpoList[j].getNNSocketAddress())) { - found = true; - bpoList[j] = null; // Erase the address that matched - break; - } - } - assertTrue("NameNode address " + addr + " is not found.", found); + assertTrue(nnAddrsFromCluster.add( + cluster.getNameNode(i).getNameNodeAddress())); } + + Set nnAddrsFromDN = Sets.newHashSet(); + for (BPOfferService bpos : dn.getAllBpOs()) { + for (BPServiceActor bpsa : bpos.getBPServiceActors()) { + assertTrue(nnAddrsFromDN.add(bpsa.getNNSocketAddress())); + } + } + + assertEquals("", + Joiner.on(",").join( + Sets.symmetricDifference(nnAddrsFromCluster, nnAddrsFromDN))); } finally { if (cluster != null) { cluster.shutdown(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java index 465987c6cb..547ba72e49 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java @@ -179,7 +179,7 @@ public void testWriteOverFailoverWithDnFail() throws Exception { // write another block and a half AppendTestUtil.write(stm, BLOCK_AND_A_HALF, BLOCK_AND_A_HALF); - stm.hflush(); // TODO: see above + stm.hflush(); LOG.info("Failing back to NN 0"); cluster.transitionToStandby(0); @@ -188,7 +188,7 @@ public void testWriteOverFailoverWithDnFail() throws Exception { cluster.stopDataNode(1); AppendTestUtil.write(stm, BLOCK_AND_A_HALF*2, BLOCK_AND_A_HALF); - stm.hflush(); // TODO: see above + stm.hflush(); stm.close(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java index a34d6bdfc2..5440c38cc2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java @@ -127,9 +127,6 @@ public void testBothNodesInStandbyState() throws Exception { List dirs = Lists.newArrayList(); dirs.addAll(FSImageTestUtil.getNameNodeCurrentDirs(cluster, 0)); dirs.addAll(FSImageTestUtil.getNameNodeCurrentDirs(cluster, 1)); - // TODO: this failed once because it caught a ckpt file -- maybe - // this is possible if one of the NNs is really fast and the other is slow? - // need to loop this to suss out the race. FSImageTestUtil.assertParallelFilesAreIdentical(dirs, ImmutableSet.of()); }