HDFS-5835. Add a new option for starting Namenode when rolling upgrade is in progress.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-5535@1561943 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2014-01-28 05:35:47 +00:00
parent a9110e1788
commit 917502ef31
8 changed files with 159 additions and 29 deletions

View File

@ -10,5 +10,8 @@ HDFS-5535 subtasks:
HDFS-5786. Support QUERY and FINALIZE actions of rolling upgrade. (szetszwo) HDFS-5786. Support QUERY and FINALIZE actions of rolling upgrade. (szetszwo)
HDFS-5753. Add new NN startup options for downgrade and rollback using HDFS-5753. Add new Namenode startup options for downgrade and rollback using
upgrade marker. (szetszwo) upgrade marker. (szetszwo)
HDFS-5835. Add a new option for starting Namenode when rolling upgrade is
in progress. (szetszwo)

View File

@ -59,6 +59,24 @@ public long getFinalizeTime() {
return finalizeTime; return finalizeTime;
} }
@Override
public int hashCode() {
//only use lower 32 bits
return (int)startTime ^ (int)finalizeTime;
}
@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
} else if (obj == null || !(obj instanceof RollingUpgradeInfo)) {
return false;
}
final RollingUpgradeInfo that = (RollingUpgradeInfo)obj;
return this.startTime == that.startTime
&& this.finalizeTime == that.finalizeTime;
}
@Override @Override
public String toString() { public String toString() {
return " Start Time: " + (startTime == 0? "<NOT STARTED>": timestamp2String(startTime)) return " Start Time: " + (startTime == 0? "<NOT STARTED>": timestamp2String(startTime))

View File

@ -47,7 +47,7 @@ static public enum NodeType {
/** Startup options for rolling upgrade. */ /** Startup options for rolling upgrade. */
public static enum RollingUpgradeStartupOption{ public static enum RollingUpgradeStartupOption{
ROLLBACK, DOWNGRADE; ROLLBACK, DOWNGRADE, STARTED;
private static final RollingUpgradeStartupOption[] VALUES = values(); private static final RollingUpgradeStartupOption[] VALUES = values();

View File

@ -79,6 +79,7 @@
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TimesOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TimesOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateBlocksOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateBlocksOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateMasterKeyOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateMasterKeyOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpgradeMarkerOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpgradeMarkerOp.UpgradeMarkerException; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpgradeMarkerOp.UpgradeMarkerException;
import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo; import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease; import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
@ -677,6 +678,12 @@ private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
== RollingUpgradeStartupOption.DOWNGRADE) { == RollingUpgradeStartupOption.DOWNGRADE) {
//ignore upgrade marker //ignore upgrade marker
break; break;
} else if (startOpt.getRollingUpgradeStartupOption()
== RollingUpgradeStartupOption.STARTED) {
//rolling upgrade is already started, set info
final UpgradeMarkerOp upgradeOp = (UpgradeMarkerOp)op;
fsNamesys.setRollingUpgradeInfo(upgradeOp.getStartTime());
break;
} }
} }
throw new RollingUpgradeException( throw new RollingUpgradeException(

View File

@ -3398,6 +3398,10 @@ static UpgradeMarkerOp getInstance(OpInstanceCache cache) {
return (UpgradeMarkerOp) cache.get(OP_UPGRADE_MARKER); return (UpgradeMarkerOp) cache.get(OP_UPGRADE_MARKER);
} }
long getStartTime() {
return startTime;
}
void setStartTime(long startTime) { void setStartTime(long startTime) {
this.startTime = startTime; this.startTime = startTime;
} }

View File

@ -7110,7 +7110,7 @@ RollingUpgradeInfo startRollingUpgrade() throws IOException {
final CheckpointSignature cs = getFSImage().rollEditLog(); final CheckpointSignature cs = getFSImage().rollEditLog();
LOG.info("Successfully rolled edit log for preparing rolling upgrade." LOG.info("Successfully rolled edit log for preparing rolling upgrade."
+ " Checkpoint signature: " + cs); + " Checkpoint signature: " + cs);
rollingUpgradeInfo = new RollingUpgradeInfo(now()); setRollingUpgradeInfo(now());
getEditLog().logUpgradeMarker(rollingUpgradeInfo.getStartTime()); getEditLog().logUpgradeMarker(rollingUpgradeInfo.getStartTime());
} finally { } finally {
writeUnlock(); writeUnlock();
@ -7123,6 +7123,10 @@ RollingUpgradeInfo startRollingUpgrade() throws IOException {
return rollingUpgradeInfo; return rollingUpgradeInfo;
} }
void setRollingUpgradeInfo(long startTime) {
rollingUpgradeInfo = new RollingUpgradeInfo(startTime);;
}
RollingUpgradeInfo finalizeRollingUpgrade() throws IOException { RollingUpgradeInfo finalizeRollingUpgrade() throws IOException {
checkSuperuserPrivilege(); checkSuperuserPrivilege();
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);

View File

@ -914,6 +914,10 @@ private static void initNameNodeAddress(Configuration conf,
} }
private static String[] createArgs(StartupOption operation) { private static String[] createArgs(StartupOption operation) {
if (operation == StartupOption.ROLLINGUPGRADE) {
return new String[]{operation.getName(),
operation.getRollingUpgradeStartupOption().name()};
}
String[] args = (operation == null || String[] args = (operation == null ||
operation == StartupOption.FORMAT || operation == StartupOption.FORMAT ||
operation == StartupOption.REGULAR) ? operation == StartupOption.REGULAR) ?

View File

@ -17,8 +17,18 @@
*/ */
package org.apache.hadoop.hdfs; package org.apache.hadoop.hdfs;
import java.io.File;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.tools.DFSAdmin; import org.apache.hadoop.hdfs.tools.DFSAdmin;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -28,6 +38,12 @@
* This class tests rolling upgrade. * This class tests rolling upgrade.
*/ */
public class TestRollingUpgrade { public class TestRollingUpgrade {
private static final Log LOG = LogFactory.getLog(TestRollingUpgrade.class);
private void runCmd(DFSAdmin dfsadmin, String... args) throws Exception {
Assert.assertEquals(0, dfsadmin.run(args));
}
/** /**
* Test DFSAdmin Upgrade Command. * Test DFSAdmin Upgrade Command.
*/ */
@ -55,39 +71,23 @@ public void testDFSAdminRollingUpgradeCommands() throws Exception {
Assert.assertTrue(dfsadmin.run(args) != 0); Assert.assertTrue(dfsadmin.run(args) != 0);
} }
{
//query rolling upgrade //query rolling upgrade
final String[] args = {"-rollingUpgrade"}; runCmd(dfsadmin, "-rollingUpgrade");
Assert.assertEquals(0, dfsadmin.run(args));
}
{
//start rolling upgrade //start rolling upgrade
final String[] args = {"-rollingUpgrade", "start"}; runCmd(dfsadmin, "-rollingUpgrade", "start");
Assert.assertEquals(0, dfsadmin.run(args));
}
{
//query rolling upgrade //query rolling upgrade
final String[] args = {"-rollingUpgrade", "query"}; runCmd(dfsadmin, "-rollingUpgrade", "query");
Assert.assertEquals(0, dfsadmin.run(args));
}
dfs.mkdirs(bar); dfs.mkdirs(bar);
{
//finalize rolling upgrade //finalize rolling upgrade
final String[] args = {"-rollingUpgrade", "finalize"}; runCmd(dfsadmin, "-rollingUpgrade", "finalize");
Assert.assertEquals(0, dfsadmin.run(args));
}
dfs.mkdirs(baz); dfs.mkdirs(baz);
{ runCmd(dfsadmin, "-rollingUpgrade");
//query rolling upgrade
final String[] args = {"-rollingUpgrade"};
Assert.assertEquals(0, dfsadmin.run(args));
}
Assert.assertTrue(dfs.exists(foo)); Assert.assertTrue(dfs.exists(foo));
Assert.assertTrue(dfs.exists(bar)); Assert.assertTrue(dfs.exists(bar));
@ -105,4 +105,94 @@ public void testDFSAdminRollingUpgradeCommands() throws Exception {
if(cluster != null) cluster.shutdown(); if(cluster != null) cluster.shutdown();
} }
} }
private static Configuration setConf(Configuration conf, File dir,
MiniJournalCluster mjc) {
conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, dir.getAbsolutePath());
conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
mjc.getQuorumJournalURI("myjournal").toString());
return conf;
}
@Test (timeout = 30000)
public void testRollingUpgradeWithQJM() throws Exception {
String nnDirPrefix = MiniDFSCluster.getBaseDirectory() + "/nn/";
final File nn1Dir = new File(nnDirPrefix + "image1");
final File nn2Dir = new File(nnDirPrefix + "image2");
final Configuration conf = new HdfsConfiguration();
final MiniJournalCluster mjc = new MiniJournalCluster.Builder(conf).build();
setConf(conf, nn1Dir, mjc);
{
// Start the cluster once to generate the dfs dirs
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(0)
.manageNameDfsDirs(false)
.checkExitOnShutdown(false)
.build();
// Shutdown the cluster before making a copy of the namenode dir to release
// all file locks, otherwise, the copy will fail on some platforms.
cluster.shutdown();
}
MiniDFSCluster cluster2 = null;
try {
// Start a second NN pointed to the same quorum.
// We need to copy the image dir from the first NN -- or else
// the new NN will just be rejected because of Namespace mismatch.
FileUtil.fullyDelete(nn2Dir);
FileUtil.copy(nn1Dir, FileSystem.getLocal(conf).getRaw(),
new Path(nn2Dir.getAbsolutePath()), false, conf);
// Start the cluster again
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(0)
.format(false)
.manageNameDfsDirs(false)
.checkExitOnShutdown(false)
.build();
final Path foo = new Path("/foo");
final Path bar = new Path("/bar");
final RollingUpgradeInfo info1;
{
final DistributedFileSystem dfs = cluster.getFileSystem();
dfs.mkdirs(foo);
//start rolling upgrade
info1 = dfs.rollingUpgrade(RollingUpgradeAction.START);
LOG.info("start " + info1);
//query rolling upgrade
Assert.assertEquals(info1, dfs.rollingUpgrade(RollingUpgradeAction.QUERY));
dfs.mkdirs(bar);
}
// cluster2 takes over QJM
final Configuration conf2 = setConf(new Configuration(), nn2Dir, mjc);
//set startup option to downgrade for ignoring upgrade marker in editlog
StartupOption.ROLLINGUPGRADE.setRollingUpgradeStartupOption("started");
cluster2 = new MiniDFSCluster.Builder(conf2)
.numDataNodes(0)
.format(false)
.manageNameDfsDirs(false)
.startupOption(StartupOption.ROLLINGUPGRADE)
.build();
final DistributedFileSystem dfs2 = cluster2.getFileSystem();
// Check that cluster2 sees the edits made on cluster1
Assert.assertTrue(dfs2.exists(foo));
Assert.assertTrue(dfs2.exists(bar));
//query rolling upgrade in cluster2
Assert.assertEquals(info1, dfs2.rollingUpgrade(RollingUpgradeAction.QUERY));
LOG.info("finalize: " + dfs2.rollingUpgrade(RollingUpgradeAction.FINALIZE));
} finally {
if (cluster2 != null) cluster2.shutdown();
}
}
} }