HDFS-5920. Support rollback of rolling upgrade in NameNode and JournalNodes. Contributed by Jing Zhao.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-5535@1568563 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
8741c3b951
commit
470d4253b2
@ -40,3 +40,6 @@ HDFS-5535 subtasks:
|
||||
break after the merge. (Jing Zhao via Arpit Agarwal)
|
||||
|
||||
HDFS-5585. Provide admin commands for data node upgrade (kihwal)
|
||||
|
||||
HDFS-5920. Support rollback of rolling upgrade in NameNode and JournalNodes.
|
||||
(jing9)
|
||||
|
@ -692,6 +692,11 @@ public class BookKeeperJournalManager implements JournalManager {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void discardSegments(long startTxId) throws IOException {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
try {
|
||||
|
@ -164,5 +164,7 @@ interface AsyncLogger {
|
||||
|
||||
public ListenableFuture<Void> doRollback();
|
||||
|
||||
public ListenableFuture<Void> discardSegments(long startTxId);
|
||||
|
||||
public ListenableFuture<Long> getJournalCTime();
|
||||
}
|
||||
|
@ -366,6 +366,15 @@ class AsyncLoggerSet {
|
||||
return QuorumCall.create(calls);
|
||||
}
|
||||
|
||||
public QuorumCall<AsyncLogger, Void> discardSegments(long startTxId) {
|
||||
Map<AsyncLogger, ListenableFuture<Void>> calls = Maps.newHashMap();
|
||||
for (AsyncLogger logger : loggers) {
|
||||
ListenableFuture<Void> future = logger.discardSegments(startTxId);
|
||||
calls.put(logger, future);
|
||||
}
|
||||
return QuorumCall.create(calls);
|
||||
}
|
||||
|
||||
public QuorumCall<AsyncLogger, Long> getJournalCTime() {
|
||||
Map<AsyncLogger, ListenableFuture<Long>> calls =
|
||||
Maps.newHashMap();
|
||||
|
@ -621,7 +621,18 @@ public class IPCLoggerChannel implements AsyncLogger {
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public ListenableFuture<Void> discardSegments(final long startTxId) {
|
||||
return executor.submit(new Callable<Void>() {
|
||||
@Override
|
||||
public Void call() throws IOException {
|
||||
getProxy().discardSegments(journalId, startTxId);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<Long> getJournalCTime() {
|
||||
return executor.submit(new Callable<Long>() {
|
||||
|
@ -86,6 +86,7 @@ public class QuorumJournalManager implements JournalManager {
|
||||
private static final int FINALIZE_TIMEOUT_MS = 60000;
|
||||
private static final int PRE_UPGRADE_TIMEOUT_MS = 60000;
|
||||
private static final int ROLL_BACK_TIMEOUT_MS = 60000;
|
||||
private static final int DISCARD_SEGMENTS_TIMEOUT_MS = 60000;
|
||||
private static final int UPGRADE_TIMEOUT_MS = 60000;
|
||||
private static final int GET_JOURNAL_CTIME_TIMEOUT_MS = 60000;
|
||||
|
||||
@ -600,6 +601,25 @@ public class QuorumJournalManager implements JournalManager {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void discardSegments(long startTxId) throws IOException {
|
||||
QuorumCall<AsyncLogger, Void> call = loggers.discardSegments(startTxId);
|
||||
try {
|
||||
call.waitFor(loggers.size(), loggers.size(), 0,
|
||||
DISCARD_SEGMENTS_TIMEOUT_MS, "discardSegments");
|
||||
if (call.countExceptions() > 0) {
|
||||
call.rethrowException(
|
||||
"Could not perform discardSegments of one or more JournalNodes");
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
throw new IOException(
|
||||
"Interrupted waiting for discardSegments() response");
|
||||
} catch (TimeoutException e) {
|
||||
throw new IOException(
|
||||
"Timed out waiting for discardSegments() response");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getJournalCTime() throws IOException {
|
||||
QuorumCall<AsyncLogger, Long> call = loggers.getJournalCTime();
|
||||
|
@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.qjournal.server.JournalNode;
|
||||
import org.apache.hadoop.hdfs.server.common.StorageInfo;
|
||||
import org.apache.hadoop.hdfs.server.namenode.JournalManager;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||
import org.apache.hadoop.io.retry.Idempotent;
|
||||
import org.apache.hadoop.security.KerberosInfo;
|
||||
|
||||
/**
|
||||
@ -156,5 +157,13 @@ public interface QJournalProtocol {
|
||||
|
||||
public void doRollback(String journalId) throws IOException;
|
||||
|
||||
/**
|
||||
* Discard journal segments whose first TxId is greater than or equal to the
|
||||
* given txid.
|
||||
*/
|
||||
@Idempotent
|
||||
public void discardSegments(String journalId, long startTxId)
|
||||
throws IOException;
|
||||
|
||||
public Long getJournalCTime(String journalId) throws IOException;
|
||||
}
|
||||
|
@ -30,6 +30,8 @@ import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.AcceptRec
|
||||
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.AcceptRecoveryResponseProto;
|
||||
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.CanRollBackRequestProto;
|
||||
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.CanRollBackResponseProto;
|
||||
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.DiscardSegmentsRequestProto;
|
||||
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.DiscardSegmentsResponseProto;
|
||||
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.DoFinalizeRequestProto;
|
||||
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.DoFinalizeResponseProto;
|
||||
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.DoPreUpgradeRequestProto;
|
||||
@ -64,8 +66,8 @@ import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PurgeLogs
|
||||
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.StartLogSegmentRequestProto;
|
||||
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.StartLogSegmentResponseProto;
|
||||
import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
|
||||
import org.apache.hadoop.hdfs.server.common.StorageInfo;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
|
||||
import org.apache.hadoop.hdfs.server.common.StorageInfo;
|
||||
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
|
||||
|
||||
import com.google.protobuf.RpcController;
|
||||
@ -323,6 +325,18 @@ public class QJournalProtocolServerSideTranslatorPB implements QJournalProtocolP
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public DiscardSegmentsResponseProto discardSegments(
|
||||
RpcController controller, DiscardSegmentsRequestProto request)
|
||||
throws ServiceException {
|
||||
try {
|
||||
impl.discardSegments(convert(request.getJid()), request.getStartTxId());
|
||||
return DiscardSegmentsResponseProto.getDefaultInstance();
|
||||
} catch (IOException e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetJournalCTimeResponseProto getJournalCTime(RpcController controller,
|
||||
GetJournalCTimeRequestProto request) throws ServiceException {
|
||||
|
@ -30,6 +30,7 @@ import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos;
|
||||
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.AcceptRecoveryRequestProto;
|
||||
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.CanRollBackRequestProto;
|
||||
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.CanRollBackResponseProto;
|
||||
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.DiscardSegmentsRequestProto;
|
||||
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.DoFinalizeRequestProto;
|
||||
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.DoPreUpgradeRequestProto;
|
||||
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.DoRollbackRequestProto;
|
||||
@ -354,6 +355,19 @@ public class QJournalProtocolTranslatorPB implements ProtocolMetaInterface,
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void discardSegments(String journalId, long startTxId)
|
||||
throws IOException {
|
||||
try {
|
||||
rpcProxy.discardSegments(NULL_CONTROLLER,
|
||||
DiscardSegmentsRequestProto.newBuilder()
|
||||
.setJid(convertJournalId(journalId)).setStartTxId(startTxId)
|
||||
.build());
|
||||
} catch (ServiceException e) {
|
||||
throw ProtobufHelper.getRemoteException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Long getJournalCTime(String journalId) throws IOException {
|
||||
try {
|
||||
|
@ -1037,6 +1037,12 @@ public class Journal implements Closeable {
|
||||
storage.getJournalManager().doRollback();
|
||||
}
|
||||
|
||||
public void discardSegments(long startTxId) throws IOException {
|
||||
storage.getJournalManager().discardSegments(startTxId);
|
||||
// we delete all the segments after the startTxId. let's reset committedTxnId
|
||||
committedTxnId.set(startTxId - 1);
|
||||
}
|
||||
|
||||
public Long getJournalCTime() throws IOException {
|
||||
return storage.getJournalManager().getJournalCTime();
|
||||
}
|
||||
|
@ -309,6 +309,11 @@ public class JournalNode implements Tool, Configurable, JournalNodeMXBean {
|
||||
getOrCreateJournal(journalId).doRollback();
|
||||
}
|
||||
|
||||
public void discardSegments(String journalId, long startTxId)
|
||||
throws IOException {
|
||||
getOrCreateJournal(journalId).discardSegments(startTxId);
|
||||
}
|
||||
|
||||
public Long getJournalCTime(String journalId) throws IOException {
|
||||
return getOrCreateJournal(journalId).getJournalCTime();
|
||||
}
|
||||
|
@ -233,6 +233,12 @@ class JournalNodeRpcServer implements QJournalProtocol {
|
||||
jn.doRollback(journalId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void discardSegments(String journalId, long startTxId)
|
||||
throws IOException {
|
||||
jn.discardSegments(journalId, startTxId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Long getJournalCTime(String journalId) throws IOException {
|
||||
return jn.getJournalCTime(journalId);
|
||||
|
@ -126,6 +126,11 @@ class BackupJournalManager implements JournalManager {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void discardSegments(long startTxId) throws IOException {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getJournalCTime() throws IOException {
|
||||
throw new UnsupportedOperationException();
|
||||
|
@ -1406,7 +1406,14 @@ public class FSEditLog implements LogsPurgeable {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public synchronized void discardSegments(long markerTxid)
|
||||
throws IOException {
|
||||
for (JournalAndStream jas : journalSet.getAllJournalStreams()) {
|
||||
jas.getManager().discardSegments(markerTxid);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void selectInputStreams(Collection<EditLogInputStream> streams,
|
||||
long fromTxId, boolean inProgressOk) throws IOException {
|
||||
|
@ -543,18 +543,14 @@ public class FSImage implements Closeable {
|
||||
private boolean loadFSImage(FSNamesystem target, StartupOption startOpt,
|
||||
MetaRecoveryContext recovery)
|
||||
throws IOException {
|
||||
final NameNodeFile nnf;
|
||||
if (startOpt == StartupOption.ROLLINGUPGRADE
|
||||
&& startOpt.getRollingUpgradeStartupOption()
|
||||
== RollingUpgradeStartupOption.ROLLBACK) {
|
||||
nnf = NameNodeFile.IMAGE_ROLLBACK;
|
||||
} else {
|
||||
nnf = NameNodeFile.IMAGE;
|
||||
}
|
||||
final boolean rollingRollback = startOpt == StartupOption.ROLLINGUPGRADE
|
||||
&& startOpt.getRollingUpgradeStartupOption() ==
|
||||
RollingUpgradeStartupOption.ROLLBACK;
|
||||
final NameNodeFile nnf = rollingRollback ? NameNodeFile.IMAGE_ROLLBACK
|
||||
: NameNodeFile.IMAGE;
|
||||
final FSImageStorageInspector inspector = storage.readAndInspectDirs(nnf);
|
||||
|
||||
isUpgradeFinalized = inspector.isUpgradeFinalized();
|
||||
|
||||
List<FSImageFile> imageFiles = inspector.getLatestImages();
|
||||
|
||||
StartupProgress prog = NameNode.getStartupProgress();
|
||||
@ -573,7 +569,17 @@ public class FSImage implements Closeable {
|
||||
// If we're open for write, we're either non-HA or we're the active NN, so
|
||||
// we better be able to load all the edits. If we're the standby NN, it's
|
||||
// OK to not be able to read all of edits right now.
|
||||
long toAtLeastTxId = editLog.isOpenForWrite() ? inspector.getMaxSeenTxId() : 0;
|
||||
// In the meanwhile, for HA upgrade, we will still write editlog thus need
|
||||
// this toAtLeastTxId to be set to the max-seen txid
|
||||
// For rollback in rolling upgrade, we need to set the toAtLeastTxId to
|
||||
// the txid right before the upgrade marker.
|
||||
long toAtLeastTxId = editLog.isOpenForWrite() ? inspector
|
||||
.getMaxSeenTxId() : 0;
|
||||
if (rollingRollback) {
|
||||
// note that the first image in imageFiles is the special checkpoint
|
||||
// for the rolling upgrade
|
||||
toAtLeastTxId = imageFiles.get(0).getCheckpointTxId() + 2;
|
||||
}
|
||||
editStreams = editLog.selectInputStreams(
|
||||
imageFiles.get(0).getCheckpointTxId() + 1,
|
||||
toAtLeastTxId, recovery, false);
|
||||
@ -581,8 +587,7 @@ public class FSImage implements Closeable {
|
||||
editStreams = FSImagePreTransactionalStorageInspector
|
||||
.getEditLogStreams(storage);
|
||||
}
|
||||
int maxOpSize = conf.getInt(DFSConfigKeys.
|
||||
DFS_NAMENODE_MAX_OP_SIZE_KEY,
|
||||
int maxOpSize = conf.getInt(DFSConfigKeys.DFS_NAMENODE_MAX_OP_SIZE_KEY,
|
||||
DFSConfigKeys.DFS_NAMENODE_MAX_OP_SIZE_DEFAULT);
|
||||
for (EditLogInputStream elis : editStreams) {
|
||||
elis.setMaxOpSize(maxOpSize);
|
||||
@ -613,13 +618,34 @@ public class FSImage implements Closeable {
|
||||
throw new IOException("Failed to load an FSImage file!");
|
||||
}
|
||||
prog.endPhase(Phase.LOADING_FSIMAGE);
|
||||
long txnsAdvanced = loadEdits(editStreams, target, startOpt, recovery);
|
||||
needToSave |= needsResaveBasedOnStaleCheckpoint(imageFile.getFile(),
|
||||
txnsAdvanced);
|
||||
long txnsAdvanced = 0;
|
||||
|
||||
loadEdits(editStreams, target, startOpt, recovery);
|
||||
if (rollingRollback) {
|
||||
// Trigger the rollback for rolling upgrade.
|
||||
// Here lastAppliedTxId == (markerTxId - 1), and we should decrease 1 from
|
||||
// lastAppliedTxId for the start-segment transaction.
|
||||
rollingRollback(lastAppliedTxId--, imageFiles.get(0).getCheckpointTxId());
|
||||
needToSave = false;
|
||||
} else {
|
||||
needToSave |= needsResaveBasedOnStaleCheckpoint(imageFile.getFile(),
|
||||
txnsAdvanced);
|
||||
}
|
||||
editLog.setNextTxId(lastAppliedTxId + 1);
|
||||
return needToSave;
|
||||
}
|
||||
|
||||
/** rollback for rolling upgrade. */
|
||||
private void rollingRollback(long discardSegmentTxId, long ckptId)
|
||||
throws IOException {
|
||||
// discard discard unnecessary editlog segments starting from the given id
|
||||
this.editLog.discardSegments(discardSegmentTxId);
|
||||
// rename the special checkpoint
|
||||
renameCheckpoint(ckptId, NameNodeFile.IMAGE_ROLLBACK, NameNodeFile.IMAGE);
|
||||
// purge all the checkpoints after the marker
|
||||
archivalManager.purgeCheckpoinsAfter(NameNodeFile.IMAGE, ckptId);
|
||||
}
|
||||
|
||||
void loadFSImageFile(FSNamesystem target, MetaRecoveryContext recovery,
|
||||
FSImageFile imageFile) throws IOException {
|
||||
LOG.debug("Planning to load image :\n" + imageFile);
|
||||
@ -707,7 +733,7 @@ public class FSImage implements Closeable {
|
||||
|
||||
private long loadEdits(Iterable<EditLogInputStream> editStreams,
|
||||
FSNamesystem target, StartupOption startOpt, MetaRecoveryContext recovery)
|
||||
throws IOException {
|
||||
throws IOException {
|
||||
LOG.debug("About to load edits:\n " + Joiner.on("\n ").join(editStreams));
|
||||
StartupProgress prog = NameNode.getStartupProgress();
|
||||
prog.beginPhase(Phase.LOADING_EDITS);
|
||||
@ -727,15 +753,19 @@ public class FSImage implements Closeable {
|
||||
// have been successfully applied before the error.
|
||||
lastAppliedTxId = loader.getLastAppliedTxId();
|
||||
}
|
||||
boolean rollingRollback = startOpt == StartupOption.ROLLINGUPGRADE &&
|
||||
startOpt.getRollingUpgradeStartupOption() ==
|
||||
RollingUpgradeStartupOption.ROLLBACK;
|
||||
// If we are in recovery mode, we may have skipped over some txids.
|
||||
if (editIn.getLastTxId() != HdfsConstants.INVALID_TXID) {
|
||||
if (editIn.getLastTxId() != HdfsConstants.INVALID_TXID
|
||||
&& !rollingRollback) {
|
||||
lastAppliedTxId = editIn.getLastTxId();
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
FSEditLog.closeAllStreams(editStreams);
|
||||
// update the counts
|
||||
updateCountForQuota(target.dir.rootDir);
|
||||
updateCountForQuota(target.dir.rootDir);
|
||||
}
|
||||
prog.endPhase(Phase.LOADING_EDITS);
|
||||
return lastAppliedTxId - prevLastAppliedTxId;
|
||||
@ -836,11 +866,11 @@ public class FSImage implements Closeable {
|
||||
/**
|
||||
* Save the contents of the FS image to the file.
|
||||
*/
|
||||
void saveFSImage(SaveNamespaceContext context, StorageDirectory sd)
|
||||
throws IOException {
|
||||
void saveFSImage(SaveNamespaceContext context, StorageDirectory sd,
|
||||
NameNodeFile dstType) throws IOException {
|
||||
long txid = context.getTxId();
|
||||
File newFile = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE_NEW, txid);
|
||||
File dstFile = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE, txid);
|
||||
File dstFile = NNStorage.getStorageFile(sd, dstType, txid);
|
||||
|
||||
FSImageFormatProtobuf.Saver saver = new FSImageFormatProtobuf.Saver(context);
|
||||
FSImageCompression compression = FSImageCompression.createCompression(conf);
|
||||
@ -864,16 +894,19 @@ public class FSImage implements Closeable {
|
||||
private class FSImageSaver implements Runnable {
|
||||
private final SaveNamespaceContext context;
|
||||
private StorageDirectory sd;
|
||||
private final NameNodeFile nnf;
|
||||
|
||||
public FSImageSaver(SaveNamespaceContext context, StorageDirectory sd) {
|
||||
public FSImageSaver(SaveNamespaceContext context, StorageDirectory sd,
|
||||
NameNodeFile nnf) {
|
||||
this.context = context;
|
||||
this.sd = sd;
|
||||
this.nnf = nnf;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
saveFSImage(context, sd);
|
||||
saveFSImage(context, sd, nnf);
|
||||
} catch (SaveNamespaceCancelledException snce) {
|
||||
LOG.info("Cancelled image saving for " + sd.getRoot() +
|
||||
": " + snce.getMessage());
|
||||
@ -971,7 +1004,7 @@ public class FSImage implements Closeable {
|
||||
for (Iterator<StorageDirectory> it
|
||||
= storage.dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
|
||||
StorageDirectory sd = it.next();
|
||||
FSImageSaver saver = new FSImageSaver(ctx, sd);
|
||||
FSImageSaver saver = new FSImageSaver(ctx, sd, nnf);
|
||||
Thread saveThread = new Thread(saver, saver.toString());
|
||||
saveThreads.add(saveThread);
|
||||
saveThread.start();
|
||||
|
@ -198,6 +198,32 @@ public class FileJournalManager implements JournalManager {
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
/**
|
||||
* Discard all editlog segments whose first txid is greater than or equal to
|
||||
* the given txid, by renaming them with suffix ".trash".
|
||||
*/
|
||||
private void discardEditLogSegments(long startTxId) throws IOException {
|
||||
File currentDir = sd.getCurrentDir();
|
||||
List<EditLogFile> allLogFiles = matchEditLogs(currentDir);
|
||||
List<EditLogFile> toTrash = Lists.newArrayList();
|
||||
LOG.info("Discard the EditLog files, the given start txid is " + startTxId);
|
||||
// go through the editlog files to make sure the startTxId is right at the
|
||||
// segment boundary
|
||||
for (EditLogFile elf : allLogFiles) {
|
||||
if (elf.getFirstTxId() >= startTxId) {
|
||||
toTrash.add(elf);
|
||||
} else {
|
||||
Preconditions.checkState(elf.getLastTxId() < startTxId);
|
||||
}
|
||||
}
|
||||
|
||||
for (EditLogFile elf : toTrash) {
|
||||
// rename these editlog file as .trash
|
||||
elf.moveAsideTrashFile(startTxId);
|
||||
LOG.info("Trash the EditLog file " + elf);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* returns matching edit logs via the log directory. Simple helper function
|
||||
@ -467,6 +493,11 @@ public class FileJournalManager implements JournalManager {
|
||||
renameSelf(".corrupt");
|
||||
}
|
||||
|
||||
void moveAsideTrashFile(long markerTxid) throws IOException {
|
||||
assert this.getFirstTxId() >= markerTxid;
|
||||
renameSelf(".trash");
|
||||
}
|
||||
|
||||
public void moveAsideEmptyFile() throws IOException {
|
||||
assert lastTxId == HdfsConstants.INVALID_TXID;
|
||||
renameSelf(".empty");
|
||||
@ -530,6 +561,11 @@ public class FileJournalManager implements JournalManager {
|
||||
NNUpgradeUtil.doRollBack(sd);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void discardSegments(long startTxid) throws IOException {
|
||||
discardEditLogSegments(startTxid);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getJournalCTime() throws IOException {
|
||||
StorageInfo sInfo = new StorageInfo(NodeType.NAME_NODE);
|
||||
|
@ -109,7 +109,15 @@ public interface JournalManager extends Closeable, FormatConfirmable,
|
||||
* roll back their state should just return without error.
|
||||
*/
|
||||
void doRollback() throws IOException;
|
||||
|
||||
|
||||
/**
|
||||
* Discard the segments whose first txid is >= the given txid.
|
||||
* @param startTxId The given txid should be right at the segment boundary,
|
||||
* i.e., it should be the first txid of some segment, if segment corresponding
|
||||
* to the txid exists.
|
||||
*/
|
||||
void discardSegments(long startTxId) throws IOException;
|
||||
|
||||
/**
|
||||
* @return the CTime of the journal manager.
|
||||
*/
|
||||
|
@ -699,6 +699,12 @@ public class JournalSet implements JournalManager {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void discardSegments(long startTxId) throws IOException {
|
||||
// This operation is handled by FSEditLog directly.
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getJournalCTime() throws IOException {
|
||||
// This operation is handled by FSEditLog directly.
|
||||
|
@ -89,7 +89,8 @@ public class NNStorage extends Storage implements Closeable,
|
||||
* or of type EDITS which stores edits or of type IMAGE_AND_EDITS which
|
||||
* stores both fsimage and edits.
|
||||
*/
|
||||
static enum NameNodeDirType implements StorageDirType {
|
||||
@VisibleForTesting
|
||||
public static enum NameNodeDirType implements StorageDirType {
|
||||
UNDEFINED,
|
||||
IMAGE,
|
||||
EDITS,
|
||||
@ -657,20 +658,27 @@ public class NNStorage extends Storage implements Closeable,
|
||||
|
||||
@VisibleForTesting
|
||||
public static String getCheckpointImageFileName(long txid) {
|
||||
return String.format("%s_%019d",
|
||||
NameNodeFile.IMAGE_NEW.getName(), txid);
|
||||
return getNameNodeFileName(NameNodeFile.IMAGE_NEW, txid);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public static String getImageFileName(long txid) {
|
||||
return String.format("%s_%019d",
|
||||
NameNodeFile.IMAGE.getName(), txid);
|
||||
return getNameNodeFileName(NameNodeFile.IMAGE, txid);
|
||||
}
|
||||
|
||||
|
||||
@VisibleForTesting
|
||||
public static String getRollbackImageFileName(long txid) {
|
||||
return getNameNodeFileName(NameNodeFile.IMAGE_ROLLBACK, txid);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
private static String getNameNodeFileName(NameNodeFile nnf, long txid) {
|
||||
return String.format("%s_%019d", nnf.getName(), txid);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public static String getInProgressEditsFileName(long startTxId) {
|
||||
return String.format("%s_%019d", NameNodeFile.EDITS_INPROGRESS.getName(),
|
||||
startTxId);
|
||||
return getNameNodeFileName(NameNodeFile.EDITS_INPROGRESS, startTxId);
|
||||
}
|
||||
|
||||
static File getInProgressEditsFile(StorageDirectory sd, long startTxId) {
|
||||
|
@ -90,11 +90,18 @@ public class NNStorageRetentionManager {
|
||||
}
|
||||
|
||||
void purgeCheckpoints(NameNodeFile nnf) throws IOException {
|
||||
purgeCheckpoinsAfter(nnf, -1);
|
||||
}
|
||||
|
||||
void purgeCheckpoinsAfter(NameNodeFile nnf, long fromTxId)
|
||||
throws IOException {
|
||||
FSImageTransactionalStorageInspector inspector =
|
||||
new FSImageTransactionalStorageInspector(nnf);
|
||||
storage.inspectStorageDirs(inspector);
|
||||
for (FSImageFile image : inspector.getFoundImages()) {
|
||||
purger.purgeImage(image);
|
||||
if (image.getCheckpointTxId() > fromTxId) {
|
||||
purger.purgeImage(image);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -199,6 +199,17 @@ message DoRollbackRequestProto {
|
||||
message DoRollbackResponseProto {
|
||||
}
|
||||
|
||||
/**
|
||||
* discardSegments()
|
||||
*/
|
||||
message DiscardSegmentsRequestProto {
|
||||
required JournalIdProto jid = 1;
|
||||
required uint64 startTxId = 2;
|
||||
}
|
||||
|
||||
message DiscardSegmentsResponseProto {
|
||||
}
|
||||
|
||||
/**
|
||||
* getJournalState()
|
||||
*/
|
||||
@ -314,6 +325,8 @@ service QJournalProtocolService {
|
||||
|
||||
rpc doRollback(DoRollbackRequestProto) returns (DoRollbackResponseProto);
|
||||
|
||||
rpc discardSegments(DiscardSegmentsRequestProto) returns (DiscardSegmentsResponseProto);
|
||||
|
||||
rpc getJournalState(GetJournalStateRequestProto) returns (GetJournalStateResponseProto);
|
||||
|
||||
rpc newEpoch(NewEpochRequestProto) returns (NewEpochResponseProto);
|
||||
|
@ -0,0 +1,196 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
|
||||
import org.apache.hadoop.hdfs.server.namenode.INode;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.hdfs.tools.DFSAdmin;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* This class tests rollback for rolling upgrade.
|
||||
*/
|
||||
public class TestRollingUpgradeRollback {
|
||||
|
||||
private static final int NUM_JOURNAL_NODES = 3;
|
||||
private static final String JOURNAL_ID = "myjournal";
|
||||
|
||||
private static boolean fileExists(List<File> files) {
|
||||
for (File file : files) {
|
||||
if (file.exists()) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private void checkNNStorage(NNStorage storage, long imageTxId,
|
||||
long trashEndTxId) {
|
||||
List<File> finalizedEdits = storage.getFiles(
|
||||
NNStorage.NameNodeDirType.EDITS,
|
||||
NNStorage.getFinalizedEditsFileName(1, imageTxId));
|
||||
Assert.assertTrue(fileExists(finalizedEdits));
|
||||
List<File> inprogressEdits = storage.getFiles(
|
||||
NNStorage.NameNodeDirType.EDITS,
|
||||
NNStorage.getInProgressEditsFileName(imageTxId + 1));
|
||||
// For rollback case we will have an inprogress file for future transactions
|
||||
Assert.assertTrue(fileExists(inprogressEdits));
|
||||
if (trashEndTxId > 0) {
|
||||
List<File> trashedEdits = storage.getFiles(
|
||||
NNStorage.NameNodeDirType.EDITS,
|
||||
NNStorage.getFinalizedEditsFileName(imageTxId + 1, trashEndTxId)
|
||||
+ ".trash");
|
||||
Assert.assertTrue(fileExists(trashedEdits));
|
||||
}
|
||||
String imageFileName = trashEndTxId > 0 ? NNStorage
|
||||
.getImageFileName(imageTxId) : NNStorage
|
||||
.getRollbackImageFileName(imageTxId);
|
||||
List<File> imageFiles = storage.getFiles(
|
||||
NNStorage.NameNodeDirType.IMAGE, imageFileName);
|
||||
Assert.assertTrue(fileExists(imageFiles));
|
||||
}
|
||||
|
||||
private void checkJNStorage(File dir, long discardStartTxId,
|
||||
long discardEndTxId) {
|
||||
File finalizedEdits = new File(dir, NNStorage.getFinalizedEditsFileName(1,
|
||||
discardStartTxId - 1));
|
||||
Assert.assertTrue(finalizedEdits.exists());
|
||||
File trashEdits = new File(dir, NNStorage.getFinalizedEditsFileName(
|
||||
discardStartTxId, discardEndTxId) + ".trash");
|
||||
Assert.assertTrue(trashEdits.exists());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRollbackCommand() throws Exception {
|
||||
final Configuration conf = new HdfsConfiguration();
|
||||
MiniDFSCluster cluster = null;
|
||||
final Path foo = new Path("/foo");
|
||||
final Path bar = new Path("/bar");
|
||||
try {
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
|
||||
cluster.waitActive();
|
||||
|
||||
final DistributedFileSystem dfs = cluster.getFileSystem();
|
||||
final DFSAdmin dfsadmin = new DFSAdmin(conf);
|
||||
dfs.mkdirs(foo);
|
||||
|
||||
// start rolling upgrade
|
||||
Assert.assertEquals(0,
|
||||
dfsadmin.run(new String[] { "-rollingUpgrade", "start" }));
|
||||
// create new directory
|
||||
dfs.mkdirs(bar);
|
||||
|
||||
// check NNStorage
|
||||
NNStorage storage = cluster.getNamesystem().getFSImage().getStorage();
|
||||
checkNNStorage(storage, 3, -1); // (startSegment, mkdir, endSegment)
|
||||
} finally {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
NameNode nn = null;
|
||||
try {
|
||||
nn = NameNode.createNameNode(new String[] { "-rollingUpgrade",
|
||||
"rollback" }, conf);
|
||||
// make sure /foo is still there, but /bar is not
|
||||
INode fooNode = nn.getNamesystem().getFSDirectory()
|
||||
.getINode4Write(foo.toString());
|
||||
Assert.assertNotNull(fooNode);
|
||||
INode barNode = nn.getNamesystem().getFSDirectory()
|
||||
.getINode4Write(bar.toString());
|
||||
Assert.assertNull(barNode);
|
||||
|
||||
// check the details of NNStorage
|
||||
NNStorage storage = nn.getNamesystem().getFSImage().getStorage();
|
||||
// (startSegment, upgrade marker, mkdir, endSegment)
|
||||
checkNNStorage(storage, 3, 7);
|
||||
} finally {
|
||||
if (nn != null) {
|
||||
nn.stop();
|
||||
nn.join();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRollbackWithQJM() throws Exception {
|
||||
final Configuration conf = new HdfsConfiguration();
|
||||
MiniJournalCluster mjc = null;
|
||||
MiniDFSCluster cluster = null;
|
||||
final Path foo = new Path("/foo");
|
||||
final Path bar = new Path("/bar");
|
||||
|
||||
try {
|
||||
mjc = new MiniJournalCluster.Builder(conf).numJournalNodes(
|
||||
NUM_JOURNAL_NODES).build();
|
||||
conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, mjc
|
||||
.getQuorumJournalURI(JOURNAL_ID).toString());
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
|
||||
cluster.waitActive();
|
||||
|
||||
DistributedFileSystem dfs = cluster.getFileSystem();
|
||||
final DFSAdmin dfsadmin = new DFSAdmin(conf);
|
||||
dfs.mkdirs(foo);
|
||||
|
||||
// start rolling upgrade
|
||||
Assert.assertEquals(0,
|
||||
dfsadmin.run(new String[] { "-rollingUpgrade", "start" }));
|
||||
// create new directory
|
||||
dfs.mkdirs(bar);
|
||||
dfs.close();
|
||||
|
||||
// rollback
|
||||
cluster.restartNameNode("-rollingUpgrade", "rollback");
|
||||
// make sure /foo is still there, but /bar is not
|
||||
dfs = cluster.getFileSystem();
|
||||
Assert.assertTrue(dfs.exists(foo));
|
||||
Assert.assertFalse(dfs.exists(bar));
|
||||
|
||||
// check storage in JNs
|
||||
for (int i = 0; i < NUM_JOURNAL_NODES; i++) {
|
||||
File dir = mjc.getCurrentDir(0, JOURNAL_ID);
|
||||
// segments:(startSegment, mkdir, endSegment), (startSegment, upgrade
|
||||
// marker, mkdir, endSegment)
|
||||
checkJNStorage(dir, 4, 7);
|
||||
}
|
||||
} finally {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
if (mjc != null) {
|
||||
mjc.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* TODO: Test rollback scenarios where StandbyNameNode does checkpoints during
|
||||
* rolling upgrade.
|
||||
*/
|
||||
|
||||
// TODO: rollback could not succeed in all JN
|
||||
}
|
@ -212,6 +212,9 @@ public class TestGenericJournalConf {
|
||||
@Override
|
||||
public void doRollback() throws IOException {}
|
||||
|
||||
@Override
|
||||
public void discardSegments(long startTxId) throws IOException {}
|
||||
|
||||
@Override
|
||||
public long getJournalCTime() throws IOException {
|
||||
return -1;
|
||||
|
@ -141,7 +141,7 @@ public class TestSaveNamespace {
|
||||
doAnswer(new FaultySaveImage(true)).
|
||||
when(spyImage).saveFSImage(
|
||||
(SaveNamespaceContext)anyObject(),
|
||||
(StorageDirectory)anyObject());
|
||||
(StorageDirectory)anyObject(), (NameNodeFile) anyObject());
|
||||
shouldFail = false;
|
||||
break;
|
||||
case SAVE_SECOND_FSIMAGE_IOE:
|
||||
@ -149,7 +149,7 @@ public class TestSaveNamespace {
|
||||
doAnswer(new FaultySaveImage(false)).
|
||||
when(spyImage).saveFSImage(
|
||||
(SaveNamespaceContext)anyObject(),
|
||||
(StorageDirectory)anyObject());
|
||||
(StorageDirectory)anyObject(), (NameNodeFile) anyObject());
|
||||
shouldFail = false;
|
||||
break;
|
||||
case SAVE_ALL_FSIMAGES:
|
||||
@ -157,7 +157,7 @@ public class TestSaveNamespace {
|
||||
doThrow(new RuntimeException("Injected")).
|
||||
when(spyImage).saveFSImage(
|
||||
(SaveNamespaceContext)anyObject(),
|
||||
(StorageDirectory)anyObject());
|
||||
(StorageDirectory)anyObject(), (NameNodeFile) anyObject());
|
||||
shouldFail = true;
|
||||
break;
|
||||
case WRITE_STORAGE_ALL:
|
||||
@ -382,7 +382,7 @@ public class TestSaveNamespace {
|
||||
doThrow(new IOException("Injected fault: saveFSImage")).
|
||||
when(spyImage).saveFSImage(
|
||||
(SaveNamespaceContext)anyObject(),
|
||||
(StorageDirectory)anyObject());
|
||||
(StorageDirectory)anyObject(), (NameNodeFile) anyObject());
|
||||
|
||||
try {
|
||||
doAnEdit(fsn, 1);
|
||||
|
Loading…
x
Reference in New Issue
Block a user