diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
index dcb198d6bf..e079202641 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
@@ -119,3 +119,5 @@ HDFS-2592. Balancer support for HA namenodes. (Uma Maheswara Rao G via todd)
 HDFS-2367. Enable the configuration of multiple HA cluster addresses. (atm)
 
 HDFS-2812. When becoming active, the NN should treat all leases as freshly renewed. (todd)
+
+HDFS-2737. Automatically trigger log rolls periodically on the active NN. (todd and atm)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index b3fee6fc51..55d1ccd1ce 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -333,4 +333,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_HA_NAMENODE_ID_KEY = "dfs.ha.namenode.id";
   public static final String DFS_HA_STANDBY_CHECKPOINTS_KEY = "dfs.ha.standby.checkpoints";
   public static final boolean DFS_HA_STANDBY_CHECKPOINTS_DEFAULT = true;
+  public static final String DFS_HA_LOGROLL_PERIOD_KEY = "dfs.ha.log-roll.period";
+  public static final int DFS_HA_LOGROLL_PERIOD_DEFAULT = 2 * 60; // 2m
+  public static final String DFS_HA_TAILEDITS_PERIOD_KEY = "dfs.ha.tail-edits.period";
+  public static final int DFS_HA_TAILEDITS_PERIOD_DEFAULT = 60; // 1m
 }
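Both new settings are expressed in seconds and are consumed by the standby's EditLogTailer (see below). A rough sketch of tuning them, with illustrative values rather than recommendations:

    Configuration conf = new Configuration();
    // Trigger a roll on the active NN once the standby has gone 60s without
    // loading new edits (default 120s); a negative value disables triggering.
    conf.setInt(DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY, 60);
    // Poll for newly finalized edit log segments every 5s (default 60s).
    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 5);

Since the standby only reads finalized segments, its worst-case staleness is bounded by roughly one roll period plus one tail period, i.e. about three minutes with the defaults.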
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java
index ad2f8f67f6..a260c0e4fa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java
@@ -22,6 +22,8 @@ import java.lang.reflect.Constructor;
 import java.net.InetSocketAddress;
 import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Map;
 
 import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -32,9 +34,9 @@ import org.apache.hadoop.io.retry.FailoverProxyProvider;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryProxy;
-import org.apache.hadoop.util.ReflectionUtils;
-
+import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 
 public class HAUtil {
   private HAUtil() { /* Hidden constructor */ }
@@ -109,6 +111,39 @@ public static String getNameNodeIdFromAddress(final Configuration conf,
     return null;
   }
 
+  /**
+   * Given the configuration for this node, return a Configuration object for
+   * the other node in an HA setup.
+   *
+   * @param myConf the configuration of this node
+   * @return the configuration of the other node in an HA setup
+   */
+  public static Configuration getConfForOtherNode(
+      Configuration myConf) {
+
+    String nsId = DFSUtil.getNamenodeNameServiceId(myConf);
+    Collection<String> nnIds = DFSUtil.getNameNodeIds(myConf, nsId);
+    String myNNId = myConf.get(DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY);
+    Preconditions.checkArgument(nnIds != null,
+        "Could not determine namenode ids in namespace '%s'",
+        nsId);
+    Preconditions.checkArgument(nnIds.size() == 2,
+        "Expected exactly 2 NameNodes in this namespace. Instead, got: '%s'",
+        Joiner.on("','").join(nnIds));
+    Preconditions.checkState(myNNId != null && !myNNId.isEmpty(),
+        "Could not determine own NN ID");
+
+    ArrayList<String> nnSet = Lists.newArrayList(nnIds);
+    nnSet.remove(myNNId);
+    assert nnSet.size() == 1;
+    String activeNN = nnSet.get(0);
+
+    // Look up the address of the active NN.
+    Configuration confForOtherNode = new Configuration(myConf);
+    NameNode.initializeGenericKeys(confForOtherNode, nsId, activeNN);
+    return confForOtherNode;
+  }
+
   /**
    * This is used only by tests at the moment.
    * @return true if the NN should allow read operations while in standby mode.
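The new getConfForOtherNode() is hoisted out of StandbyCheckpointer (see that diff below) so the edit log tailer can share it. A minimal usage sketch, assuming a nameservice with exactly two NameNodes configured:

    // Derive the peer NN's configuration and service RPC address from our own,
    // as EditLogTailer#getActiveNodeAddress() below does.
    Configuration myConf = new Configuration();
    Configuration peerConf = HAUtil.getConfForOtherNode(myConf);
    InetSocketAddress peerAddr = NameNode.getServiceAddress(peerConf, true);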
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java
index 51a74746b2..22a6d8a8eb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java
@@ -157,7 +157,6 @@ public long getTransactionID() throws IOException {
   }
 
   @Override
-  @SuppressWarnings("deprecation")
   public CheckpointSignature rollEditLog() throws IOException {
     try {
       return PBHelper.convert(rpcProxy.rollEditLog(NULL_CONTROLLER,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/NamenodeProtocolTranslatorR23.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/NamenodeProtocolTranslatorR23.java
index 11589756af..e7ea9ec7bc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/NamenodeProtocolTranslatorR23.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/NamenodeProtocolTranslatorR23.java
@@ -134,7 +134,6 @@ public long getTransactionID() throws IOException {
   }
 
   @Override
-  @SuppressWarnings("deprecation")
   public CheckpointSignature rollEditLog() throws IOException {
     return rpcProxy.rollEditLog().convert();
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/NamenodeWireProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/NamenodeWireProtocol.java
index 6eaa224d43..2bfba10387 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/NamenodeWireProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/NamenodeWireProtocol.java
@@ -84,10 +84,7 @@ public BlocksWithLocationsWritable getBlocks(DatanodeInfoWritable datanode,
    * call fails if the file system is in SafeMode.
    * @throws IOException
    * @return a unique token to identify this transaction.
-   * @deprecated
-   *    See {@link org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode}
    */
-  @Deprecated
   public CheckpointSignatureWritable rollEditLog() throws IOException;
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 54d6ebe3fc..e92ae02001 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -690,6 +690,10 @@ public static Collection<URI> getSharedEditsDirs(Configuration conf) {
         DFS_NAMENODE_SHARED_EDITS_DIR_KEY);
     return Util.stringCollectionAsURIs(dirNames);
   }
+
+  public Configuration getConf() {
+    return conf;
+  }
 
   @Override
   public void readLock() {
@@ -3846,6 +3850,7 @@ String getSafeModeTip() {
   CheckpointSignature rollEditLog() throws IOException {
     writeLock();
     try {
+      checkOperation(OperationCategory.JOURNAL);
       if (isInSafeMode()) {
         throw new SafeModeException("Log not rolled", safeMode);
       }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index a1bc504fe0..a0d7e14897 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -738,7 +738,6 @@ public long getTransactionID() {
 
   @Override // NamenodeProtocol
   public CheckpointSignature rollEditLog() throws IOException {
-    // TODO:HA decide on OperationCategory for this
    return namesystem.rollEditLog();
   }
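With the @Deprecated markers gone and FSNamesystem#rollEditLog() gated by OperationCategory.JOURNAL instead of a TODO, a standby may legitimately invoke the roll over RPC. A sketch of such a call, mirroring what EditLogTailer below does; the host and port here are placeholders, not values from this patch:

    // Hypothetical caller; the NN rejects the roll if it is in safe mode.
    NamenodeProtocolPB proxyPb = RPC.waitForProxy(NamenodeProtocolPB.class,
        RPC.getProtocolVersion(NamenodeProtocolPB.class),
        new InetSocketAddress("active-nn.example.com", 8020), conf);
    NamenodeProtocol proxy = new NamenodeProtocolTranslatorPB(proxyPb);
    CheckpointSignature sig = proxy.rollEditLog();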
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java
index 264e3a72e6..160c16ed5e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java
@@ -19,21 +19,34 @@
 package org.apache.hadoop.hdfs.server.namenode.ha;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.Collection;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HAUtil;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
+import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB;
 import org.apache.hadoop.hdfs.server.namenode.EditLogInputException;
 import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
 import org.apache.hadoop.hdfs.server.namenode.FSImage;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.apache.hadoop.ipc.RPC;
+
+import static org.apache.hadoop.hdfs.server.common.Util.now;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+
 /**
  * EditLogTailer represents a thread which periodically reads from edits
  * journals and applies the transactions contained within to a given
@@ -50,13 +63,87 @@ public class EditLogTailer {
   private FSEditLog editLog;
 
   private volatile Runtime runtime = Runtime.getRuntime();
+
+  private InetSocketAddress activeAddr;
+  private NamenodeProtocol cachedActiveProxy = null;
+
+  /**
+   * The last transaction ID at which an edit log roll was initiated.
+   */
+  private long lastRollTriggerTxId = HdfsConstants.INVALID_TXID;
+
+  /**
+   * The highest transaction ID loaded by the Standby.
+   */
+  private long lastLoadedTxnId = HdfsConstants.INVALID_TXID;
+
+  /**
+   * The last time we successfully loaded a non-zero number of edits from the
+   * shared directory.
+   */
+  private long lastLoadTimestamp;
+
+  /**
+   * How often the Standby should roll edit logs. Since the Standby only reads
+   * from finalized log segments, the Standby will only be as up-to-date as how
+   * often the logs are rolled.
+   */
+  private long logRollPeriodMs;
+
+  /**
+   * How often the Standby should check if there are new finalized segment(s)
+   * available to be read from.
+   */
+  private long sleepTimeMs;
   
   public EditLogTailer(FSNamesystem namesystem) {
     this.tailerThread = new EditLogTailerThread();
     this.namesystem = namesystem;
     this.editLog = namesystem.getEditLog();
+
+    Configuration conf = namesystem.getConf();
+    lastLoadTimestamp = now();
+
+    logRollPeriodMs = conf.getInt(DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY,
+        DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_DEFAULT) * 1000;
+    if (logRollPeriodMs >= 0) {
+      this.activeAddr = getActiveNodeAddress();
+      Preconditions.checkArgument(activeAddr.getPort() > 0,
+          "Active NameNode must have an IPC port configured. " +
+          "Got address '%s'", activeAddr);
+      LOG.info("Will roll logs on active node at " + activeAddr + " every " +
+          (logRollPeriodMs / 1000) + " seconds.");
+    } else {
+      LOG.info("Not going to trigger log rolls on active node because " +
+          DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY + " is negative.");
+    }
+
+    sleepTimeMs = conf.getInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY,
+        DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_DEFAULT) * 1000;
+
+    LOG.debug("logRollPeriodMs=" + logRollPeriodMs +
+        " sleepTime=" + sleepTimeMs);
   }
+
+  private InetSocketAddress getActiveNodeAddress() {
+    Configuration conf = namesystem.getConf();
+    Configuration activeConf = HAUtil.getConfForOtherNode(conf);
+    return NameNode.getServiceAddress(activeConf, true);
+  }
+
+  private NamenodeProtocol getActiveNodeProxy() throws IOException {
+    if (cachedActiveProxy == null) {
+      Configuration conf = namesystem.getConf();
+      NamenodeProtocolPB proxy = RPC.waitForProxy(NamenodeProtocolPB.class,
+          RPC.getProtocolVersion(NamenodeProtocolPB.class), activeAddr, conf);
+      cachedActiveProxy = new NamenodeProtocolTranslatorPB(proxy);
+    }
+    assert cachedActiveProxy != null;
+    return cachedActiveProxy;
+  }
 
   public void start() {
     tailerThread.start();
   }
@@ -71,16 +158,6 @@ public void stop() throws IOException {
       throw new IOException(e);
     }
   }
-
-  @VisibleForTesting
-  public void setSleepTime(long sleepTime) {
-    tailerThread.setSleepTime(sleepTime);
-  }
-
-  @VisibleForTesting
-  public void interrupt() {
-    tailerThread.interrupt();
-  }
 
   @VisibleForTesting
   FSEditLog getEditLog() {
@@ -152,18 +229,43 @@ private void doTailEdits() throws IOException, InterruptedException {
             editsLoaded, lastTxnId));
       }
     }
+
+      if (editsLoaded > 0) {
+        lastLoadTimestamp = now();
+      }
+      lastLoadedTxnId = image.getLastAppliedTxId();
     } finally {
       namesystem.writeUnlock();
    }
  }
 
+  /**
+   * @return true if the configured log roll period has elapsed.
+   */
+  private boolean tooLongSinceLastLoad() {
+    return logRollPeriodMs >= 0 &&
+        (now() - lastLoadTimestamp) > logRollPeriodMs;
+  }
+
+  /**
+   * Trigger the active node to roll its logs.
+   */
+  private void triggerActiveLogRoll() {
+    LOG.info("Triggering log roll on remote NameNode " + activeAddr);
+    try {
+      getActiveNodeProxy().rollEditLog();
+      lastRollTriggerTxId = lastLoadedTxnId;
+    } catch (IOException ioe) {
+      LOG.warn("Unable to trigger a roll of the active NN", ioe);
+    }
+  }
+
   /**
    * The thread which does the actual work of tailing edits journals and
    * applying the transactions to the FSNS.
    */
   private class EditLogTailerThread extends Thread {
     private volatile boolean shouldRun = true;
-    private long sleepTime = 60 * 1000;
     
     private EditLogTailerThread() {
       super("Edit log tailer");
@@ -173,14 +275,26 @@ private void setShouldRun(boolean shouldRun) {
       this.shouldRun = shouldRun;
     }
-
-    private void setSleepTime(long sleepTime) {
-      this.sleepTime = sleepTime;
-    }
-
+
     @Override
     public void run() {
       while (shouldRun) {
         try {
+          // There's no point in triggering a log roll if the Standby hasn't
+          // read any more transactions since the last time a roll was
+          // triggered.
+          if (tooLongSinceLastLoad() &&
+              lastRollTriggerTxId < lastLoadedTxnId) {
+            triggerActiveLogRoll();
+          }
+          /**
+           * Check again in case someone calls {@link EditLogTailer#stop} while
+           * we're triggering an edit log roll, since ipc.Client catches and
+           * ignores {@link InterruptedException} in a few places. This fixes
+           * the bug described in HDFS-2823.
+           */
+          if (!shouldRun) {
+            break;
+          }
           doTailEdits();
         } catch (EditLogInputException elie) {
           LOG.warn("Error while reading edits from disk. Will try again.", elie);
@@ -194,7 +308,7 @@ public void run() {
         }
 
         try {
-          Thread.sleep(sleepTime);
+          Thread.sleep(sleepTimeMs);
         } catch (InterruptedException e) {
           LOG.warn("Edit log tailer interrupted", e);
         }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java
index ee7921db4f..83e85f7709 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java
@@ -20,20 +20,17 @@
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.security.PrivilegedAction;
-import java.util.ArrayList;
-import java.util.Collection;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ha.ServiceFailedException;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.server.namenode.CheckpointConf;
 import org.apache.hadoop.hdfs.server.namenode.FSImage;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.SaveNamespaceCancelledException;
 import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
 import org.apache.hadoop.net.NetUtils;
@@ -41,9 +38,7 @@
 import static org.apache.hadoop.hdfs.server.common.Util.now;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 
 /**
  * Thread which runs inside the NN when it's in Standby state,
@@ -79,37 +74,19 @@ public StandbyCheckpointer(Configuration conf, FSNamesystem ns) {
    * as well as our own HTTP address from the configuration.
    */
   private void setNameNodeAddresses(Configuration conf) {
-    String nsId = DFSUtil.getNamenodeNameServiceId(conf);
-    Collection<String> nnIds = DFSUtil.getNameNodeIds(conf, nsId);
-    String myNNId = conf.get(DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY);
-    Preconditions.checkArgument(nnIds != null,
-        "Could not determine namenode ids in namespace '%s'",
-        nsId);
-    Preconditions.checkArgument(nnIds.size() == 2,
-        "Expected exactly 2 NameNodes in this namespace. Instead, got: '%s'",
-        Joiner.on("','").join(nnIds));
-    Preconditions.checkState(myNNId != null && !myNNId.isEmpty(),
-        "Could not determine own NN ID");
-
-    ArrayList<String> nnSet = Lists.newArrayList(nnIds);
-    nnSet.remove(myNNId);
-    assert nnSet.size() == 1;
-    String activeNN = nnSet.get(0);
-
-    // Look up the address of the active NN.
-    Configuration confForActive = new Configuration(conf);
-    NameNode.initializeGenericKeys(confForActive, nsId, activeNN);
-    activeNNAddress = DFSUtil.getInfoServer(null, confForActive, true);
-
     // Look up our own address.
     String myAddrString = DFSUtil.getInfoServer(null, conf, true);
+
+    // Look up the active node's address
+    Configuration confForActive = HAUtil.getConfForOtherNode(conf);
+    activeNNAddress = DFSUtil.getInfoServer(null, confForActive, true);
+
     // Sanity-check.
     Preconditions.checkArgument(checkAddress(activeNNAddress),
         "Bad address for active NN: %s", activeNNAddress);
-    Preconditions.checkArgument(checkAddress(activeNNAddress),
-        "Bad address for standby NN: %s", myNNAddress);
-
+    Preconditions.checkArgument(checkAddress(myAddrString),
+        "Bad address for standby NN: %s", myAddrString);
     myNNAddress = NetUtils.createSocketAddr(myAddrString);
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java
index 48de14c657..de04b33cb6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java
@@ -100,10 +100,7 @@ public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size)
    * call fails if the file system is in SafeMode.
    * @throws IOException
    * @return a unique token to identify this transaction.
-   * @deprecated
-   *    See {@link org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode}
    */
-  @Deprecated
   public CheckpointSignature rollEditLog() throws IOException;
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index 0357c5d714..977ee956cb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -548,6 +548,12 @@ private void initMiniDFSCluster(
           "since no HTTP ports have been specified.");
       conf.setBoolean(DFS_HA_STANDBY_CHECKPOINTS_KEY, false);
     }
+    if (!nnTopology.allIpcPortsSpecified() &&
+        nnTopology.isHA()) {
+      LOG.info("MiniDFSCluster disabling log-roll triggering in the "
+          + "Standby node since no IPC ports have been specified.");
+      conf.setInt(DFS_HA_LOGROLL_PERIOD_KEY, -1);
+    }
 
     federation = nnTopology.isFederated();
     createNameNodesAndSetConf(
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSNNTopology.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSNNTopology.java
index fc9bb64f9e..c8e22e3b45 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSNNTopology.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSNNTopology.java
@@ -134,6 +134,21 @@ public boolean allHttpPortsSpecified() {
     }
     return true;
   }
+
+  /**
+   * @return true if all of the NNs in the cluster have their IPC
+   * port specified to be non-ephemeral.
+   */
+  public boolean allIpcPortsSpecified() {
+    for (NSConf ns : nameservices) {
+      for (NNConf nn : ns.getNNs()) {
+        if (nn.getIpcPort() == 0) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
 
   public List<NSConf> getNameservices() {
     return nameservices;
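MiniDFSCluster NNs normally bind ephemeral IPC ports that the standby cannot know ahead of time, hence the opt-out above. A test that wants roll triggering enabled pins the ports instead, as TestEditLogTailer below does (the port numbers are arbitrary):

    MiniDFSNNTopology topology = new MiniDFSNNTopology()
        .addNameservice(new MiniDFSNNTopology.NSConf(null)
            .addNN(new MiniDFSNNTopology.NNConf("nn1").setIpcPort(10001))
            .addNN(new MiniDFSNNTopology.NNConf("nn2").setIpcPort(10002)));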
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java
index 9a2149a281..1f43e057f7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java
@@ -94,6 +94,7 @@ public void setupCluster() throws Exception {
     // See RandomDeleterPolicy javadoc.
     conf.setClass("dfs.block.replicator.classname", RandomDeleterPolicy.class,
         BlockPlacementPolicy.class);
+    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
     cluster = new MiniDFSCluster.Builder(conf)
       .nnTopology(MiniDFSNNTopology.simpleHATopology())
       .numDataNodes(3)
@@ -106,8 +107,6 @@ public void setupCluster() throws Exception {
     // Trigger block reports so that the first NN trusts all
     // of the DNs, and will issue deletions
     cluster.triggerBlockReports();
-    nn2.getNamesystem().getEditLogTailer().setSleepTime(250);
-    nn2.getNamesystem().getEditLogTailer().interrupt();
     fs = HATestUtil.configureFailoverFs(cluster, conf);
   }
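The one-second tail period set above replaces the old pattern of reaching into the tailer with setSleepTime(250)/interrupt(); the same substitution recurs in every HA test touched by this patch. The shared shape, roughly:

    Configuration conf = new Configuration();
    // Tail finalized edits once per second instead of poking the tailer thread.
    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
        .nnTopology(MiniDFSNNTopology.simpleHATopology())
        .numDataNodes(3)
        .build();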
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencingWithReplication.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencingWithReplication.java
index 8fc9d49eb0..44bc01d1cd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencingWithReplication.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencingWithReplication.java
@@ -115,9 +115,11 @@ public void testFencingStress() throws Exception {
     conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
     conf.setInt(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000);
     conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
+    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
     // Increase max streams so that we re-replicate quickly.
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 1000);
+
     final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
       .nnTopology(MiniDFSNNTopology.simpleHATopology())
       .numDataNodes(3)
@@ -128,8 +130,6 @@ public void testFencingStress() throws Exception {
     
     final NameNode nn1 = cluster.getNameNode(0);
     final NameNode nn2 = cluster.getNameNode(1);
-    nn2.getNamesystem().getEditLogTailer().setSleepTime(250);
-    nn2.getNamesystem().getEditLogTailer().interrupt();
     
     FileSystem fs = HATestUtil.configureFailoverFs(
         cluster, conf);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java
index 7c3e38b18a..1f5822ee57 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java
@@ -19,23 +19,32 @@
 import static org.junit.Assert.assertTrue;
 
+import java.io.File;
 import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.ha.ServiceFailedException;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
 import org.apache.hadoop.hdfs.server.namenode.FSImage;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.log4j.Level;
 import org.junit.Test;
 
+import com.google.common.base.Supplier;
+
 public class TestEditLogTailer {
   
   private static final String DIR_PREFIX = "/dir";
@@ -52,6 +61,8 @@ public class TestEditLogTailer {
   public void testTailer() throws IOException, InterruptedException,
       ServiceFailedException {
     Configuration conf = new HdfsConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+
     HAUtil.setAllowStandbyReads(conf, true);
     
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
@@ -64,8 +75,6 @@ public void testTailer() throws IOException, InterruptedException,
     
     NameNode nn1 = cluster.getNameNode(0);
     NameNode nn2 = cluster.getNameNode(1);
-    nn2.getNamesystem().getEditLogTailer().setSleepTime(250);
-    nn2.getNamesystem().getEditLogTailer().interrupt();
     try {
       for (int i = 0; i < DIRS_TO_MAKE / 2; i++) {
         NameNodeAdapter.mkdirs(nn1, getDirPath(i),
@@ -97,7 +106,57 @@ public void testTailer() throws IOException, InterruptedException,
     }
   }
   
+  @Test
+  public void testNN0TriggersLogRolls() throws Exception {
+    testStandbyTriggersLogRolls(0);
+  }
+
+  @Test
+  public void testNN1TriggersLogRolls() throws Exception {
+    testStandbyTriggersLogRolls(1);
+  }
+
+  private static void testStandbyTriggersLogRolls(int activeIndex)
+      throws Exception {
+    Configuration conf = new Configuration();
+    // Roll every 1s
+    conf.setInt(DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY, 1);
+    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+
+    // Have to specify IPC ports so the NNs can talk to each other.
+    MiniDFSNNTopology topology = new MiniDFSNNTopology()
+        .addNameservice(new MiniDFSNNTopology.NSConf(null)
+            .addNN(new MiniDFSNNTopology.NNConf("nn1").setIpcPort(10001))
+            .addNN(new MiniDFSNNTopology.NNConf("nn2").setIpcPort(10002)));
+
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .nnTopology(topology)
+        .numDataNodes(0)
+        .build();
+    try {
+      cluster.transitionToActive(activeIndex);
+      waitForLogRollInSharedDir(cluster, 3);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
   private static String getDirPath(int suffix) {
     return DIR_PREFIX + suffix;
   }
+
+  private static void waitForLogRollInSharedDir(MiniDFSCluster cluster,
+      long startTxId) throws Exception {
+    URI sharedUri = cluster.getSharedEditsDir(0, 1);
+    File sharedDir = new File(sharedUri.getPath(), "current");
+    final File expectedLog = new File(sharedDir,
+        NNStorage.getInProgressEditsFileName(startTxId));
+
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        return expectedLog.exists();
+      }
+    }, 100, 10000);
+  }
 }
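waitForLogRollInSharedDir polls for the in-progress segment that should appear once the first triggered roll finalizes the initial one, which presumably holds txids 1 and 2 — hence the startTxId of 3 above. Assuming NNStorage's usual zero-padded naming, the file being waited on looks like:

    // e.g. <sharedEditsDir>/current/edits_inprogress_0000000000000000003
    File expectedLog = new File(sharedDir,
        NNStorage.getInProgressEditsFileName(3));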
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestFailureToReadEdits.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestFailureToReadEdits.java
index 24b2c0866e..e41a7a6e51 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestFailureToReadEdits.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestFailureToReadEdits.java
@@ -78,6 +78,7 @@ public void setUpCluster() throws Exception {
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY, 1);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, 1);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_NUM_CHECKPOINTS_RETAINED_KEY, 10);
+    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
     HAUtil.setAllowStandbyReads(conf, true);
     
     MiniDFSNNTopology topology = new MiniDFSNNTopology()
@@ -93,8 +94,6 @@ public void setUpCluster() throws Exception {
     
     nn0 = cluster.getNameNode(0);
     nn1 = cluster.getNameNode(1);
-    nn1.getNamesystem().getEditLogTailer().setSleepTime(250);
-    nn1.getNamesystem().getEditLogTailer().interrupt();
     nn1.getNamesystem().getEditLogTailer().setRuntime(mockRuntime);
     
     cluster.transitionToActive(0);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHASafeMode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHASafeMode.java
index af7985e21d..0703f8c8a0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHASafeMode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHASafeMode.java
@@ -90,10 +90,11 @@ private void restartStandby() throws IOException {
     // have been achieved, without being racy.
     cluster.getConfiguration(1).setInt(
         DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, 30000);
+    cluster.getConfiguration(1).setInt(
+        DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+
     cluster.restartNameNode(1);
     nn1 = cluster.getNameNode(1);
-    nn1.getNamesystem().getEditLogTailer().setSleepTime(250);
-    nn1.getNamesystem().getEditLogTailer().interrupt();
   }
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHAStateTransitions.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHAStateTransitions.java
index 52e21c8602..fbeaa30a93 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHAStateTransitions.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHAStateTransitions.java
@@ -27,6 +27,7 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
@@ -207,6 +208,7 @@ public void doAnAction() throws Exception {
   @Test(timeout=120000)
   public void testLeasesRenewedOnTransition() throws Exception {
     Configuration conf = new Configuration();
+    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
       .nnTopology(MiniDFSNNTopology.simpleHATopology())
       .numDataNodes(1)
@@ -215,8 +217,6 @@ public void testLeasesRenewedOnTransition() throws Exception {
     FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
     NameNode nn0 = cluster.getNameNode(0);
     NameNode nn1 = cluster.getNameNode(1);
-    nn1.getNamesystem().getEditLogTailer().setSleepTime(250);
-    nn1.getNamesystem().getEditLogTailer().interrupt();
     
     try {
       cluster.waitActive();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java
index 83f077c55d..2c0c81947c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java
@@ -57,7 +57,8 @@ public void setupCluster() throws Exception {
     Configuration conf = new Configuration();
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY, 1);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, 5);
-
+    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+
     MiniDFSNNTopology topology = new MiniDFSNNTopology()
       .addNameservice(new MiniDFSNNTopology.NSConf(null)
         .addNN(new MiniDFSNNTopology.NNConf("nn1").setHttpPort(10001))
@@ -73,9 +74,6 @@ public void setupCluster() throws Exception {
     nn1 = cluster.getNameNode(1);
     fs = HATestUtil.configureFailoverFs(cluster, conf);
     
-    nn1.getNamesystem().getEditLogTailer().setSleepTime(250);
-    nn1.getNamesystem().getEditLogTailer().interrupt();
-    
     cluster.transitionToActive(0);
   }
@@ -150,8 +148,6 @@ public void testCheckpointWhenNoNewTransactionsHappened()
         DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY, 0);
     cluster.restartNameNode(1);
     nn1 = cluster.getNameNode(1);
-    nn1.getNamesystem().getEditLogTailer().setSleepTime(250);
-    nn1.getNamesystem().getEditLogTailer().interrupt();
     
     FSImage spyImage1 = NameNodeAdapter.spyOnFsImage(nn1);
@@ -195,8 +191,6 @@ public void testCheckpointCancellation() throws Exception {
         DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY, 0);
     cluster.restartNameNode(1);
     nn1 = cluster.getNameNode(1);
-    nn1.getNamesystem().getEditLogTailer().setSleepTime(250);
-    nn1.getNamesystem().getEditLogTailer().interrupt();
     
     cluster.transitionToActive(0);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyIsHot.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyIsHot.java
index 7bb8d814d2..ce5814b0dd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyIsHot.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyIsHot.java
@@ -17,14 +17,13 @@
  */
 package org.apache.hadoop.hdfs.server.namenode.ha;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
 import java.io.IOException;
-import java.util.concurrent.TimeoutException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -32,13 +31,12 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.AppendTestUtil;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
@@ -77,6 +75,7 @@ public void testStandbyIsHot() throws Exception {
     Configuration conf = new Configuration();
     // We read from the standby to watch block locations
     HAUtil.setAllowStandbyReads(conf, true);
+    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
       .nnTopology(MiniDFSNNTopology.simpleHATopology())
       .numDataNodes(3)
@@ -90,8 +89,6 @@ public void testStandbyIsHot() throws Exception {
       NameNode nn2 = cluster.getNameNode(1);
       nn2.getNamesystem().getEditLogTailer().setRuntime(mockRuntime);
-      nn2.getNamesystem().getEditLogTailer().setSleepTime(250);
-      nn2.getNamesystem().getEditLogTailer().interrupt();
       
       FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
@@ -151,6 +148,7 @@ public void testDatanodeRestarts() throws Exception {
     conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024);
     // We read from the standby to watch block locations
     HAUtil.setAllowStandbyReads(conf, true);
+    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
       .nnTopology(MiniDFSNNTopology.simpleHATopology())
       .numDataNodes(1)
@@ -158,8 +156,6 @@ public void testDatanodeRestarts() throws Exception {
     try {
       NameNode nn0 = cluster.getNameNode(0);
       NameNode nn1 = cluster.getNameNode(1);
-      nn1.getNamesystem().getEditLogTailer().setSleepTime(250);
-      nn1.getNamesystem().getEditLogTailer().interrupt();
       
       cluster.transitionToActive(0);