HDFS-2737. Automatically trigger log rolls periodically on the active NN. Contributed by Todd Lipcon and Aaron T. Myers.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1234256 13f79535-47bb-0310-9956-ffa450edef68
Aaron Myers 2012-01-21 03:17:26 +00:00
parent 2abe5c818d
commit c3e62de9ce
21 changed files with 283 additions and 86 deletions

View File

@@ -119,3 +119,5 @@ HDFS-2592. Balancer support for HA namenodes. (Uma Maheswara Rao G via todd)
 HDFS-2367. Enable the configuration of multiple HA cluster addresses. (atm)
 HDFS-2812. When becoming active, the NN should treat all leases as freshly renewed. (todd)
+
+HDFS-2737. Automatically trigger log rolls periodically on the active NN. (todd and atm)

View File

@@ -333,4 +333,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_HA_NAMENODE_ID_KEY = "dfs.ha.namenode.id";
   public static final String DFS_HA_STANDBY_CHECKPOINTS_KEY = "dfs.ha.standby.checkpoints";
   public static final boolean DFS_HA_STANDBY_CHECKPOINTS_DEFAULT = true;
+  public static final String DFS_HA_LOGROLL_PERIOD_KEY = "dfs.ha.log-roll.period";
+  public static final int DFS_HA_LOGROLL_PERIOD_DEFAULT = 2 * 60; // 2m
+  public static final String DFS_HA_TAILEDITS_PERIOD_KEY = "dfs.ha.tail-edits.period";
+  public static final int DFS_HA_TAILEDITS_PERIOD_DEFAULT = 60; // 1m
 }
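
Both periods are read in seconds (the tailer multiplies them by 1000 below). For illustration only (not part of this commit), an override of the defaults might look like the HA tests later in this diff, with arbitrary example values:

    Configuration conf = new HdfsConfiguration();
    // Trigger a roll on the active NN every 300 s; a negative value
    // disables roll triggering entirely (EditLogTailer checks for this).
    conf.setInt(DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY, 300);
    // Poll for newly finalized edit log segments every 5 s.
    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 5);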

View File

@@ -22,6 +22,8 @@
 import java.lang.reflect.Constructor;
 import java.net.InetSocketAddress;
 import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Map;

 import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -32,9 +34,9 @@
 import org.apache.hadoop.io.retry.FailoverProxyProvider;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryProxy;
-import org.apache.hadoop.util.ReflectionUtils;
+import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;

 public class HAUtil {
   private HAUtil() { /* Hidden constructor */ }

@@ -109,6 +111,39 @@ public static String getNameNodeIdFromAddress(final Configuration conf,
     return null;
   }
+
+  /**
+   * Given the configuration for this node, return a Configuration object for
+   * the other node in an HA setup.
+   *
+   * @param myConf the configuration of this node
+   * @return the configuration of the other node in an HA setup
+   */
+  public static Configuration getConfForOtherNode(
+      Configuration myConf) {
+    String nsId = DFSUtil.getNamenodeNameServiceId(myConf);
+    Collection<String> nnIds = DFSUtil.getNameNodeIds(myConf, nsId);
+    String myNNId = myConf.get(DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY);
+    Preconditions.checkArgument(nnIds != null,
+        "Could not determine namenode ids in namespace '%s'", nsId);
+    Preconditions.checkArgument(nnIds.size() == 2,
+        "Expected exactly 2 NameNodes in this namespace. Instead, got: '%s'",
+        Joiner.on("','").join(nnIds));
+    Preconditions.checkState(myNNId != null && !myNNId.isEmpty(),
+        "Could not determine own NN ID");
+
+    ArrayList<String> nnSet = Lists.newArrayList(nnIds);
+    nnSet.remove(myNNId);
+    assert nnSet.size() == 1;
+    String activeNN = nnSet.get(0);
+
+    // Look up the address of the active NN.
+    Configuration confForOtherNode = new Configuration(myConf);
+    NameNode.initializeGenericKeys(confForOtherNode, nsId, activeNN);
+    return confForOtherNode;
+  }

   /**
    * This is used only by tests at the moment.
    * @return true if the NN should allow read operations while in standby mode.
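
For illustration only (not part of this commit), a standby-side caller can combine this helper with NameNode.getServiceAddress to locate the active NN's RPC endpoint, which is exactly what the EditLogTailer change later in this diff does; the variable names here are illustrative:

    // Build a conf whose generic NN keys point at the other node of the pair.
    Configuration activeConf = HAUtil.getConfForOtherNode(myConf);
    // Resolve that node's service RPC address; "true" falls back to the
    // regular RPC address when no separate service address is configured.
    InetSocketAddress activeAddr = NameNode.getServiceAddress(activeConf, true);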

View File

@@ -157,7 +157,6 @@ public long getTransactionID() throws IOException {
   }

   @Override
-  @SuppressWarnings("deprecation")
   public CheckpointSignature rollEditLog() throws IOException {
     try {
       return PBHelper.convert(rpcProxy.rollEditLog(NULL_CONTROLLER,

View File

@@ -134,7 +134,6 @@ public long getTransactionID() throws IOException {
   }

   @Override
-  @SuppressWarnings("deprecation")
   public CheckpointSignature rollEditLog() throws IOException {
     return rpcProxy.rollEditLog().convert();
   }

View File

@@ -84,10 +84,7 @@ public BlocksWithLocationsWritable getBlocks(DatanodeInfoWritable datanode,
    * call fails if the file system is in SafeMode.
    * @throws IOException
    * @return a unique token to identify this transaction.
-   * @deprecated
-   *    See {@link org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode}
    */
-  @Deprecated
   public CheckpointSignatureWritable rollEditLog() throws IOException;

   /**

View File

@@ -690,6 +690,10 @@ public static Collection<URI> getSharedEditsDirs(Configuration conf) {
         DFS_NAMENODE_SHARED_EDITS_DIR_KEY);
     return Util.stringCollectionAsURIs(dirNames);
   }
+
+  public Configuration getConf() {
+    return conf;
+  }

   @Override
   public void readLock() {
@@ -3846,6 +3850,7 @@ String getSafeModeTip() {
   CheckpointSignature rollEditLog() throws IOException {
     writeLock();
     try {
+      checkOperation(OperationCategory.JOURNAL);
       if (isInSafeMode()) {
         throw new SafeModeException("Log not rolled", safeMode);
       }

View File

@@ -738,7 +738,6 @@ public long getTransactionID() {

   @Override // NamenodeProtocol
   public CheckpointSignature rollEditLog() throws IOException {
-    // TODO:HA decide on OperationCategory for this
     return namesystem.rollEditLog();
   }

View File

@@ -19,21 +19,34 @@
 package org.apache.hadoop.hdfs.server.namenode.ha;

 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.Collection;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HAUtil;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
+import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB;
 import org.apache.hadoop.hdfs.server.namenode.EditLogInputException;
 import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
 import org.apache.hadoop.hdfs.server.namenode.FSImage;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.apache.hadoop.ipc.RPC;
+
+import static org.apache.hadoop.hdfs.server.common.Util.now;

 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;

 /**
  * EditLogTailer represents a thread which periodically reads from edits
  * journals and applies the transactions contained within to a given
@@ -50,13 +63,87 @@ public class EditLogTailer {
   private FSEditLog editLog;
   private volatile Runtime runtime = Runtime.getRuntime();

+  private InetSocketAddress activeAddr;
+  private NamenodeProtocol cachedActiveProxy = null;
+
+  /**
+   * The last transaction ID at which an edit log roll was initiated.
+   */
+  private long lastRollTriggerTxId = HdfsConstants.INVALID_TXID;
+
+  /**
+   * The highest transaction ID loaded by the Standby.
+   */
+  private long lastLoadedTxnId = HdfsConstants.INVALID_TXID;
+
+  /**
+   * The last time we successfully loaded a non-zero number of edits from the
+   * shared directory.
+   */
+  private long lastLoadTimestamp;
+
+  /**
+   * How often the Standby should roll edit logs. Since the Standby only reads
+   * from finalized log segments, the Standby will only be as up-to-date as how
+   * often the logs are rolled.
+   */
+  private long logRollPeriodMs;
+
+  /**
+   * How often the Standby should check if there are new finalized segment(s)
+   * available to be read from.
+   */
+  private long sleepTimeMs;
+
   public EditLogTailer(FSNamesystem namesystem) {
     this.tailerThread = new EditLogTailerThread();
     this.namesystem = namesystem;
     this.editLog = namesystem.getEditLog();
+
+    Configuration conf = namesystem.getConf();
+    lastLoadTimestamp = now();
+
+    logRollPeriodMs = conf.getInt(DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY,
+        DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_DEFAULT) * 1000;
+    if (logRollPeriodMs >= 0) {
+      this.activeAddr = getActiveNodeAddress();
+      Preconditions.checkArgument(activeAddr.getPort() > 0,
+          "Active NameNode must have an IPC port configured. " +
+          "Got address '%s'", activeAddr);
+      LOG.info("Will roll logs on active node at " + activeAddr + " every " +
+          (logRollPeriodMs / 1000) + " seconds.");
+    } else {
+      LOG.info("Not going to trigger log rolls on active node because " +
+          DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY + " is negative.");
+    }
+
+    sleepTimeMs = conf.getInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY,
+        DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_DEFAULT) * 1000;
+
+    LOG.debug("logRollPeriodMs=" + logRollPeriodMs +
+        " sleepTime=" + sleepTimeMs);
   }
+
+  private InetSocketAddress getActiveNodeAddress() {
+    Configuration conf = namesystem.getConf();
+    Configuration activeConf = HAUtil.getConfForOtherNode(conf);
+    return NameNode.getServiceAddress(activeConf, true);
+  }
+
+  private NamenodeProtocol getActiveNodeProxy() throws IOException {
+    if (cachedActiveProxy == null) {
+      Configuration conf = namesystem.getConf();
+      NamenodeProtocolPB proxy = RPC.waitForProxy(NamenodeProtocolPB.class,
+          RPC.getProtocolVersion(NamenodeProtocolPB.class), activeAddr, conf);
+      cachedActiveProxy = new NamenodeProtocolTranslatorPB(proxy);
+    }
+    assert cachedActiveProxy != null;
+    return cachedActiveProxy;
+  }

   public void start() {
     tailerThread.start();
   }
@@ -71,16 +158,6 @@ public void stop() throws IOException {
       throw new IOException(e);
     }
   }
-
-  @VisibleForTesting
-  public void setSleepTime(long sleepTime) {
-    tailerThread.setSleepTime(sleepTime);
-  }
-
-  @VisibleForTesting
-  public void interrupt() {
-    tailerThread.interrupt();
-  }

   @VisibleForTesting
   FSEditLog getEditLog() {
@@ -152,18 +229,43 @@ private void doTailEdits() throws IOException, InterruptedException {
               editsLoaded, lastTxnId));
         }
       }
+
+      if (editsLoaded > 0) {
+        lastLoadTimestamp = now();
+      }
+      lastLoadedTxnId = image.getLastAppliedTxId();
     } finally {
       namesystem.writeUnlock();
     }
   }
+
+  /**
+   * @return true if the configured log roll period has elapsed.
+   */
+  private boolean tooLongSinceLastLoad() {
+    return logRollPeriodMs >= 0 &&
+        (now() - lastLoadTimestamp) > logRollPeriodMs;
+  }
+
+  /**
+   * Trigger the active node to roll its logs.
+   */
+  private void triggerActiveLogRoll() {
+    LOG.info("Triggering log roll on remote NameNode " + activeAddr);
+    try {
+      getActiveNodeProxy().rollEditLog();
+      lastRollTriggerTxId = lastLoadedTxnId;
+    } catch (IOException ioe) {
+      LOG.warn("Unable to trigger a roll of the active NN", ioe);
+    }
+  }

   /**
    * The thread which does the actual work of tailing edits journals and
    * applying the transactions to the FSNS.
    */
   private class EditLogTailerThread extends Thread {
     private volatile boolean shouldRun = true;
-    private long sleepTime = 60 * 1000;

     private EditLogTailerThread() {
       super("Edit log tailer");
@@ -173,14 +275,26 @@ private void setShouldRun(boolean shouldRun) {
       this.shouldRun = shouldRun;
     }
-
-    private void setSleepTime(long sleepTime) {
-      this.sleepTime = sleepTime;
-    }

     @Override
     public void run() {
       while (shouldRun) {
         try {
+          // There's no point in triggering a log roll if the Standby hasn't
+          // read any more transactions since the last time a roll was
+          // triggered.
+          if (tooLongSinceLastLoad() &&
+              lastRollTriggerTxId < lastLoadedTxnId) {
+            triggerActiveLogRoll();
+          }
+          /**
+           * Check again in case someone calls {@link EditLogTailer#stop} while
+           * we're triggering an edit log roll, since ipc.Client catches and
+           * ignores {@link InterruptedException} in a few places. This fixes
+           * the bug described in HDFS-2823.
+           */
+          if (!shouldRun) {
+            break;
+          }
           doTailEdits();
         } catch (EditLogInputException elie) {
           LOG.warn("Error while reading edits from disk. Will try again.", elie);
@@ -194,7 +308,7 @@ public void run() {
         }

         try {
-          Thread.sleep(sleepTime);
+          Thread.sleep(sleepTimeMs);
         } catch (InterruptedException e) {
           LOG.warn("Edit log tailer interrupted", e);
         }

View File

@@ -20,20 +20,17 @@
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.security.PrivilegedAction;
-import java.util.ArrayList;
-import java.util.Collection;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ha.ServiceFailedException;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.server.namenode.CheckpointConf;
 import org.apache.hadoop.hdfs.server.namenode.FSImage;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.SaveNamespaceCancelledException;
 import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
 import org.apache.hadoop.net.NetUtils;
@@ -41,9 +38,7 @@
 import static org.apache.hadoop.hdfs.server.common.Util.now;

 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;

 /**
  * Thread which runs inside the NN when it's in Standby state,
@@ -79,37 +74,19 @@ public StandbyCheckpointer(Configuration conf, FSNamesystem ns) {
    * as well as our own HTTP address from the configuration.
    */
   private void setNameNodeAddresses(Configuration conf) {
-    String nsId = DFSUtil.getNamenodeNameServiceId(conf);
-    Collection<String> nnIds = DFSUtil.getNameNodeIds(conf, nsId);
-    String myNNId = conf.get(DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY);
-    Preconditions.checkArgument(nnIds != null,
-        "Could not determine namenode ids in namespace '%s'", nsId);
-    Preconditions.checkArgument(nnIds.size() == 2,
-        "Expected exactly 2 NameNodes in this namespace. Instead, got: '%s'",
-        Joiner.on("','").join(nnIds));
-    Preconditions.checkState(myNNId != null && !myNNId.isEmpty(),
-        "Could not determine own NN ID");
-
-    ArrayList<String> nnSet = Lists.newArrayList(nnIds);
-    nnSet.remove(myNNId);
-    assert nnSet.size() == 1;
-    String activeNN = nnSet.get(0);
-
-    // Look up the address of the active NN.
-    Configuration confForActive = new Configuration(conf);
-    NameNode.initializeGenericKeys(confForActive, nsId, activeNN);
-    activeNNAddress = DFSUtil.getInfoServer(null, confForActive, true);
-
     // Look up our own address.
     String myAddrString = DFSUtil.getInfoServer(null, conf, true);
+
+    // Look up the active node's address
+    Configuration confForActive = HAUtil.getConfForOtherNode(conf);
+    activeNNAddress = DFSUtil.getInfoServer(null, confForActive, true);

     // Sanity-check.
     Preconditions.checkArgument(checkAddress(activeNNAddress),
         "Bad address for active NN: %s", activeNNAddress);
-    Preconditions.checkArgument(checkAddress(activeNNAddress),
-        "Bad address for standby NN: %s", myNNAddress);
+    Preconditions.checkArgument(checkAddress(myAddrString),
+        "Bad address for standby NN: %s", myAddrString);

     myNNAddress = NetUtils.createSocketAddr(myAddrString);
   }

View File

@@ -100,10 +100,7 @@ public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size)
    * call fails if the file system is in SafeMode.
    * @throws IOException
    * @return a unique token to identify this transaction.
-   * @deprecated
-   *    See {@link org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode}
    */
-  @Deprecated
   public CheckpointSignature rollEditLog() throws IOException;

   /**

View File

@@ -548,6 +548,12 @@ private void initMiniDFSCluster(
           "since no HTTP ports have been specified.");
       conf.setBoolean(DFS_HA_STANDBY_CHECKPOINTS_KEY, false);
     }
+
+    if (!nnTopology.allIpcPortsSpecified() &&
+        nnTopology.isHA()) {
+      LOG.info("MiniDFSCluster disabling log-roll triggering in the "
+          + "Standby node since no IPC ports have been specified.");
+      conf.setInt(DFS_HA_LOGROLL_PERIOD_KEY, -1);
+    }

     federation = nnTopology.isFederated();
     createNameNodesAndSetConf(

View File

@@ -134,6 +134,21 @@ public boolean allHttpPortsSpecified() {
     }
     return true;
   }
+
+  /**
+   * @return true if all of the NNs in the cluster have their IPC
+   * port specified to be non-ephemeral.
+   */
+  public boolean allIpcPortsSpecified() {
+    for (NSConf ns : nameservices) {
+      for (NNConf nn : ns.getNNs()) {
+        if (nn.getIpcPort() == 0) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }

   public List<NSConf> getNameservices() {
     return nameservices;
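
For illustration only (not part of this commit), a topology that satisfies this check is the one the new TestEditLogTailer test later in this diff builds, with fixed, arbitrarily chosen IPC ports:

    MiniDFSNNTopology topology = new MiniDFSNNTopology()
        .addNameservice(new MiniDFSNNTopology.NSConf(null)
            .addNN(new MiniDFSNNTopology.NNConf("nn1").setIpcPort(10001))
            .addNN(new MiniDFSNNTopology.NNConf("nn2").setIpcPort(10002)));
    assert topology.allIpcPortsSpecified(); // both ports are non-ephemeral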

View File

@@ -94,6 +94,7 @@ public void setupCluster() throws Exception {
     // See RandomDeleterPolicy javadoc.
     conf.setClass("dfs.block.replicator.classname", RandomDeleterPolicy.class,
         BlockPlacementPolicy.class);
+    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
     cluster = new MiniDFSCluster.Builder(conf)
       .nnTopology(MiniDFSNNTopology.simpleHATopology())
       .numDataNodes(3)
@@ -106,8 +107,6 @@ public void setupCluster() throws Exception {
     // Trigger block reports so that the first NN trusts all
     // of the DNs, and will issue deletions
     cluster.triggerBlockReports();
-    nn2.getNamesystem().getEditLogTailer().setSleepTime(250);
-    nn2.getNamesystem().getEditLogTailer().interrupt();
     fs = HATestUtil.configureFailoverFs(cluster, conf);
   }

View File

@@ -115,9 +115,11 @@ public void testFencingStress() throws Exception {
     conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
     conf.setInt(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000);
     conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
+    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+
     // Increase max streams so that we re-replicate quickly.
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 1000);

     final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
       .nnTopology(MiniDFSNNTopology.simpleHATopology())
       .numDataNodes(3)
@@ -128,8 +130,6 @@ public void testFencingStress() throws Exception {
     final NameNode nn1 = cluster.getNameNode(0);
     final NameNode nn2 = cluster.getNameNode(1);
-    nn2.getNamesystem().getEditLogTailer().setSleepTime(250);
-    nn2.getNamesystem().getEditLogTailer().interrupt();

     FileSystem fs = HATestUtil.configureFailoverFs(
         cluster, conf);

View File

@@ -19,23 +19,32 @@
 import static org.junit.Assert.assertTrue;

+import java.io.File;
 import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.concurrent.TimeoutException;

 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.ha.ServiceFailedException;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
 import org.apache.hadoop.hdfs.server.namenode.FSImage;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.log4j.Level;
 import org.junit.Test;

+import com.google.common.base.Supplier;
+
 public class TestEditLogTailer {

   private static final String DIR_PREFIX = "/dir";
@@ -52,6 +61,8 @@ public class TestEditLogTailer {
   public void testTailer() throws IOException, InterruptedException,
       ServiceFailedException {
     Configuration conf = new HdfsConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+
     HAUtil.setAllowStandbyReads(conf, true);

     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
@@ -64,8 +75,6 @@ public void testTailer() throws IOException, InterruptedException,
     NameNode nn1 = cluster.getNameNode(0);
     NameNode nn2 = cluster.getNameNode(1);
-    nn2.getNamesystem().getEditLogTailer().setSleepTime(250);
-    nn2.getNamesystem().getEditLogTailer().interrupt();

     try {
       for (int i = 0; i < DIRS_TO_MAKE / 2; i++) {
         NameNodeAdapter.mkdirs(nn1, getDirPath(i),
@@ -97,7 +106,57 @@ public void testTailer() throws IOException, InterruptedException,
     }
   }
+  @Test
+  public void testNN0TriggersLogRolls() throws Exception {
+    testStandbyTriggersLogRolls(0);
+  }
+
+  @Test
+  public void testNN1TriggersLogRolls() throws Exception {
+    testStandbyTriggersLogRolls(1);
+  }
+
+  private static void testStandbyTriggersLogRolls(int activeIndex)
+      throws Exception {
+    Configuration conf = new Configuration();
+    // Roll every 1s
+    conf.setInt(DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY, 1);
+    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+
+    // Have to specify IPC ports so the NNs can talk to each other.
+    MiniDFSNNTopology topology = new MiniDFSNNTopology()
+        .addNameservice(new MiniDFSNNTopology.NSConf(null)
+            .addNN(new MiniDFSNNTopology.NNConf("nn1").setIpcPort(10001))
+            .addNN(new MiniDFSNNTopology.NNConf("nn2").setIpcPort(10002)));
+
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .nnTopology(topology)
+        .numDataNodes(0)
+        .build();
+    try {
+      cluster.transitionToActive(activeIndex);
+      waitForLogRollInSharedDir(cluster, 3);
+    } finally {
+      cluster.shutdown();
+    }
+  }

   private static String getDirPath(int suffix) {
     return DIR_PREFIX + suffix;
   }

+  private static void waitForLogRollInSharedDir(MiniDFSCluster cluster,
+      long startTxId) throws Exception {
+    URI sharedUri = cluster.getSharedEditsDir(0, 1);
+    File sharedDir = new File(sharedUri.getPath(), "current");
+    final File expectedLog = new File(sharedDir,
+        NNStorage.getInProgressEditsFileName(startTxId));
+
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        return expectedLog.exists();
+      }
+    }, 100, 10000);
+  }
 }

View File

@@ -78,6 +78,7 @@ public void setUpCluster() throws Exception {
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY, 1);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, 1);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_NUM_CHECKPOINTS_RETAINED_KEY, 10);
+    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
     HAUtil.setAllowStandbyReads(conf, true);

     MiniDFSNNTopology topology = new MiniDFSNNTopology()
@@ -93,8 +94,6 @@ public void setUpCluster() throws Exception {
     nn0 = cluster.getNameNode(0);
     nn1 = cluster.getNameNode(1);
-    nn1.getNamesystem().getEditLogTailer().setSleepTime(250);
-    nn1.getNamesystem().getEditLogTailer().interrupt();
     nn1.getNamesystem().getEditLogTailer().setRuntime(mockRuntime);

     cluster.transitionToActive(0);

View File

@@ -90,10 +90,11 @@ private void restartStandby() throws IOException {
     // have been achieved, without being racy.
     cluster.getConfiguration(1).setInt(
         DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, 30000);
+    cluster.getConfiguration(1).setInt(
+        DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);

     cluster.restartNameNode(1);
     nn1 = cluster.getNameNode(1);
-    nn1.getNamesystem().getEditLogTailer().setSleepTime(250);
-    nn1.getNamesystem().getEditLogTailer().interrupt();
   }
/** /**

View File

@@ -27,6 +27,7 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
@@ -207,6 +208,7 @@ public void doAnAction() throws Exception {
   @Test(timeout=120000)
   public void testLeasesRenewedOnTransition() throws Exception {
     Configuration conf = new Configuration();
+    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
         .nnTopology(MiniDFSNNTopology.simpleHATopology())
         .numDataNodes(1)
@@ -215,8 +217,6 @@ public void testLeasesRenewedOnTransition() throws Exception {
     FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
     NameNode nn0 = cluster.getNameNode(0);
     NameNode nn1 = cluster.getNameNode(1);
-    nn1.getNamesystem().getEditLogTailer().setSleepTime(250);
-    nn1.getNamesystem().getEditLogTailer().interrupt();

     try {
       cluster.waitActive();

View File

@@ -57,7 +57,8 @@ public void setupCluster() throws Exception {
     Configuration conf = new Configuration();
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY, 1);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, 5);
+    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);

     MiniDFSNNTopology topology = new MiniDFSNNTopology()
       .addNameservice(new MiniDFSNNTopology.NSConf(null)
         .addNN(new MiniDFSNNTopology.NNConf("nn1").setHttpPort(10001))
@@ -73,9 +74,6 @@ public void setupCluster() throws Exception {
     nn1 = cluster.getNameNode(1);
     fs = HATestUtil.configureFailoverFs(cluster, conf);

-    nn1.getNamesystem().getEditLogTailer().setSleepTime(250);
-    nn1.getNamesystem().getEditLogTailer().interrupt();
-
     cluster.transitionToActive(0);
   }
@@ -150,8 +148,6 @@ public void testCheckpointWhenNoNewTransactionsHappened()
         DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY, 0);
     cluster.restartNameNode(1);
     nn1 = cluster.getNameNode(1);
-    nn1.getNamesystem().getEditLogTailer().setSleepTime(250);
-    nn1.getNamesystem().getEditLogTailer().interrupt();

     FSImage spyImage1 = NameNodeAdapter.spyOnFsImage(nn1);
@@ -195,8 +191,6 @@ public void testCheckpointCancellation() throws Exception {
         DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY, 0);
     cluster.restartNameNode(1);
     nn1 = cluster.getNameNode(1);
-    nn1.getNamesystem().getEditLogTailer().setSleepTime(250);
-    nn1.getNamesystem().getEditLogTailer().interrupt();

     cluster.transitionToActive(0);

View File

@@ -17,14 +17,13 @@
  */
 package org.apache.hadoop.hdfs.server.namenode.ha;

-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;

 import java.io.IOException;
-import java.util.concurrent.TimeoutException;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -32,13 +31,12 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.AppendTestUtil;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
@@ -77,6 +75,7 @@ public void testStandbyIsHot() throws Exception {
     Configuration conf = new Configuration();
     // We read from the standby to watch block locations
     HAUtil.setAllowStandbyReads(conf, true);
+    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
       .nnTopology(MiniDFSNNTopology.simpleHATopology())
       .numDataNodes(3)
@@ -90,8 +89,6 @@ public void testStandbyIsHot() throws Exception {
     NameNode nn2 = cluster.getNameNode(1);
     nn2.getNamesystem().getEditLogTailer().setRuntime(mockRuntime);
-    nn2.getNamesystem().getEditLogTailer().setSleepTime(250);
-    nn2.getNamesystem().getEditLogTailer().interrupt();

     FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
@@ -151,6 +148,7 @@ public void testDatanodeRestarts() throws Exception {
     conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024);
     // We read from the standby to watch block locations
     HAUtil.setAllowStandbyReads(conf, true);
+    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
       .nnTopology(MiniDFSNNTopology.simpleHATopology())
       .numDataNodes(1)
@@ -158,8 +156,6 @@ public void testDatanodeRestarts() throws Exception {
     try {
       NameNode nn0 = cluster.getNameNode(0);
       NameNode nn1 = cluster.getNameNode(1);
-      nn1.getNamesystem().getEditLogTailer().setSleepTime(250);
-      nn1.getNamesystem().getEditLogTailer().interrupt();

       cluster.transitionToActive(0);