HDFS-5966. Fix rollback of rolling upgrade in NameNode HA setup. Contributed by jing9
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-5535@1569885 13f79535-47bb-0310-9956-ffa450edef68
parent 99b4caa888
commit 377424e36a
@@ -46,3 +46,6 @@ HDFS-5535 subtasks:
     HDFS-5945. Add rolling upgrade information to fsimage; and disallow upgrade
     and rolling upgrade to be started simultaneously.  (szetszwo & jing9)
+
+    HDFS-5966. Fix rollback of rolling upgrade in NameNode HA setup.  (jing9
+    via szetszwo)
 
@@ -87,6 +87,12 @@ static public enum StartupOption{
    private static final Pattern ENUM_WITH_ROLLING_UPGRADE_OPTION = Pattern.compile(
        "(\\w+)\\((\\w+)\\)");
+
+    public static boolean isRollingUpgradeRollback(StartupOption option) {
+      return option == ROLLINGUPGRADE
+          && option.getRollingUpgradeStartupOption()
+              == RollingUpgradeStartupOption.ROLLBACK;
+    }
 
    private final String name;
    
    // Used only with format and upgrade options
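
Note: the new isRollingUpgradeRollback helper only centralizes a two-part check that the call sites below previously spelled out by hand. A minimal sketch of a caller, using only names that appear in this patch:

    // Before this change, each call site repeated the full condition:
    //   startOpt == StartupOption.ROLLINGUPGRADE
    //       && startOpt.getRollingUpgradeStartupOption() == RollingUpgradeStartupOption.ROLLBACK
    // With the helper, the same check collapses to one call:
    boolean rollingRollback = StartupOption.isRollingUpgradeRollback(startOpt);
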
@@ -246,8 +246,7 @@ private void doTransition(StorageDirectory sd,
       NamespaceInfo nsInfo, StartupOption startOpt) throws IOException {
     if (startOpt == StartupOption.ROLLBACK) {
       doRollback(sd, nsInfo); // rollback if applicable
-    } else if (startOpt == StartupOption.ROLLINGUPGRADE &&
-        startOpt.getRollingUpgradeStartupOption() == RollingUpgradeStartupOption.ROLLBACK) {
+    } else if (StartupOption.isRollingUpgradeRollback(startOpt)) {
       File trashRoot = getTrashRootDir(sd);
       int filesRestored =
           trashRoot.exists() ? restoreBlockFilesFromTrash(trashRoot) : 0;
@@ -42,7 +42,6 @@
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.RollingUpgradeStartupOption;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
 import org.apache.hadoop.hdfs.server.common.Storage;
@@ -549,9 +548,8 @@ void reloadFromImageFile(File file, FSNamesystem target) throws IOException {
   private boolean loadFSImage(FSNamesystem target, StartupOption startOpt,
       MetaRecoveryContext recovery)
       throws IOException {
-    final boolean rollingRollback = startOpt == StartupOption.ROLLINGUPGRADE
-        && startOpt.getRollingUpgradeStartupOption() ==
-            RollingUpgradeStartupOption.ROLLBACK;
+    final boolean rollingRollback = StartupOption
+        .isRollingUpgradeRollback(startOpt);
     final NameNodeFile nnf = rollingRollback ? NameNodeFile.IMAGE_ROLLBACK
         : NameNodeFile.IMAGE;
     final FSImageStorageInspector inspector = storage.readAndInspectDirs(nnf);
@@ -647,9 +645,17 @@ private void rollingRollback(long discardSegmentTxId, long ckptId)
     // discard discard unnecessary editlog segments starting from the given id
     this.editLog.discardSegments(discardSegmentTxId);
     // rename the special checkpoint
-    renameCheckpoint(ckptId, NameNodeFile.IMAGE_ROLLBACK, NameNodeFile.IMAGE);
+    renameCheckpoint(ckptId, NameNodeFile.IMAGE_ROLLBACK, NameNodeFile.IMAGE,
+        true);
     // purge all the checkpoints after the marker
     archivalManager.purgeCheckpoinsAfter(NameNodeFile.IMAGE, ckptId);
+    String nameserviceId = DFSUtil.getNamenodeNameServiceId(conf);
+    if (HAUtil.isHAEnabled(conf, nameserviceId)) {
+      // close the editlog since it is currently open for write
+      this.editLog.close();
+      // reopen the editlog for read
+      this.editLog.initSharedJournalsForRead();
+    }
   }
 
   void loadFSImageFile(FSNamesystem target, MetaRecoveryContext recovery,
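
Note: the HA-specific branch above exists because, when rolling back a rolling upgrade in an HA pair, the shared JournalNodes are rewritten by whichever NameNode is restarted with the rollback option; once that is done, this NameNode should only hold the shared log open for read. A rough sketch of the restart ordering this supports, using only the MiniDFSCluster helpers exercised by the HA test added later in this patch:

    // NN0 rolls back its local storage and the shared JournalNode segments.
    dfsCluster.restartNameNode(0, true, "-rollingUpgrade", "rollback");
    // NN1 is taken down so it does not try to roll back the shared log again;
    // it would then be brought back in sync with the rolled-back namespace
    // (the test below simply leaves it down).
    dfsCluster.shutdownNameNode(1);
    dfsCluster.transitionToActive(0);
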
@@ -689,18 +695,21 @@ public void initEditLog(StartupOption startOpt) throws IOException {
       // If this NN is not HA
       editLog.initJournalsForWrite();
       editLog.recoverUnclosedStreams();
-    } else if (HAUtil.isHAEnabled(conf, nameserviceId) &&
-        startOpt == StartupOption.UPGRADE) {
-      // This NN is HA, but we're doing an upgrade so init the edit log for
-      // write.
+    } else if (HAUtil.isHAEnabled(conf, nameserviceId)
+        && (startOpt == StartupOption.UPGRADE || StartupOption
+            .isRollingUpgradeRollback(startOpt))) {
+      // This NN is HA, but we're doing an upgrade or a rollback of rolling
+      // upgrade so init the edit log for write.
       editLog.initJournalsForWrite();
-      long sharedLogCTime = editLog.getSharedLogCTime();
-      if (this.storage.getCTime() < sharedLogCTime) {
-        throw new IOException("It looks like the shared log is already " +
-            "being upgraded but this NN has not been upgraded yet. You " +
-            "should restart this NameNode with the '" +
-            StartupOption.BOOTSTRAPSTANDBY.getName() + "' option to bring " +
-            "this NN in sync with the other.");
-      }
+      if (startOpt == StartupOption.UPGRADE) {
+        long sharedLogCTime = editLog.getSharedLogCTime();
+        if (this.storage.getCTime() < sharedLogCTime) {
+          throw new IOException("It looks like the shared log is already " +
+              "being upgraded but this NN has not been upgraded yet. You " +
+              "should restart this NameNode with the '" +
+              StartupOption.BOOTSTRAPSTANDBY.getName() + "' option to bring " +
+              "this NN in sync with the other.");
+        }
+      }
       editLog.recoverUnclosedStreams();
     } else {
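
Note: the widened condition admits a rolling-upgrade rollback into the write path, while the getSharedLogCTime comparison is now applied only to a genuine -upgrade start. The net effect is easier to see flattened into one expression; a simplified sketch (equivalent in intent, not the literal code above) of when this NameNode opens the journals for write during startup:

    boolean openForWrite = !HAUtil.isHAEnabled(conf, nameserviceId)   // non-HA NN always writes
        || startOpt == StartupOption.UPGRADE                          // HA upgrade writes
        || StartupOption.isRollingUpgradeRollback(startOpt);          // HA rolling-upgrade rollback now writes too
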
@@ -759,9 +768,8 @@ private long loadEdits(Iterable<EditLogInputStream> editStreams,
         // have been successfully applied before the error.
         lastAppliedTxId = loader.getLastAppliedTxId();
       }
-      boolean rollingRollback = startOpt == StartupOption.ROLLINGUPGRADE &&
-          startOpt.getRollingUpgradeStartupOption() ==
-              RollingUpgradeStartupOption.ROLLBACK;
+      boolean rollingRollback = StartupOption
+          .isRollingUpgradeRollback(startOpt);
       // If we are in recovery mode, we may have skipped over some txids.
       if (editIn.getLastTxId() != HdfsConstants.INVALID_TXID
           && !rollingRollback) {
@@ -1029,7 +1037,7 @@ private synchronized void saveFSImageInAllDirs(FSNamesystem source,
       assert false : "should have thrown above!";
     }
 
-    renameCheckpoint(txid, NameNodeFile.IMAGE_NEW, nnf);
+    renameCheckpoint(txid, NameNodeFile.IMAGE_NEW, nnf, false);
 
     // Since we now have a new checkpoint, we can clean up some
     // old edit logs and checkpoints.
@@ -1070,12 +1078,12 @@ void purgeCheckpoints(NameNodeFile nnf) {
    * Renames new image
    */
   private void renameCheckpoint(long txid, NameNodeFile fromNnf,
-      NameNodeFile toNnf) throws IOException {
+      NameNodeFile toNnf, boolean renameMD5) throws IOException {
     ArrayList<StorageDirectory> al = null;
 
     for (StorageDirectory sd : storage.dirIterable(NameNodeDirType.IMAGE)) {
       try {
-        renameImageFileInDir(sd, fromNnf, toNnf, txid);
+        renameImageFileInDir(sd, fromNnf, toNnf, txid, renameMD5);
       } catch (IOException ioe) {
         LOG.warn("Unable to rename checkpoint in " + sd, ioe);
         if (al == null) {
@@ -1104,8 +1112,8 @@ private void deleteCancelledCheckpoint(long txid) throws IOException {
       storage.reportErrorsOnDirectories(al);
     }
   }
 
-  private void renameImageFileInDir(StorageDirectory sd,
-      NameNodeFile fromNnf, NameNodeFile toNnf, long txid) throws IOException {
+  private void renameImageFileInDir(StorageDirectory sd, NameNodeFile fromNnf,
+      NameNodeFile toNnf, long txid, boolean renameMD5) throws IOException {
     final File fromFile = NNStorage.getStorageFile(sd, fromNnf, txid);
     final File toFile = NNStorage.getStorageFile(sd, toNnf, txid);
     // renameTo fails on Windows if the destination file
@@ -1119,7 +1127,10 @@ private void renameImageFileInDir(StorageDirectory sd,
         throw new IOException("renaming " + fromFile.getAbsolutePath() + " to " +
             toFile.getAbsolutePath() + " FAILED");
       }
     }
+    if (renameMD5) {
+      MD5FileUtils.renameMD5File(fromFile, toFile);
+    }
   }
 
   CheckpointSignature rollEditLog() throws IOException {
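
Note: the new renameMD5 flag controls whether the image's .md5 sidecar (managed by MD5FileUtils) is renamed together with the image file itself. In the rolling-upgrade rollback path the special rollback checkpoint already has a valid digest on disk, so both files are carried over as a pair; in the ordinary checkpoint-promotion paths the digest is saved separately, so the sidecar is left alone. The two call patterns from this patch, side by side (a file-name mapping such as fsimage_rollback_<txid> becoming fsimage_<txid> is illustrative, not quoted from the diff):

    // rollback marker image is promoted, and its .md5 sidecar must follow it
    renameCheckpoint(ckptId, NameNodeFile.IMAGE_ROLLBACK, NameNodeFile.IMAGE, true);

    // freshly written checkpoint is promoted; its digest is written by
    // saveDigestAndRenameCheckpointImage / saveMD5File, so no sidecar rename here
    renameCheckpoint(txid, NameNodeFile.IMAGE_NEW, NameNodeFile.IMAGE, false);
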
@@ -1218,7 +1229,7 @@ public synchronized void saveDigestAndRenameCheckpointImage(
     CheckpointFaultInjector.getInstance().afterMD5Rename();
 
     // Rename image from tmp file
-    renameCheckpoint(txid, NameNodeFile.IMAGE_NEW, NameNodeFile.IMAGE);
+    renameCheckpoint(txid, NameNodeFile.IMAGE_NEW, NameNodeFile.IMAGE, false);
     // So long as this is the newest image available,
     // advertise it as such to other checkpointers
     // from now on
@@ -135,15 +135,44 @@ public static MD5Hash computeMd5ForFile(File dataFile) throws IOException {
    */
   public static void saveMD5File(File dataFile, MD5Hash digest)
       throws IOException {
+    final String digestString = StringUtils.byteToHexString(digest.getDigest());
+    saveMD5File(dataFile, digestString);
+  }
+
+  private static void saveMD5File(File dataFile, String digestString)
+      throws IOException {
     File md5File = getDigestFileForFile(dataFile);
-    String digestString = StringUtils.byteToHexString(
-        digest.getDigest());
     String md5Line = digestString + " *" + dataFile.getName() + "\n";
 
     AtomicFileOutputStream afos = new AtomicFileOutputStream(md5File);
     afos.write(md5Line.getBytes(Charsets.UTF_8));
     afos.close();
-    LOG.debug("Saved MD5 " + digest + " to " + md5File);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Saved MD5 " + digestString + " to " + md5File);
+    }
   }
 
+  public static void renameMD5File(File oldDataFile, File newDataFile)
+      throws IOException {
+    File fromFile = getDigestFileForFile(oldDataFile);
+    BufferedReader in = null;
+    final String digestString;
+    try {
+      in = new BufferedReader(new InputStreamReader(new FileInputStream(
+          fromFile), Charsets.UTF_8));
+      String line = in.readLine();
+      String[] split = line.split(" \\*");
+      digestString = split[0];
+    } finally {
+      IOUtils.cleanup(LOG, in);
+    }
+
+    saveMD5File(newDataFile, digestString);
+
+    if (!fromFile.delete()) {
+      LOG.warn("deleting " + fromFile.getAbsolutePath() + " FAILED");
+    }
+  }
+
   /**
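
Note: the sidecar written by saveMD5File is a single md5sum-style line, and renameMD5File simply reads that line back, re-saves it under the new data file's name, and deletes the old sidecar. A small sketch of the on-disk format, derived from the code above (the digest value and file name are made-up examples):

    // e.g. fsimage_0000000000000000042.md5 might contain:
    //   9a0cf2a82fbbba0f6c37dbf7b6eb0e5d *fsimage_0000000000000000042
    String md5Line = digestString + " *" + dataFile.getName() + "\n";  // written by saveMD5File
    String recovered = md5Line.trim().split(" \\*")[0];                // what renameMD5File parses back out
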
@@ -1930,7 +1930,8 @@ public void triggerHeartbeats()
 
   /** Wait until the given namenode gets registration from all the datanodes */
   public void waitActive(int nnIndex) throws IOException {
-    if (nameNodes.length == 0 || nameNodes[nnIndex] == null) {
+    if (nameNodes.length == 0 || nameNodes[nnIndex] == null
+        || nameNodes[nnIndex].nameNode == null) {
       return;
     }
     InetSocketAddress addr = nameNodes[nnIndex].nameNode.getServiceRpcAddress();
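
Note: this extra null check is what lets the HA rollback test below shut one NameNode down and keep driving the cluster through the other. After shutdownNameNode(1) the NN1 slot still exists in the cluster but no longer holds a NameNode, and a later restart of NN0 that waits for the cluster to come up walks every slot; presumably that walk would otherwise hit a NullPointerException. A sketch of the sequence, assuming waitActive() visits each configured NameNode slot:

    dfsCluster.shutdownNameNode(1);   // NN1 slot remains, but its nameNode field is now null
    dfsCluster.restartNameNode(0);    // waits for the cluster; the new guard skips the empty NN1 slot
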
@@ -374,6 +374,19 @@ public void testUpgrade() throws Exception {
       log("Normal NameNode upgrade", numDirs);
       UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
       cluster = createCluster();
+
+      // make sure that rolling upgrade cannot be started
+      try {
+        final DistributedFileSystem dfs = cluster.getFileSystem();
+        dfs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
+        dfs.rollingUpgrade(RollingUpgradeAction.START);
+        fail();
+      } catch(RemoteException re) {
+        assertEquals(InconsistentFSStateException.class.getName(),
+            re.getClassName());
+        LOG.info("The exception is expected.", re);
+      }
+
       checkNameNode(nameNodeDirs, EXPECTED_TXID);
       TestParallelImageWrite.checkImages(cluster.getNamesystem(), numDirs);
       cluster.shutdown();
@@ -22,7 +22,10 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
+import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
 import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
+import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
 import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
@@ -188,9 +191,74 @@ public void testRollbackWithQJM() throws Exception {
   }
 
   /**
-   * TODO: Test rollback scenarios where StandbyNameNode does checkpoints during
+   * Test rollback scenarios where StandbyNameNode does checkpoints during
    * rolling upgrade.
    */
+  @Test
+  public void testRollbackWithHAQJM() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    MiniQJMHACluster cluster = null;
+    final Path foo = new Path("/foo");
+    final Path bar = new Path("/bar");
+
+    try {
+      cluster = new MiniQJMHACluster.Builder(conf).build();
+      MiniDFSCluster dfsCluster = cluster.getDfsCluster();
+      dfsCluster.waitActive();
+
+      // let NN1 do checkpoints as fast as possible
+      dfsCluster.getConfiguration(1).setInt(
+          DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY, 0);
+      dfsCluster.restartNameNode(1);
+
+      dfsCluster.transitionToActive(0);
+      DistributedFileSystem dfs = dfsCluster.getFileSystem(0);
+      dfs.mkdirs(foo);
+
+      // start rolling upgrade
+      RollingUpgradeInfo info = dfs.rollingUpgrade(RollingUpgradeAction.START);
+      Assert.assertTrue(info.isStarted());
+
+      // create new directory
+      dfs.mkdirs(bar);
+      dfs.close();
+
+      // rollback NN0
+      dfsCluster.restartNameNode(0, true, "-rollingUpgrade",
+          "rollback");
+      // shutdown NN1
+      dfsCluster.shutdownNameNode(1);
+      dfsCluster.transitionToActive(0);
+
+      // make sure /foo is still there, but /bar is not
+      dfs = dfsCluster.getFileSystem(0);
+      Assert.assertTrue(dfs.exists(foo));
+      Assert.assertFalse(dfs.exists(bar));
+
+      // check the details of NNStorage
+      NNStorage storage = dfsCluster.getNamesystem(0).getFSImage()
+          .getStorage();
+      // (startSegment, upgrade marker, mkdir, endSegment)
+      checkNNStorage(storage, 3, 7);
+
+      // check storage in JNs
+      for (int i = 0; i < NUM_JOURNAL_NODES; i++) {
+        File dir = cluster.getJournalCluster().getCurrentDir(0,
+            MiniQJMHACluster.NAMESERVICE);
+        // segments:(startSegment, mkdir, endSegment), (startSegment, upgrade
+        // marker, mkdir, endSegment)
+        checkJNStorage(dir, 4, 7);
+      }
+
+      // restart NN0 again to make sure we can start using the new fsimage and
+      // the corresponding md5 checksum
+      dfsCluster.restartNameNode(0);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 
   // TODO: rollback could not succeed in all JN
 }
@@ -38,7 +38,7 @@ public class MiniQJMHACluster {
   private MiniJournalCluster journalCluster;
   private final Configuration conf;
 
-  private static String NAMESERVICE = "ns1";
+  public static String NAMESERVICE = "ns1";
   private static final String NN1 = "nn1";
   private static final String NN2 = "nn2";
   private static final int NN1_IPC_PORT = 10000;