HDFS-5753. Add new NN startup options for downgrade and rollback using upgrade marker.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-5535@1559907 13f79535-47bb-0310-9956-ffa450edef68
parent 1025121874
commit d8bc523754
@@ -9,3 +9,6 @@ HDFS-5535 subtasks:
     upgrade. (szetszwo)
 
     HDFS-5786. Support QUERY and FINALIZE actions of rolling upgrade. (szetszwo)
+
+    HDFS-5753. Add new NN startup options for downgrade and rollback using
+    upgrade marker. (szetszwo)
@@ -24,6 +24,8 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.server.namenode.MetaRecoveryContext;
 
+import com.google.common.base.Preconditions;
+
 /************************************
  * Some handy internal HDFS constants
  *
@@ -43,6 +45,23 @@ static public enum NodeType {
     JOURNAL_NODE;
   }
 
+  /** Startup options for rolling upgrade. */
+  public static enum RollingUpgradeStartupOption{
+    ROLLBACK, DOWNGRADE;
+
+    private static final RollingUpgradeStartupOption[] VALUES = values();
+
+    static RollingUpgradeStartupOption fromString(String s) {
+      for(RollingUpgradeStartupOption opt : VALUES) {
+        if (opt.name().equalsIgnoreCase(s)) {
+          return opt;
+        }
+      }
+      throw new IllegalArgumentException("Failed to convert \"" + s
+          + "\" to " + RollingUpgradeStartupOption.class.getSimpleName());
+    }
+  }
+
   /** Startup options */
   static public enum StartupOption{
     FORMAT ("-format"),
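For reference, fromString above matches its argument case-insensitively against ROLLBACK and DOWNGRADE and rejects everything else. A minimal sketch of that behavior follows (the demo class is hypothetical and is assumed to live in org.apache.hadoop.hdfs.server.common, since fromString is package-private):

    package org.apache.hadoop.hdfs.server.common;

    import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.RollingUpgradeStartupOption;

    /** Hypothetical demo class, not part of the patch. */
    class RollingUpgradeOptionParseDemo {
      public static void main(String[] args) {
        // Matching is case-insensitive, so both spellings resolve to ROLLBACK.
        RollingUpgradeStartupOption a = RollingUpgradeStartupOption.fromString("rollback");
        RollingUpgradeStartupOption b = RollingUpgradeStartupOption.fromString("ROLLBACK");
        System.out.println(a == b && a == RollingUpgradeStartupOption.ROLLBACK);  // true

        try {
          // Anything other than rollback/downgrade is rejected with a descriptive message.
          RollingUpgradeStartupOption.fromString("finalize");
        } catch (IllegalArgumentException e) {
          System.out.println("rejected: " + e.getMessage());
        }
      }
    }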
@@ -54,6 +73,7 @@ static public enum StartupOption{
     UPGRADE ("-upgrade"),
     ROLLBACK("-rollback"),
     FINALIZE("-finalize"),
+    ROLLINGUPGRADE("-rollingUpgrade"),
     IMPORT ("-importCheckpoint"),
     BOOTSTRAPSTANDBY("-bootstrapStandby"),
     INITIALIZESHAREDEDITS("-initializeSharedEdits"),
@@ -66,6 +86,9 @@ static public enum StartupOption{
     // Used only with format and upgrade options
     private String clusterId = null;
 
+    // Used only by rolling upgrade
+    private RollingUpgradeStartupOption rollingUpgradeStartupOption;
+
     // Used only with format option
     private boolean isForceFormat = false;
     private boolean isInteractiveFormat = true;
@@ -93,6 +116,16 @@ public void setClusterId(String cid) {
     public String getClusterId() {
       return clusterId;
     }
+
+    public void setRollingUpgradeStartupOption(String opt) {
+      Preconditions.checkState(this == ROLLINGUPGRADE);
+      rollingUpgradeStartupOption = RollingUpgradeStartupOption.fromString(opt);
+    }
+
+    public RollingUpgradeStartupOption getRollingUpgradeStartupOption() {
+      Preconditions.checkState(this == ROLLINGUPGRADE);
+      return rollingUpgradeStartupOption;
+    }
 
     public MetaRecoveryContext createRecoveryContext() {
       if (!name.equals(RECOVER.name))
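The sub-option is carried on the ROLLINGUPGRADE constant itself: setRollingUpgradeStartupOption parses its argument through fromString, and both accessors are guarded by Preconditions.checkState so they may only be used on StartupOption.ROLLINGUPGRADE. A minimal usage sketch (hypothetical demo code, not part of the patch):

    import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.RollingUpgradeStartupOption;
    import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;

    /** Hypothetical demo class, not part of the patch. */
    class StartupOptionRollingUpgradeDemo {
      public static void main(String[] args) {
        StartupOption opt = StartupOption.ROLLINGUPGRADE;
        opt.setRollingUpgradeStartupOption("downgrade");  // parsed via fromString
        System.out.println(opt.getRollingUpgradeStartupOption()
            == RollingUpgradeStartupOption.DOWNGRADE);    // true

        // Calling either accessor on any other constant, e.g. StartupOption.UPGRADE,
        // trips the Preconditions.checkState(this == ROLLINGUPGRADE) guard and
        // throws an IllegalStateException.
      }
    }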
@@ -208,7 +208,7 @@ private synchronized void applyEdits(long firstTxId, int numTxns, byte[] data)
     backupInputStream.setBytes(data, logVersion);
 
     long numTxnsAdvanced = logLoader.loadEditRecords(
-        backupInputStream, true, lastAppliedTxId + 1, null);
+        backupInputStream, true, lastAppliedTxId + 1, null, null);
     if (numTxnsAdvanced != numTxns) {
       throw new IOException("Batch of txns starting at txnid " +
           firstTxId + " was supposed to contain " + numTxns +
@@ -266,7 +266,7 @@ private boolean tryConvergeJournalSpool() throws IOException {
           editStreams.add(s);
         }
       }
-      loadEdits(editStreams, namesystem, null);
+      loadEdits(editStreams, namesystem);
     }
 
     // now, need to load the in-progress file
@@ -302,7 +302,7 @@ private boolean tryConvergeJournalSpool() throws IOException {
 
       FSEditLogLoader loader =
           new FSEditLogLoader(namesystem, lastAppliedTxId);
-      loader.loadFSEdits(stream, lastAppliedTxId + 1, null);
+      loader.loadFSEdits(stream, lastAppliedTxId + 1);
       lastAppliedTxId = loader.getLastAppliedTxId();
       assert lastAppliedTxId == getEditLog().getLastWrittenTxId();
     } finally {
@@ -307,6 +307,6 @@ static void rollForwardByApplyingLogs(
     }
     LOG.info("Checkpointer about to load edits from " +
         editsStreams.size() + " stream(s).");
-    dstImage.loadEdits(editsStreams, dstNamesystem, null);
+    dstImage.loadEdits(editsStreams, dstNamesystem);
   }
 }
@@ -37,8 +37,11 @@
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.RollingUpgradeException;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.RollingUpgradeStartupOption;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddBlockOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCacheDirectiveInfoOp;
@@ -103,13 +106,18 @@ public FSEditLogLoader(FSNamesystem fsNamesys, long lastAppliedTxId) {
     this.lastAppliedTxId = lastAppliedTxId;
   }
 
+  long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId)
+      throws IOException {
+    return loadFSEdits(edits, expectedStartingTxId, null, null);
+  }
+
   /**
    * Load an edit log, and apply the changes to the in-memory structure
    * This is where we apply edits that we've been writing to disk all
    * along.
    */
   long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId,
-      MetaRecoveryContext recovery) throws IOException {
+      StartupOption startOpt, MetaRecoveryContext recovery) throws IOException {
     StartupProgress prog = NameNode.getStartupProgress();
     Step step = createStartupProgressStep(edits);
     prog.beginStep(Phase.LOADING_EDITS, step);
@@ -117,8 +125,8 @@ long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId,
     try {
       long startTime = now();
       FSImage.LOG.info("Start loading edits file " + edits.getName());
-      long numEdits = loadEditRecords(edits, false,
-          expectedStartingTxId, recovery);
+      long numEdits = loadEditRecords(edits, false, expectedStartingTxId,
+          startOpt, recovery);
       FSImage.LOG.info("Edits file " + edits.getName()
           + " of size " + edits.length() + " edits # " + numEdits
           + " loaded in " + (now()-startTime)/1000 + " seconds");
@@ -131,8 +139,8 @@ long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId,
   }
 
   long loadEditRecords(EditLogInputStream in, boolean closeOnExit,
-      long expectedStartingTxId, MetaRecoveryContext recovery)
-      throws IOException {
+      long expectedStartingTxId, StartupOption startOpt,
+      MetaRecoveryContext recovery) throws IOException {
     FSDirectory fsDir = fsNamesys.dir;
 
     EnumMap<FSEditLogOpCodes, Holder<Integer>> opCounts =
@@ -204,7 +212,8 @@ long loadEditRecords(EditLogInputStream in, boolean closeOnExit,
           }
         }
         try {
-          long inodeId = applyEditLogOp(op, fsDir, in.getVersion(), lastInodeId);
+          long inodeId = applyEditLogOp(op, fsDir, startOpt,
+              in.getVersion(), lastInodeId);
           if (lastInodeId < inodeId) {
             lastInodeId = inodeId;
           }
@@ -212,6 +221,10 @@ long loadEditRecords(EditLogInputStream in, boolean closeOnExit,
           throw e;
         } catch (Throwable e) {
           LOG.error("Encountered exception on operation " + op, e);
+          if (recovery == null) {
+            throw e instanceof IOException? (IOException)e: new IOException(e);
+          }
+
           MetaRecoveryContext.editLogLoaderPrompt("Failed to " +
               "apply edit log operation " + op + ": error " +
               e.getMessage(), recovery, "applying edits");
@@ -288,7 +301,7 @@ private long getAndUpdateLastInodeId(long inodeIdFromOp, int logVersion,
 
   @SuppressWarnings("deprecation")
   private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
-      int logVersion, long lastInodeId) throws IOException {
+      StartupOption startOpt, int logVersion, long lastInodeId) throws IOException {
     long inodeId = INodeId.GRANDFATHER_INODE_ID;
     if (LOG.isTraceEnabled()) {
       LOG.trace("replaying edit log: " + op);
@@ -656,7 +669,18 @@ private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
       break;
     }
     case OP_UPGRADE_MARKER: {
-      throw new UpgradeMarkerException();
+      if (startOpt == StartupOption.ROLLINGUPGRADE) {
+        if (startOpt.getRollingUpgradeStartupOption()
+            == RollingUpgradeStartupOption.ROLLBACK) {
+          throw new UpgradeMarkerException();
+        } else if (startOpt.getRollingUpgradeStartupOption()
+            == RollingUpgradeStartupOption.DOWNGRADE) {
+          //ignore upgrade marker
+          break;
+        }
+      }
+      throw new RollingUpgradeException(
+          "Unexpected upgrade marker in edit log: op=" + op);
     }
     case OP_ADD_CACHE_DIRECTIVE: {
       AddCacheDirectiveInfoOp addOp = (AddCacheDirectiveInfoOp) op;
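In effect, the marker written to the edit log when a rolling upgrade starts is now interpreted according to the startup mode: with -rollingUpgrade rollback the loader throws UpgradeMarkerException so that replay can stop at the transaction where the upgrade began; with -rollingUpgrade downgrade the marker is skipped and the remaining edits are applied; in any other startup mode an unexpected marker aborts startup with a RollingUpgradeException.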
@@ -259,7 +259,7 @@ boolean recoverTransitionRead(StartupOption startOpt, FSNamesystem target,
       // just load the image
     }
 
-    return loadFSImage(target, recovery);
+    return loadFSImage(target, startOpt, recovery);
   }
 
   /**
@@ -327,7 +327,7 @@ private void doUpgrade(FSNamesystem target) throws IOException {
     }
 
     // load the latest image
-    this.loadFSImage(target, null);
+    this.loadFSImage(target, null, null);
 
     // Do upgrade for each directory
     long oldCTime = storage.getCTime();
@@ -582,7 +582,8 @@ void reloadFromImageFile(File file, FSNamesystem target) throws IOException {
    * @return whether the image should be saved
    * @throws IOException
    */
-  boolean loadFSImage(FSNamesystem target, MetaRecoveryContext recovery)
+  private boolean loadFSImage(FSNamesystem target, StartupOption startOpt,
+      MetaRecoveryContext recovery)
       throws IOException {
     FSImageStorageInspector inspector = storage.readAndInspectDirs();
     FSImageFile imageFile = null;
@@ -646,7 +647,7 @@ boolean loadFSImage(FSNamesystem target, MetaRecoveryContext recovery)
       throw new IOException("Failed to load an FSImage file!");
     }
     prog.endPhase(Phase.LOADING_FSIMAGE);
-    long txnsAdvanced = loadEdits(editStreams, target, recovery);
+    long txnsAdvanced = loadEdits(editStreams, target, startOpt, recovery);
     needToSave |= needsResaveBasedOnStaleCheckpoint(imageFile.getFile(),
         txnsAdvanced);
     editLog.setNextTxId(lastAppliedTxId + 1);
@@ -718,7 +719,13 @@ private boolean needsResaveBasedOnStaleCheckpoint(
    * Load the specified list of edit files into the image.
    */
   public long loadEdits(Iterable<EditLogInputStream> editStreams,
-      FSNamesystem target, MetaRecoveryContext recovery) throws IOException {
+      FSNamesystem target) throws IOException {
+    return loadEdits(editStreams, target, null, null);
+  }
+
+  private long loadEdits(Iterable<EditLogInputStream> editStreams,
+      FSNamesystem target, StartupOption startOpt, MetaRecoveryContext recovery)
+      throws IOException {
     LOG.debug("About to load edits:\n  " + Joiner.on("\n  ").join(editStreams));
     StartupProgress prog = NameNode.getStartupProgress();
     prog.beginPhase(Phase.LOADING_EDITS);
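Note that the public loadEdits entry point now takes only the streams and the target namesystem, so the callers updated in this patch (BackupImage, Checkpointer and EditLogTailer) keep their old behavior, while loadFSImage uses the new private overload to thread the active StartupOption (and, during recovery, the MetaRecoveryContext) down to FSEditLogLoader.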
@@ -732,7 +739,7 @@ public long loadEdits(Iterable<EditLogInputStream> editStreams,
       LOG.info("Reading " + editIn + " expecting start txid #" +
           (lastAppliedTxId + 1));
       try {
-        loader.loadFSEdits(editIn, lastAppliedTxId + 1, recovery);
+        loader.loadFSEdits(editIn, lastAppliedTxId + 1, startOpt, recovery);
       } finally {
         // Update lastAppliedTxId even in case of error, since some ops may
         // have been successfully applied before the error.
@@ -1103,6 +1103,10 @@ private static StartupOption parseArguments(String args[]) {
           i += 2;
           startOpt.setClusterId(args[i]);
         }
+      } else if (StartupOption.ROLLINGUPGRADE.getName().equalsIgnoreCase(cmd)) {
+        startOpt = StartupOption.ROLLINGUPGRADE;
+        ++i;
+        startOpt.setRollingUpgradeStartupOption(args[i]);
       } else if (StartupOption.ROLLBACK.getName().equalsIgnoreCase(cmd)) {
         startOpt = StartupOption.ROLLBACK;
       } else if (StartupOption.FINALIZE.getName().equalsIgnoreCase(cmd)) {
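With this parsing in place the NameNode accepts a new startup flag of the form -rollingUpgrade <rollback|downgrade>; the token after the flag goes through setRollingUpgradeStartupOption and is therefore matched case-insensitively. For example, assuming the usual launcher script, something like `hdfs namenode -rollingUpgrade downgrade` would start the NameNode in downgrade mode.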
@@ -224,7 +224,7 @@ void doTailEdits() throws IOException, InterruptedException {
       // disk are ignored.
       long editsLoaded = 0;
       try {
-        editsLoaded = image.loadEdits(streams, namesystem, null);
+        editsLoaded = image.loadEdits(streams, namesystem);
       } catch (EditLogInputException elie) {
         editsLoaded = elie.getNumEditsLoaded();
         throw elie;
@@ -32,8 +32,8 @@
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HOSTS;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY;
@@ -100,7 +100,6 @@
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.ProxyUsers;
-import org.apache.hadoop.security.ssl.SSLFactory;
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ToolRunner;
@@ -1521,6 +1520,14 @@ public synchronized void restartNameNode() throws IOException {
     restartNameNode(true);
   }
 
+  /**
+   * Restart the namenode.
+   */
+  public synchronized void restartNameNode(String... args) throws IOException {
+    checkSingleNameNode();
+    restartNameNode(0, true, args);
+  }
+
   /**
    * Restart the namenode. Optionally wait for the cluster to become active.
    */
@@ -1541,13 +1548,13 @@ public synchronized void restartNameNode(int nnIndex) throws IOException {
    * Restart the namenode at a given index. Optionally wait for the cluster
    * to become active.
    */
-  public synchronized void restartNameNode(int nnIndex, boolean waitActive)
-      throws IOException {
+  public synchronized void restartNameNode(int nnIndex, boolean waitActive,
+      String... args) throws IOException {
     String nameserviceId = nameNodes[nnIndex].nameserviceId;
     String nnId = nameNodes[nnIndex].nnId;
     Configuration conf = nameNodes[nnIndex].conf;
     shutdownNameNode(nnIndex);
-    NameNode nn = NameNode.createNameNode(new String[] {}, conf);
+    NameNode nn = NameNode.createNameNode(args, conf);
     nameNodes[nnIndex] = new NameNodeInfo(nn, nameserviceId, nnId, conf);
     if (waitActive) {
       waitClusterUp();
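The new varargs overloads let a test restart the single NameNode with explicit startup arguments, as TestEditLogUpgradeMarker does further below. A minimal sketch of that usage (hypothetical helper class, assuming a freshly built single-NameNode MiniDFSCluster):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.HdfsConfiguration;
    import org.apache.hadoop.hdfs.MiniDFSCluster;

    /** Hypothetical helper, not part of the patch. */
    class RestartNameNodeWithArgsDemo {
      static void restartInRollbackMode() throws IOException {
        Configuration conf = new HdfsConfiguration();
        MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
        try {
          // The varargs are handed straight to NameNode.createNameNode(args, conf)
          // by the new restartNameNode(int, boolean, String...) overload.
          cluster.restartNameNode("-rollingUpgrade", "rollback");
        } finally {
          cluster.shutdown();
        }
      }
    }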
@@ -218,7 +218,7 @@ public void testPreTxidEditLogWithEdits() throws Exception {
 
   private long testLoad(byte[] data, FSNamesystem namesys) throws IOException {
     FSEditLogLoader loader = new FSEditLogLoader(namesys, 0);
-    return loader.loadFSEdits(new EditLogByteInputStream(data), 1, null);
+    return loader.loadFSEdits(new EditLogByteInputStream(data), 1);
   }
 
   /**
@@ -369,7 +369,7 @@ private void testEditLog(int initialSize) throws IOException {
 
       System.out.println("Verifying file: " + editFile);
       long numEdits = loader.loadFSEdits(
-          new EditLogFileInputStream(editFile), 3, null);
+          new EditLogFileInputStream(editFile), 3);
       int numLeases = namesystem.leaseManager.countLease();
       System.out.println("Number of outstanding leases " + numLeases);
       assertEquals(0, numLeases);
@@ -236,8 +236,8 @@ private long verifyEditLogs(FSNamesystem namesystem, FSImage fsimage,
 
       System.out.println("Verifying file: " + editFile);
       FSEditLogLoader loader = new FSEditLogLoader(namesystem, startTxId);
-      long numEditsThisLog = loader.loadFSEdits(new EditLogFileInputStream(editFile),
-          startTxId, null);
+      long numEditsThisLog = loader.loadFSEdits(
+          new EditLogFileInputStream(editFile), startTxId);
 
       System.out.println("Number of edits: " + numEditsThisLog);
       assertTrue(numEdits == -1 || numEditsThisLog == numEdits);
@@ -19,11 +19,14 @@
 
 import java.io.IOException;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.RollingUpgradeException;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -32,6 +35,8 @@
  * This class tests the edit log upgrade marker.
  */
 public class TestEditLogUpgradeMarker {
+  private static final Log LOG = LogFactory.getLog(TestEditLogUpgradeMarker.class);
+
   /**
    * Test edit log upgrade marker.
    */
@@ -61,7 +66,14 @@ public void testUpgradeMarker() throws IOException {
         Assert.assertTrue(dfs.exists(bar));
       }
 
-      cluster.restartNameNode();
+      try {
+        cluster.restartNameNode();
+        Assert.fail();
+      } catch(RollingUpgradeException e) {
+        LOG.info("The exception is expected: ", e);
+      }
+
+      cluster.restartNameNode("-rollingUpgrade", "rollback");
       {
         final DistributedFileSystem dfs = cluster.getFileSystem();
         Assert.assertTrue(dfs.exists(foo));
@@ -158,7 +158,7 @@ public void testEditLog() throws IOException {
 
         FSEditLogLoader loader = new FSEditLogLoader(namesystem, 0);
         long numEdits = loader.loadFSEdits(
-            new EditLogFileInputStream(editFile), 1, null);
+            new EditLogFileInputStream(editFile), 1);
         assertEquals("Verification for " + editFile, expectedTransactions, numEdits);
       }
     } finally {