HDFS-5889. When starting rolling upgrade, create a fs image for rollback so that the standby namenode can create checkpoints during upgrade. Contributed by szetszwo & jing9

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-5535@1567861 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2014-02-13 08:04:58 +00:00
parent 300e4906af
commit 4f9bbaa301
16 changed files with 179 additions and 95 deletions

View File

@ -28,3 +28,7 @@ HDFS-5535 subtasks:
HDFS-5874. Should not compare DataNode current layout version with that of
NameNode in DataStrorage. (brandonli)
HDFS-5889. When starting rolling upgrade, create a fs image for rollback
so that the standby namenode can create checkpoints during upgrade.
(szetszwo & jing9)

View File

@ -81,7 +81,7 @@
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TimesOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateBlocksOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateMasterKeyOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpgradeMarkerOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RollingUpgradeOp;
import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
@ -1017,9 +1017,15 @@ void logRemoveCachePool(String poolName, boolean toLogRpcIds) {
logEdit(op);
}
void logUpgradeMarker(long startTime) {
UpgradeMarkerOp op = UpgradeMarkerOp.getInstance(cache.get());
op.setStartTime(startTime);
void logStartRollingUpgrade(long startTime) {
RollingUpgradeOp op = RollingUpgradeOp.getStartInstance(cache.get());
op.setTime(startTime);
logEdit(op);
}
void logFinalizeRollingUpgrade(long finalizeTime) {
RollingUpgradeOp op = RollingUpgradeOp.getFinalizeInstance(cache.get());
op.setTime(finalizeTime);
logEdit(op);
}

View File

@ -68,6 +68,7 @@
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameSnapshotOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenewDelegationTokenOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RollingUpgradeOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV1Op;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV2Op;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetNSQuotaOp;
@ -79,10 +80,9 @@
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TimesOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateBlocksOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateMasterKeyOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpgradeMarkerOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpgradeMarkerOp.UpgradeMarkerException;
import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress;
@ -225,7 +225,7 @@ long loadEditRecords(EditLogInputStream in, boolean closeOnExit,
if (lastInodeId < inodeId) {
lastInodeId = inodeId;
}
} catch (UpgradeMarkerException e) {
} catch (RollingUpgradeOp.RollbackException e) {
throw e;
} catch (Throwable e) {
LOG.error("Encountered exception on operation " + op, e);
@ -259,8 +259,8 @@ long loadEditRecords(EditLogInputStream in, boolean closeOnExit,
}
numEdits++;
totalEdits++;
} catch (UpgradeMarkerException e) {
LOG.info("Stopped at upgrade marker");
} catch (RollingUpgradeOp.RollbackException e) {
LOG.info("Stopped at OP_START_ROLLING_UPGRADE for rollback.");
break;
} catch (MetaRecoveryContext.RequestStopException e) {
MetaRecoveryContext.LOG.warn("Stopped reading edit log at " +
@ -715,29 +715,45 @@ private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
fsNamesys.setLastAllocatedBlockId(allocateBlockIdOp.blockId);
break;
}
case OP_UPGRADE_MARKER: {
case OP_ROLLING_UPGRADE_START: {
boolean started = false;
if (startOpt == StartupOption.ROLLINGUPGRADE) {
final RollingUpgradeStartupOption rollingUpgradeOpt
= startOpt.getRollingUpgradeStartupOption();
if (rollingUpgradeOpt == RollingUpgradeStartupOption.ROLLBACK) {
throw new UpgradeMarkerException();
throw new RollingUpgradeOp.RollbackException();
} else if (rollingUpgradeOpt == RollingUpgradeStartupOption.DOWNGRADE) {
//ignore upgrade marker
break;
} else if (rollingUpgradeOpt == RollingUpgradeStartupOption.STARTED) {
if (totalEdits > 1) {
// save namespace if this is not the second edit transaction
// (the first must be OP_START_LOG_SEGMENT)
fsNamesys.getFSImage().saveNamespace(fsNamesys);
}
//rolling upgrade is already started, set info
final UpgradeMarkerOp upgradeOp = (UpgradeMarkerOp)op;
fsNamesys.setRollingUpgradeInfo(upgradeOp.getStartTime());
break;
started = true;
}
}
if (started || fsNamesys.isInStandbyState()) {
if (totalEdits > 1) {
// save namespace if this is not the second edit transaction
// (the first must be OP_START_LOG_SEGMENT)
fsNamesys.getFSImage().saveNamespace(fsNamesys,
NameNodeFile.IMAGE_ROLLBACK, null);
}
//rolling upgrade is already started, set info
final RollingUpgradeOp upgradeOp = (RollingUpgradeOp)op;
fsNamesys.setRollingUpgradeInfo(upgradeOp.getTime());
break;
}
throw new RollingUpgradeException(
"Unexpected upgrade marker in edit log: op=" + op);
"Unexpected OP_ROLLING_UPGRADE_START in edit log: op=" + op);
}
case OP_ROLLING_UPGRADE_FINALIZE: {
if (!fsNamesys.isRollingUpgrade()) {
throw new RollingUpgradeException(
"Unexpected OP_ROLLING_UPGRADE_FINALIZE "
+ " since there is no rolling upgrade in progress.");
}
fsNamesys.getFSImage().purgeCheckpoints(NameNodeFile.IMAGE_ROLLBACK);
break;
}
case OP_ADD_CACHE_DIRECTIVE: {
AddCacheDirectiveInfoOp addOp = (AddCacheDirectiveInfoOp) op;

View File

@ -44,6 +44,8 @@
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME_OLD;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME_SNAPSHOT;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENEW_DELEGATION_TOKEN;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ROLLING_UPGRADE_FINALIZE;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ROLLING_UPGRADE_START;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_GENSTAMP_V1;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_GENSTAMP_V2;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_NS_QUOTA;
@ -56,7 +58,6 @@
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_TIMES;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_UPDATE_BLOCKS;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_UPDATE_MASTER_KEY;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_UPGRADE_MARKER;
import java.io.DataInput;
import java.io.DataInputStream;
@ -169,7 +170,10 @@ public OpInstanceCache() {
inst.put(OP_REMOVE_CACHE_POOL, new RemoveCachePoolOp());
inst.put(OP_ADD_BLOCK, new AddBlockOp());
inst.put(OP_UPGRADE_MARKER, new UpgradeMarkerOp());
inst.put(OP_ROLLING_UPGRADE_START, new RollingUpgradeOp(
OP_ROLLING_UPGRADE_START, "start"));
inst.put(OP_ROLLING_UPGRADE_FINALIZE, new RollingUpgradeOp(
OP_ROLLING_UPGRADE_FINALIZE, "finalize"));
}
public FSEditLogOp get(FSEditLogOpCodes opcode) {
@ -3415,54 +3419,59 @@ public void readFields(DataInput in) throws IOException {
/**
* Operation corresponding to upgrade
*/
static class UpgradeMarkerOp extends FSEditLogOp { // @Idempotent
private long startTime;
static class RollingUpgradeOp extends FSEditLogOp { // @Idempotent
private final String name;
private long time;
public UpgradeMarkerOp() {
super(OP_UPGRADE_MARKER);
public RollingUpgradeOp(FSEditLogOpCodes code, String name) {
super(code);
this.name = name.toUpperCase();
}
static UpgradeMarkerOp getInstance(OpInstanceCache cache) {
return (UpgradeMarkerOp) cache.get(OP_UPGRADE_MARKER);
static RollingUpgradeOp getStartInstance(OpInstanceCache cache) {
return (RollingUpgradeOp) cache.get(OP_ROLLING_UPGRADE_START);
}
long getStartTime() {
return startTime;
static RollingUpgradeOp getFinalizeInstance(OpInstanceCache cache) {
return (RollingUpgradeOp) cache.get(OP_ROLLING_UPGRADE_FINALIZE);
}
void setStartTime(long startTime) {
this.startTime = startTime;
long getTime() {
return time;
}
void setTime(long time) {
this.time = time;
}
@Override
void readFields(DataInputStream in, int logVersion) throws IOException {
startTime = in.readLong();
time = in.readLong();
}
@Override
public void writeFields(DataOutputStream out) throws IOException {
FSImageSerialization.writeLong(startTime, out);
FSImageSerialization.writeLong(time, out);
}
@Override
protected void toXml(ContentHandler contentHandler) throws SAXException {
XMLUtils.addSaxString(contentHandler, "STARTTIME",
Long.valueOf(startTime).toString());
XMLUtils.addSaxString(contentHandler, name + "TIME",
Long.valueOf(time).toString());
}
@Override
void fromXml(Stanza st) throws InvalidXmlException {
this.startTime = Long.valueOf(st.getValue("STARTTIME"));
this.time = Long.valueOf(st.getValue(name + "TIME"));
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("UpgradeMarkerOp");
return builder.toString();
return new StringBuilder().append("RollingUpgradeOp [").append(name)
.append(", time=").append(time).append("]").toString();
}
static class UpgradeMarkerException extends IOException {
static class RollbackException extends IOException {
private static final long serialVersionUID = 1L;
}
}

View File

@ -67,7 +67,8 @@ public enum FSEditLogOpCodes {
OP_MODIFY_CACHE_POOL ((byte) 37),
OP_REMOVE_CACHE_POOL ((byte) 38),
OP_MODIFY_CACHE_DIRECTIVE ((byte) 39),
OP_UPGRADE_MARKER ((byte) 40),
OP_ROLLING_UPGRADE_START ((byte) 40),
OP_ROLLING_UPGRADE_FINALIZE ((byte) 41),
// Note that the current range of the valid OP code is 0~127
OP_INVALID ((byte) -1);

View File

@ -42,6 +42,7 @@
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.RollingUpgradeStartupOption;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
import org.apache.hadoop.hdfs.server.common.Storage;
@ -542,9 +543,16 @@ void reloadFromImageFile(File file, FSNamesystem target) throws IOException {
private boolean loadFSImage(FSNamesystem target, StartupOption startOpt,
MetaRecoveryContext recovery)
throws IOException {
FSImageStorageInspector inspector = storage.readAndInspectDirs();
FSImageFile imageFile = null;
final NameNodeFile nnf;
if (startOpt == StartupOption.ROLLINGUPGRADE
&& startOpt.getRollingUpgradeStartupOption()
== RollingUpgradeStartupOption.ROLLBACK) {
nnf = NameNodeFile.IMAGE_ROLLBACK;
} else {
nnf = NameNodeFile.IMAGE;
}
final FSImageStorageInspector inspector = storage.readAndInspectDirs(nnf);
isUpgradeFinalized = inspector.isUpgradeFinalized();
List<FSImageFile> imageFiles = inspector.getLatestImages();
@ -587,6 +595,7 @@ LayoutVersion.Feature.TXID_BASED_LAYOUT, getLayoutVersion())) {
LOG.info("No edit log streams selected.");
}
FSImageFile imageFile = null;
for (int i = 0; i < imageFiles.size(); i++) {
try {
imageFile = imageFiles.get(i);
@ -901,7 +910,7 @@ private void waitForThreads(List<Thread> threads) {
*/
public synchronized void saveNamespace(FSNamesystem source)
throws IOException {
saveNamespace(source, null);
saveNamespace(source, NameNodeFile.IMAGE, null);
}
/**
@ -910,7 +919,7 @@ public synchronized void saveNamespace(FSNamesystem source)
* @param canceler
*/
public synchronized void saveNamespace(FSNamesystem source,
Canceler canceler) throws IOException {
NameNodeFile nnf, Canceler canceler) throws IOException {
assert editLog != null : "editLog must be initialized";
LOG.info("Save namespace ...");
storage.attemptRestoreRemovedStorage();
@ -922,7 +931,7 @@ public synchronized void saveNamespace(FSNamesystem source,
}
long imageTxId = getLastAppliedOrWrittenTxId();
try {
saveFSImageInAllDirs(source, imageTxId, canceler);
saveFSImageInAllDirs(source, nnf, imageTxId, canceler);
storage.writeAll();
} finally {
if (editLogWasOpen) {
@ -941,12 +950,11 @@ public synchronized void saveNamespace(FSNamesystem source,
*/
protected synchronized void saveFSImageInAllDirs(FSNamesystem source, long txid)
throws IOException {
saveFSImageInAllDirs(source, txid, null);
saveFSImageInAllDirs(source, NameNodeFile.IMAGE, txid, null);
}
protected synchronized void saveFSImageInAllDirs(FSNamesystem source, long txid,
Canceler canceler)
throws IOException {
private synchronized void saveFSImageInAllDirs(FSNamesystem source,
NameNodeFile nnf, long txid, Canceler canceler) throws IOException {
StartupProgress prog = NameNode.getStartupProgress();
prog.beginPhase(Phase.SAVING_CHECKPOINT);
if (storage.getNumStorageDirs(NameNodeDirType.IMAGE) == 0) {
@ -983,11 +991,11 @@ protected synchronized void saveFSImageInAllDirs(FSNamesystem source, long txid,
assert false : "should have thrown above!";
}
renameCheckpoint(txid);
renameCheckpoint(txid, NameNodeFile.IMAGE_NEW, nnf);
// Since we now have a new checkpoint, we can clean up some
// old edit logs and checkpoints.
purgeOldStorage();
purgeOldStorage(nnf);
} finally {
// Notify any threads waiting on the checkpoint to be canceled
// that it is complete.
@ -1001,23 +1009,35 @@ protected synchronized void saveFSImageInAllDirs(FSNamesystem source, long txid,
* Purge any files in the storage directories that are no longer
* necessary.
*/
public void purgeOldStorage() {
void purgeOldStorage(NameNodeFile nnf) {
try {
archivalManager.purgeOldStorage();
archivalManager.purgeOldStorage(nnf);
} catch (Exception e) {
LOG.warn("Unable to purge old storage", e);
LOG.warn("Unable to purge old storage " + nnf.getName(), e);
}
}
/**
* Purge all the checkpoints with the name style.
*/
void purgeCheckpoints(NameNodeFile nnf) {
try {
archivalManager.purgeCheckpoints(nnf);
} catch (Exception e) {
LOG.warn("Unable to purge checkpoints with name " + nnf.getName(), e);
}
}
/**
* Renames new image
*/
private void renameCheckpoint(long txid) throws IOException {
private void renameCheckpoint(long txid, NameNodeFile fromNnf,
NameNodeFile toNnf) throws IOException {
ArrayList<StorageDirectory> al = null;
for (StorageDirectory sd : storage.dirIterable(NameNodeDirType.IMAGE)) {
try {
renameCheckpointInDir(sd, txid);
renameImageFileInDir(sd, fromNnf, toNnf, txid);
} catch (IOException ioe) {
LOG.warn("Unable to rename checkpoint in " + sd, ioe);
if (al == null) {
@ -1046,21 +1066,20 @@ private void deleteCancelledCheckpoint(long txid) throws IOException {
storage.reportErrorsOnDirectories(al);
}
private void renameCheckpointInDir(StorageDirectory sd, long txid)
throws IOException {
File ckpt = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE_NEW, txid);
File curFile = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE, txid);
private void renameImageFileInDir(StorageDirectory sd,
NameNodeFile fromNnf, NameNodeFile toNnf, long txid) throws IOException {
final File fromFile = NNStorage.getStorageFile(sd, fromNnf, txid);
final File toFile = NNStorage.getStorageFile(sd, toNnf, txid);
// renameTo fails on Windows if the destination file
// already exists.
if(LOG.isDebugEnabled()) {
LOG.debug("renaming " + ckpt.getAbsolutePath()
+ " to " + curFile.getAbsolutePath());
LOG.debug("renaming " + fromFile.getAbsolutePath()
+ " to " + toFile.getAbsolutePath());
}
if (!ckpt.renameTo(curFile)) {
if (!curFile.delete() || !ckpt.renameTo(curFile)) {
throw new IOException("renaming " + ckpt.getAbsolutePath() + " to " +
curFile.getAbsolutePath() + " FAILED");
if (!fromFile.renameTo(toFile)) {
if (!toFile.delete() || !fromFile.renameTo(toFile)) {
throw new IOException("renaming " + fromFile.getAbsolutePath() + " to " +
toFile.getAbsolutePath() + " FAILED");
}
}
}
@ -1161,7 +1180,7 @@ public synchronized void saveDigestAndRenameCheckpointImage(
CheckpointFaultInjector.getInstance().afterMD5Rename();
// Rename image from tmp file
renameCheckpoint(txid);
renameCheckpoint(txid, NameNodeFile.IMAGE_NEW, NameNodeFile.IMAGE);
// So long as this is the newest image available,
// advertise it as such to other checkpointers
// from now on

View File

@ -45,8 +45,15 @@ class FSImageTransactionalStorageInspector extends FSImageStorageInspector {
List<FSImageFile> foundImages = new ArrayList<FSImageFile>();
private long maxSeenTxId = 0;
private static final Pattern IMAGE_REGEX = Pattern.compile(
NameNodeFile.IMAGE.getName() + "_(\\d+)");
private final Pattern namePattern;
FSImageTransactionalStorageInspector() {
this(NameNodeFile.IMAGE);
}
FSImageTransactionalStorageInspector(NameNodeFile nnf) {
namePattern = Pattern.compile(nnf.getName() + "_(\\d+)");
}
@Override
public void inspectDirectory(StorageDirectory sd) throws IOException {
@ -81,7 +88,7 @@ public void inspectDirectory(StorageDirectory sd) throws IOException {
String name = f.getName();
// Check for fsimage_*
Matcher imageMatch = IMAGE_REGEX.matcher(name);
Matcher imageMatch = namePattern.matcher(name);
if (imageMatch.matches()) {
if (sd.getStorageDirType().isOfType(NameNodeDirType.IMAGE)) {
try {

View File

@ -201,6 +201,7 @@
import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
import org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer;
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
@ -7119,11 +7120,11 @@ RollingUpgradeInfo startRollingUpgrade() throws IOException {
checkNameNodeSafeMode("Failed to " + action);
checkRollingUpgrade(action);
getFSImage().saveNamespace(this);
getFSImage().saveNamespace(this, NameNodeFile.IMAGE_ROLLBACK, null);
LOG.info("Successfully saved namespace for preparing rolling upgrade.");
setRollingUpgradeInfo(now());
getEditLog().logUpgradeMarker(rollingUpgradeInfo.getStartTime());
getEditLog().logStartRollingUpgrade(rollingUpgradeInfo.getStartTime());
} finally {
writeUnlock();
}
@ -7140,7 +7141,7 @@ void setRollingUpgradeInfo(long startTime) {
}
/** Is rolling upgrade in progress? */
boolean isRollingUpgrade() {
public boolean isRollingUpgrade() {
return rollingUpgradeInfo != null;
}
@ -7169,11 +7170,13 @@ RollingUpgradeInfo finalizeRollingUpgrade() throws IOException {
returnInfo = new RollingUpgradeInfo(blockPoolId,
rollingUpgradeInfo.getStartTime(), now());
getFSImage().saveNamespace(this);
getFSImage().purgeCheckpoints(NameNodeFile.IMAGE_ROLLBACK);
rollingUpgradeInfo = null;
getEditLog().logFinalizeRollingUpgrade(returnInfo.getFinalizeTime());
} finally {
writeUnlock();
}
getEditLog().logSync();
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
logAuditEvent(true, "finalizeRollingUpgrade", null, null, null);

View File

@ -43,6 +43,7 @@
import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
@ -190,7 +191,7 @@ public Void run() throws Exception {
// Now that we have a new checkpoint, we might be able to
// remove some old ones.
nnImage.purgeOldStorage();
nnImage.purgeOldStorage(NameNodeFile.IMAGE);
} finally {
currentlyDownloadingCheckpoints.remove(txid);
}

View File

@ -46,7 +46,6 @@
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.util.PersistentLongFile;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.DNS;
import org.apache.hadoop.util.Time;
@ -68,12 +67,13 @@ public class NNStorage extends Storage implements Closeable,
//
// The filenames used for storing the images
//
enum NameNodeFile {
public enum NameNodeFile {
IMAGE ("fsimage"),
TIME ("fstime"), // from "old" pre-HDFS-1073 format
SEEN_TXID ("seen_txid"),
EDITS ("edits"),
IMAGE_NEW ("fsimage.ckpt"),
IMAGE_ROLLBACK("fsimage_rollback"),
EDITS_NEW ("edits.new"), // from "old" pre-HDFS-1073 format
EDITS_INPROGRESS ("edits_inprogress"),
EDITS_TMP ("edits_tmp");
@ -971,8 +971,7 @@ void inspectStorageDirs(FSImageStorageInspector inspector)
* <b>Note:</b> this can mutate the storage info fields (ctime, version, etc).
* @throws IOException if no valid storage dirs are found or no valid layout version
*/
FSImageStorageInspector readAndInspectDirs()
throws IOException {
FSImageStorageInspector readAndInspectDirs(NameNodeFile nnf) throws IOException {
Integer layoutVersion = null;
boolean multipleLV = false;
StringBuilder layoutVersions = new StringBuilder();
@ -1009,7 +1008,7 @@ FSImageStorageInspector readAndInspectDirs()
FSImageStorageInspector inspector;
if (NameNodeLayoutVersion.supports(
LayoutVersion.Feature.TXID_BASED_LAYOUT, getLayoutVersion())) {
inspector = new FSImageTransactionalStorageInspector();
inspector = new FSImageTransactionalStorageInspector(nnf);
} else {
inspector = new FSImagePreTransactionalStorageInspector();
}

View File

@ -31,6 +31,7 @@
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.namenode.FSImageStorageInspector.FSImageFile;
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
import org.apache.hadoop.hdfs.util.MD5FileUtils;
import com.google.common.base.Preconditions;
@ -88,13 +89,28 @@ public NNStorageRetentionManager(Configuration conf, NNStorage storage,
this(conf, storage, purgeableLogs, new DeletionStoragePurger());
}
public void purgeOldStorage() throws IOException {
void purgeCheckpoints(NameNodeFile nnf) throws IOException {
FSImageTransactionalStorageInspector inspector =
new FSImageTransactionalStorageInspector();
new FSImageTransactionalStorageInspector(nnf);
storage.inspectStorageDirs(inspector);
for (FSImageFile image : inspector.getFoundImages()) {
purger.purgeImage(image);
}
}
void purgeOldStorage(NameNodeFile nnf) throws IOException {
FSImageTransactionalStorageInspector inspector =
new FSImageTransactionalStorageInspector(nnf);
storage.inspectStorageDirs(inspector);
long minImageTxId = getImageTxIdToRetain(inspector);
purgeCheckpointsOlderThan(inspector, minImageTxId);
if (nnf == NameNodeFile.IMAGE_ROLLBACK) {
// do not purge edits for IMAGE_ROLLBACK.
return;
}
// If fsimage_N is the image we want to keep, then we need to keep
// all txns > N. We can remove anything < N+1, since fsimage_N
// reflects the state up to and including N. However, we also

View File

@ -40,6 +40,7 @@
import org.apache.hadoop.hdfs.server.namenode.CheckpointConf;
import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.SaveNamespaceCancelledException;
import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
@ -163,7 +164,7 @@ private void doCheckpoint() throws InterruptedException, IOException {
return;
}
img.saveNamespace(namesystem, canceler);
img.saveNamespace(namesystem, NameNodeFile.IMAGE, canceler);
txid = img.getStorage().getMostRecentCheckpointTxId();
assert txid == thisCheckpointTxId : "expected to save checkpoint at txid=" +
thisCheckpointTxId + " but instead saved at txid=" + txid;
@ -179,8 +180,7 @@ private void doCheckpoint() throws InterruptedException, IOException {
Future<Void> upload = executor.submit(new Callable<Void>() {
@Override
public Void call() throws IOException {
TransferFsImage.uploadImageFromStorage(
activeNNAddress, myNNAddress,
TransferFsImage.uploadImageFromStorage(activeNNAddress, myNNAddress,
namesystem.getFSImage().getStorage(), txid);
return null;
}

View File

@ -129,7 +129,7 @@ private CheckpointSignature runOperations() throws IOException {
DFSTestUtil.runOperations(cluster, dfs, cluster.getConfiguration(0),
dfs.getDefaultBlockSize(), 0);
cluster.getNamesystem().getEditLog().logUpgradeMarker(Time.now());
cluster.getNamesystem().getEditLog().logStartRollingUpgrade(Time.now());
// Force a roll so we get an OP_END_LOG_SEGMENT txn
return cluster.getNameNodeRpc().rollEditLog();

View File

@ -34,6 +34,7 @@
import org.apache.hadoop.hdfs.server.namenode.FSImageStorageInspector.FSImageFile;
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
import org.apache.hadoop.hdfs.server.namenode.NNStorageRetentionManager.StoragePurger;
import org.junit.Assert;
import org.junit.Before;
@ -239,7 +240,7 @@ private void runTest(TestCaseDescription tc) throws IOException {
// Ask the manager to purge files we don't need any more
new NNStorageRetentionManager(conf,
tc.mockStorage(), tc.mockEditLog(mockPurger), mockPurger)
.purgeOldStorage();
.purgeOldStorage(NameNodeFile.IMAGE);
// Verify that it asked the purger to remove the correct files
Mockito.verify(mockPurger, Mockito.atLeast(0))

View File

@ -52,6 +52,7 @@
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
import org.apache.hadoop.hdfs.util.Canceler;
import org.apache.hadoop.hdfs.util.MD5FileUtils;
import org.apache.hadoop.io.IOUtils;
@ -527,7 +528,7 @@ public void testCancelSaveNamespace() throws Exception {
Future<Void> saverFuture = pool.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
image.saveNamespace(finalFsn, canceler);
image.saveNamespace(finalFsn, NameNodeFile.IMAGE, canceler);
return null;
}
});

View File

@ -40,6 +40,7 @@
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.JournalSet;
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.util.Canceler;
@ -192,7 +193,7 @@ public void testCheckpointWhenNoNewTransactionsHappened()
// We should make exactly one checkpoint at this new txid.
Mockito.verify(spyImage1, Mockito.times(1))
.saveNamespace((FSNamesystem) Mockito.anyObject(),
.saveNamespace((FSNamesystem) Mockito.anyObject(), NameNodeFile.IMAGE,
(Canceler)Mockito.anyObject());
}
@ -280,7 +281,7 @@ public void testStandbyExceptionThrownDuringCheckpoint() throws Exception {
FSImage spyImage1 = NameNodeAdapter.spyOnFsImage(nn1);
DelayAnswer answerer = new DelayAnswer(LOG);
Mockito.doAnswer(answerer).when(spyImage1)
.saveNamespace(Mockito.any(FSNamesystem.class),
.saveNamespace(Mockito.any(FSNamesystem.class), NameNodeFile.IMAGE,
Mockito.any(Canceler.class));
// Perform some edits and wait for a checkpoint to start on the SBN.