HDFS-2507. Allow saveNamespace operations to be canceled. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1190060 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2011-10-27 22:11:10 +00:00
parent 072ef952c3
commit 646e855f6e
8 changed files with 352 additions and 61 deletions

View File

@ -824,6 +824,8 @@ Release 0.23.0 - Unreleased
HDFS-1869. mkdirs should use the supplied permission for all of the created
directories. (Daryn Sharp via szetszwo)
HDFS-2507. Allow saveNamespace operations to be canceled. (todd)
OPTIMIZATIONS
HDFS-1458. Improve checkpoint performance by avoiding unnecessary image

View File

@ -83,7 +83,9 @@ public class FSImage implements Closeable {
final private Configuration conf;
private final NNStorageRetentionManager archivalManager;
private final NNStorageRetentionManager archivalManager;
private SaveNamespaceContext curSaveNamespaceContext = null;
/**
@ -715,14 +717,15 @@ private void loadFSImage(File curFile, MD5Hash expectedMd5,
/**
* Save the contents of the FS image to the file.
*/
void saveFSImage(FSNamesystem source, StorageDirectory sd, long txid)
void saveFSImage(SaveNamespaceContext context, StorageDirectory sd)
throws IOException {
long txid = context.getTxId();
File newFile = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE_NEW, txid);
File dstFile = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE, txid);
FSImageFormat.Saver saver = new FSImageFormat.Saver();
FSImageFormat.Saver saver = new FSImageFormat.Saver(context);
FSImageCompression compression = FSImageCompression.createCompression(conf);
saver.save(newFile, txid, source, compression);
saver.save(newFile, compression);
MD5FileUtils.saveMD5File(dstFile, saver.getSavedDigest());
storage.setMostRecentCheckpointTxId(txid);
@ -740,25 +743,24 @@ void saveFSImage(FSNamesystem source, StorageDirectory sd, long txid)
* and writing it out.
*/
private class FSImageSaver implements Runnable {
private final SaveNamespaceContext context;
private StorageDirectory sd;
private List<StorageDirectory> errorSDs;
private final long txid;
private final FSNamesystem source;
FSImageSaver(FSNamesystem source, StorageDirectory sd,
List<StorageDirectory> errorSDs, long txid) {
this.source = source;
public FSImageSaver(SaveNamespaceContext context, StorageDirectory sd) {
this.context = context;
this.sd = sd;
this.errorSDs = errorSDs;
this.txid = txid;
}
public void run() {
try {
saveFSImage(source, sd, txid);
saveFSImage(context, sd);
} catch (SaveNamespaceCancelledException snce) {
LOG.info("Cancelled image saving for " + sd.getRoot() +
": " + snce.getMessage());
// don't report an error on the storage dir!
} catch (Throwable t) {
LOG.error("Unable to save image for " + sd.getRoot(), t);
errorSDs.add(sd);
context.reportErrorOnStorageDirectory(sd);
}
}
@ -784,7 +786,7 @@ private void waitForThreads(List<Thread> threads) {
* Save the contents of the FS image to a new image file in each of the
* current storage directories.
*/
void saveNamespace(FSNamesystem source) throws IOException {
synchronized void saveNamespace(FSNamesystem source) throws IOException {
assert editLog != null : "editLog must be initialized";
storage.attemptRestoreRemovedStorage();
@ -800,46 +802,71 @@ void saveNamespace(FSNamesystem source) throws IOException {
} finally {
if (editLogWasOpen) {
editLog.startLogSegment(imageTxId + 1, true);
// Take this opportunity to note the current transaction
// Take this opportunity to note the current transaction.
// Even if the namespace save was cancelled, this marker
// is only used to determine what transaction ID is required
// for startup. So, it doesn't hurt to update it unnecessarily.
storage.writeTransactionIdFileToStorage(imageTxId + 1);
}
}
}
protected void saveFSImageInAllDirs(FSNamesystem source, long txid)
throws IOException {
void cancelSaveNamespace(String reason)
throws InterruptedException {
SaveNamespaceContext ctx = curSaveNamespaceContext;
if (ctx != null) {
ctx.cancel(reason); // waits until complete
}
}
protected synchronized void saveFSImageInAllDirs(FSNamesystem source, long txid)
throws IOException {
if (storage.getNumStorageDirs(NameNodeDirType.IMAGE) == 0) {
throw new IOException("No image directories available!");
}
List<StorageDirectory> errorSDs =
Collections.synchronizedList(new ArrayList<StorageDirectory>());
List<Thread> saveThreads = new ArrayList<Thread>();
// save images into current
for (Iterator<StorageDirectory> it
= storage.dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
StorageDirectory sd = it.next();
FSImageSaver saver = new FSImageSaver(source, sd, errorSDs, txid);
Thread saveThread = new Thread(saver, saver.toString());
saveThreads.add(saveThread);
saveThread.start();
}
waitForThreads(saveThreads);
saveThreads.clear();
storage.reportErrorsOnDirectories(errorSDs);
if (storage.getNumStorageDirs(NameNodeDirType.IMAGE) == 0) {
throw new IOException(
"Failed to save in any storage directories while saving namespace.");
}
renameCheckpoint(txid);
SaveNamespaceContext ctx = new SaveNamespaceContext(
source, txid);
curSaveNamespaceContext = ctx;
// Since we now have a new checkpoint, we can clean up some
// old edit logs and checkpoints.
purgeOldStorage();
try {
List<Thread> saveThreads = new ArrayList<Thread>();
// save images into current
for (Iterator<StorageDirectory> it
= storage.dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
StorageDirectory sd = it.next();
FSImageSaver saver = new FSImageSaver(ctx, sd);
Thread saveThread = new Thread(saver, saver.toString());
saveThreads.add(saveThread);
saveThread.start();
}
waitForThreads(saveThreads);
saveThreads.clear();
storage.reportErrorsOnDirectories(ctx.getErrorSDs());
if (storage.getNumStorageDirs(NameNodeDirType.IMAGE) == 0) {
throw new IOException(
"Failed to save in any storage directories while saving namespace.");
}
if (ctx.isCancelled()) {
deleteCancelledCheckpoint(txid);
ctx.checkCancelled(); // throws
assert false : "should have thrown above!";
}
renameCheckpoint(txid);
// Since we now have a new checkpoint, we can clean up some
// old edit logs and checkpoints.
purgeOldStorage();
} finally {
// Notify any threads waiting on the checkpoint to be canceled
// that it is complete.
ctx.markComplete();
ctx = null;
}
}
/**
@ -873,6 +900,24 @@ private void renameCheckpoint(long txid) throws IOException {
}
if(al != null) storage.reportErrorsOnDirectories(al);
}
/**
* Deletes the checkpoint file in every storage directory,
* since the checkpoint was cancelled.
*/
private void deleteCancelledCheckpoint(long txid) throws IOException {
ArrayList<StorageDirectory> al = Lists.newArrayList();
for (StorageDirectory sd : storage.dirIterable(NameNodeDirType.IMAGE)) {
File ckpt = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE_NEW, txid);
if (ckpt.exists() && !ckpt.delete()) {
LOG.warn("Unable to delete cancelled checkpoint in " + sd);
al.add(sd);
}
}
storage.reportErrorsOnDirectories(al);
}
private void renameCheckpointInDir(StorageDirectory sd, long txid)
throws IOException {
@ -1055,4 +1100,5 @@ public String getBlockPoolID() {
public synchronized long getLastAppliedTxId() {
return lastAppliedTxId;
}
}

View File

@ -508,6 +508,7 @@ byte[][] getParent(byte[][] path) {
* functions may be used to retrieve information about the file that was written.
*/
static class Saver {
private final SaveNamespaceContext context;
/** Set to true once an image has been written */
private boolean saved = false;
@ -529,6 +530,11 @@ private void checkNotSaved() {
throw new IllegalStateException("FSImageSaver has already saved an image");
}
}
Saver(SaveNamespaceContext context) {
this.context = context;
}
/**
* Return the MD5 checksum of the image file that was saved.
@ -539,12 +545,11 @@ MD5Hash getSavedDigest() {
}
void save(File newFile,
long txid,
FSNamesystem sourceNamesystem,
FSImageCompression compression)
throws IOException {
checkNotSaved();
final FSNamesystem sourceNamesystem = context.getSourceNamesystem();
FSDirectory fsDir = sourceNamesystem.dir;
long startTime = now();
//
@ -565,7 +570,7 @@ void save(File newFile,
.getNamespaceID());
out.writeLong(fsDir.rootDir.numItemsInTree());
out.writeLong(sourceNamesystem.getGenerationStamp());
out.writeLong(txid);
out.writeLong(context.getTxId());
// write compression info and set up compressed stream
out = compression.writeHeaderAndWrapStream(fos);
@ -581,10 +586,12 @@ void save(File newFile,
saveImage(strbuf, fsDir.rootDir, out);
// save files under construction
sourceNamesystem.saveFilesUnderConstruction(out);
context.checkCancelled();
sourceNamesystem.saveSecretManagerState(out);
strbuf = null;
context.checkCancelled();
out.flush();
context.checkCancelled();
fout.getChannel().force(true);
} finally {
out.close();
@ -603,9 +610,10 @@ void save(File newFile,
* This is a recursive procedure, which first saves all children of
* a current directory and then moves inside the sub-directories.
*/
private static void saveImage(ByteBuffer currentDirName,
private void saveImage(ByteBuffer currentDirName,
INodeDirectory current,
DataOutputStream out) throws IOException {
context.checkCancelled();
List<INode> children = current.getChildrenRaw();
if (children == null || children.isEmpty())
return;

View File

@ -2728,6 +2728,27 @@ void saveNamespace() throws AccessControlException, IOException {
}
}
/**
* Cancel an ongoing saveNamespace operation and wait for its
* threads to exit, if one is currently in progress.
*
* If no such operation is in progress, this call does nothing.
*
* @param reason a reason to be communicated to the caller saveNamespace
* @throws IOException
*/
void cancelSaveNamespace(String reason) throws IOException {
readLock();
try {
checkSuperuserPrivilege();
getFSImage().cancelSaveNamespace(reason);
} catch (InterruptedException e) {
throw new IOException(e);
} finally {
readUnlock();
}
}
/**
* Enables/Disables/Checks restoring failed storage replicas if the storage becomes available again.
* Requires superuser privilege.

View File

@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
class SaveNamespaceCancelledException extends IOException {
private static final long serialVersionUID = 1L;
SaveNamespaceCancelledException(String cancelReason) {
super(cancelReason);
}
}

View File

@ -0,0 +1,98 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import com.google.common.base.Preconditions;
/**
* Context for an ongoing SaveNamespace operation. This class
* allows cancellation, and also is responsible for accumulating
* failed storage directories.
*/
class SaveNamespaceContext {
private final FSNamesystem sourceNamesystem;
private final long txid;
private final List<StorageDirectory> errorSDs =
Collections.synchronizedList(new ArrayList<StorageDirectory>());
/**
* If the operation has been canceled, set to the reason why
* it has been canceled (eg standby moving to active)
*/
private volatile String cancelReason = null;
private CountDownLatch completionLatch = new CountDownLatch(1);
SaveNamespaceContext(
FSNamesystem sourceNamesystem,
long txid) {
this.sourceNamesystem = sourceNamesystem;
this.txid = txid;
}
FSNamesystem getSourceNamesystem() {
return sourceNamesystem;
}
long getTxId() {
return txid;
}
void reportErrorOnStorageDirectory(StorageDirectory sd) {
errorSDs.add(sd);
}
List<StorageDirectory> getErrorSDs() {
return errorSDs;
}
/**
* Requests that the current saveNamespace operation be
* canceled if it is still running.
* @param reason the reason why cancellation is requested
* @throws InterruptedException
*/
void cancel(String reason) throws InterruptedException {
this.cancelReason = reason;
completionLatch.await();
}
void markComplete() {
Preconditions.checkState(completionLatch.getCount() == 1,
"Context already completed!");
completionLatch.countDown();
}
void checkCancelled() throws SaveNamespaceCancelledException {
if (cancelReason != null) {
throw new SaveNamespaceCancelledException(
cancelReason);
}
}
boolean isCancelled() {
return cancelReason != null;
}
}

View File

@ -42,7 +42,7 @@ public abstract class MD5FileUtils {
private static final Log LOG = LogFactory.getLog(
MD5FileUtils.class);
private static final String MD5_SUFFIX = ".md5";
public static final String MD5_SUFFIX = ".md5";
private static final Pattern LINE_REGEX =
Pattern.compile("([0-9a-f]{32}) [ \\*](.+)");

View File

@ -29,6 +29,10 @@
import java.io.File;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -44,6 +48,9 @@
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.util.MD5FileUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.GenericTestUtils.DelayAnswer;
import org.apache.log4j.Level;
import org.junit.Test;
import org.mockito.Mockito;
@ -124,22 +131,25 @@ private void saveNamespaceWithInjectedFault(Fault fault) throws Exception {
case SAVE_SECOND_FSIMAGE_RTE:
// The spy throws a RuntimeException when writing to the second directory
doAnswer(new FaultySaveImage(true)).
when(spyImage).saveFSImage(Mockito.eq(fsn),
(StorageDirectory)anyObject(), anyLong());
when(spyImage).saveFSImage(
(SaveNamespaceContext)anyObject(),
(StorageDirectory)anyObject());
shouldFail = false;
break;
case SAVE_SECOND_FSIMAGE_IOE:
// The spy throws an IOException when writing to the second directory
doAnswer(new FaultySaveImage(false)).
when(spyImage).saveFSImage(Mockito.eq(fsn),
(StorageDirectory)anyObject(), anyLong());
when(spyImage).saveFSImage(
(SaveNamespaceContext)anyObject(),
(StorageDirectory)anyObject());
shouldFail = false;
break;
case SAVE_ALL_FSIMAGES:
// The spy throws IOException in all directories
doThrow(new RuntimeException("Injected")).
when(spyImage).saveFSImage(Mockito.eq(fsn),
(StorageDirectory)anyObject(), anyLong());
when(spyImage).saveFSImage(
(SaveNamespaceContext)anyObject(),
(StorageDirectory)anyObject());
shouldFail = true;
break;
case WRITE_STORAGE_ALL:
@ -363,9 +373,9 @@ public void doTestFailedSaveNamespace(boolean restoreStorageAfterFailure)
FSNamesystem.getNamespaceEditsDirs(conf));
doThrow(new IOException("Injected fault: saveFSImage")).
when(spyImage).saveFSImage(
Mockito.eq(fsn), (StorageDirectory)anyObject(),
Mockito.anyLong());
when(spyImage).saveFSImage(
(SaveNamespaceContext)anyObject(),
(StorageDirectory)anyObject());
try {
doAnEdit(fsn, 1);
@ -479,6 +489,84 @@ public void testTxIdPersistence() throws Exception {
}
}
@Test(timeout=20000)
public void testCancelSaveNamespace() throws Exception {
Configuration conf = getConf();
NameNode.initMetrics(conf, NamenodeRole.NAMENODE);
DFSTestUtil.formatNameNode(conf);
FSNamesystem fsn = FSNamesystem.loadFromDisk(conf);
// Replace the FSImage with a spy
final FSImage image = fsn.dir.fsImage;
NNStorage storage = image.getStorage();
storage.close(); // unlock any directories that FSNamesystem's initialization may have locked
storage.setStorageDirectories(
FSNamesystem.getNamespaceDirs(conf),
FSNamesystem.getNamespaceEditsDirs(conf));
FSNamesystem spyFsn = spy(fsn);
final FSNamesystem finalFsn = spyFsn;
DelayAnswer delayer = new GenericTestUtils.DelayAnswer(LOG);
doAnswer(delayer).when(spyFsn).getGenerationStamp();
ExecutorService pool = Executors.newFixedThreadPool(2);
try {
doAnEdit(fsn, 1);
// Save namespace
fsn.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
try {
Future<Void> saverFuture = pool.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
image.saveNamespace(finalFsn);
return null;
}
});
// Wait until saveNamespace calls getGenerationStamp
delayer.waitForCall();
// then cancel the saveNamespace
Future<Void> cancelFuture = pool.submit(new Callable<Void>() {
public Void call() throws Exception {
image.cancelSaveNamespace("cancelled");
return null;
}
});
// give the cancel call time to run
Thread.sleep(500);
// allow saveNamespace to proceed - it should check the cancel flag after
// this point and throw an exception
delayer.proceed();
cancelFuture.get();
saverFuture.get();
fail("saveNamespace did not fail even though cancelled!");
} catch (Throwable t) {
GenericTestUtils.assertExceptionContains(
"SaveNamespaceCancelledException", t);
}
LOG.info("Successfully cancelled a saveNamespace");
// Check that we have only the original image and not any
// cruft left over from half-finished images
FSImageTestUtil.logStorageContents(LOG, storage);
for (StorageDirectory sd : storage.dirIterable(null)) {
File curDir = sd.getCurrentDir();
GenericTestUtils.assertGlobEquals(curDir, "fsimage_.*",
NNStorage.getImageFileName(0),
NNStorage.getImageFileName(0) + MD5FileUtils.MD5_SUFFIX);
}
} finally {
if (fsn != null) {
fsn.close();
}
}
}
private void doAnEdit(FSNamesystem fsn, int id) throws IOException {
// Make an edit
fsn.mkdirs(