diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 3849696d33..c377e77106 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -824,6 +824,8 @@ Release 0.23.0 - Unreleased HDFS-1869. mkdirs should use the supplied permission for all of the created directories. (Daryn Sharp via szetszwo) + HDFS-2507. Allow saveNamespace operations to be canceled. (todd) + OPTIMIZATIONS HDFS-1458. Improve checkpoint performance by avoiding unnecessary image diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java index a942941713..a6af3eb8e8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java @@ -83,7 +83,9 @@ public class FSImage implements Closeable { final private Configuration conf; - private final NNStorageRetentionManager archivalManager; + private final NNStorageRetentionManager archivalManager; + + private SaveNamespaceContext curSaveNamespaceContext = null; /** @@ -715,14 +717,15 @@ private void loadFSImage(File curFile, MD5Hash expectedMd5, /** * Save the contents of the FS image to the file. */ - void saveFSImage(FSNamesystem source, StorageDirectory sd, long txid) + void saveFSImage(SaveNamespaceContext context, StorageDirectory sd) throws IOException { + long txid = context.getTxId(); File newFile = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE_NEW, txid); File dstFile = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE, txid); - FSImageFormat.Saver saver = new FSImageFormat.Saver(); + FSImageFormat.Saver saver = new FSImageFormat.Saver(context); FSImageCompression compression = FSImageCompression.createCompression(conf); - saver.save(newFile, txid, source, compression); + saver.save(newFile, compression); MD5FileUtils.saveMD5File(dstFile, saver.getSavedDigest()); storage.setMostRecentCheckpointTxId(txid); @@ -740,25 +743,24 @@ void saveFSImage(FSNamesystem source, StorageDirectory sd, long txid) * and writing it out. */ private class FSImageSaver implements Runnable { + private final SaveNamespaceContext context; private StorageDirectory sd; - private List errorSDs; - private final long txid; - private final FSNamesystem source; - - FSImageSaver(FSNamesystem source, StorageDirectory sd, - List errorSDs, long txid) { - this.source = source; + + public FSImageSaver(SaveNamespaceContext context, StorageDirectory sd) { + this.context = context; this.sd = sd; - this.errorSDs = errorSDs; - this.txid = txid; } - + public void run() { try { - saveFSImage(source, sd, txid); + saveFSImage(context, sd); + } catch (SaveNamespaceCancelledException snce) { + LOG.info("Cancelled image saving for " + sd.getRoot() + + ": " + snce.getMessage()); + // don't report an error on the storage dir! } catch (Throwable t) { LOG.error("Unable to save image for " + sd.getRoot(), t); - errorSDs.add(sd); + context.reportErrorOnStorageDirectory(sd); } } @@ -784,7 +786,7 @@ private void waitForThreads(List threads) { * Save the contents of the FS image to a new image file in each of the * current storage directories. */ - void saveNamespace(FSNamesystem source) throws IOException { + synchronized void saveNamespace(FSNamesystem source) throws IOException { assert editLog != null : "editLog must be initialized"; storage.attemptRestoreRemovedStorage(); @@ -800,46 +802,71 @@ void saveNamespace(FSNamesystem source) throws IOException { } finally { if (editLogWasOpen) { editLog.startLogSegment(imageTxId + 1, true); - // Take this opportunity to note the current transaction + // Take this opportunity to note the current transaction. + // Even if the namespace save was cancelled, this marker + // is only used to determine what transaction ID is required + // for startup. So, it doesn't hurt to update it unnecessarily. storage.writeTransactionIdFileToStorage(imageTxId + 1); } } } - protected void saveFSImageInAllDirs(FSNamesystem source, long txid) - throws IOException { + void cancelSaveNamespace(String reason) + throws InterruptedException { + SaveNamespaceContext ctx = curSaveNamespaceContext; + if (ctx != null) { + ctx.cancel(reason); // waits until complete + } + } + + + protected synchronized void saveFSImageInAllDirs(FSNamesystem source, long txid) + throws IOException { if (storage.getNumStorageDirs(NameNodeDirType.IMAGE) == 0) { throw new IOException("No image directories available!"); } - List errorSDs = - Collections.synchronizedList(new ArrayList()); - - List saveThreads = new ArrayList(); - // save images into current - for (Iterator it - = storage.dirIterator(NameNodeDirType.IMAGE); it.hasNext();) { - StorageDirectory sd = it.next(); - FSImageSaver saver = new FSImageSaver(source, sd, errorSDs, txid); - Thread saveThread = new Thread(saver, saver.toString()); - saveThreads.add(saveThread); - saveThread.start(); - } - waitForThreads(saveThreads); - saveThreads.clear(); - storage.reportErrorsOnDirectories(errorSDs); - - if (storage.getNumStorageDirs(NameNodeDirType.IMAGE) == 0) { - throw new IOException( - "Failed to save in any storage directories while saving namespace."); - } - - renameCheckpoint(txid); + SaveNamespaceContext ctx = new SaveNamespaceContext( + source, txid); + curSaveNamespaceContext = ctx; - // Since we now have a new checkpoint, we can clean up some - // old edit logs and checkpoints. - purgeOldStorage(); + try { + List saveThreads = new ArrayList(); + // save images into current + for (Iterator it + = storage.dirIterator(NameNodeDirType.IMAGE); it.hasNext();) { + StorageDirectory sd = it.next(); + FSImageSaver saver = new FSImageSaver(ctx, sd); + Thread saveThread = new Thread(saver, saver.toString()); + saveThreads.add(saveThread); + saveThread.start(); + } + waitForThreads(saveThreads); + saveThreads.clear(); + storage.reportErrorsOnDirectories(ctx.getErrorSDs()); + + if (storage.getNumStorageDirs(NameNodeDirType.IMAGE) == 0) { + throw new IOException( + "Failed to save in any storage directories while saving namespace."); + } + if (ctx.isCancelled()) { + deleteCancelledCheckpoint(txid); + ctx.checkCancelled(); // throws + assert false : "should have thrown above!"; + } + + renameCheckpoint(txid); + + // Since we now have a new checkpoint, we can clean up some + // old edit logs and checkpoints. + purgeOldStorage(); + } finally { + // Notify any threads waiting on the checkpoint to be canceled + // that it is complete. + ctx.markComplete(); + ctx = null; + } } /** @@ -873,6 +900,24 @@ private void renameCheckpoint(long txid) throws IOException { } if(al != null) storage.reportErrorsOnDirectories(al); } + + /** + * Deletes the checkpoint file in every storage directory, + * since the checkpoint was cancelled. + */ + private void deleteCancelledCheckpoint(long txid) throws IOException { + ArrayList al = Lists.newArrayList(); + + for (StorageDirectory sd : storage.dirIterable(NameNodeDirType.IMAGE)) { + File ckpt = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE_NEW, txid); + if (ckpt.exists() && !ckpt.delete()) { + LOG.warn("Unable to delete cancelled checkpoint in " + sd); + al.add(sd); + } + } + storage.reportErrorsOnDirectories(al); + } + private void renameCheckpointInDir(StorageDirectory sd, long txid) throws IOException { @@ -1055,4 +1100,5 @@ public String getBlockPoolID() { public synchronized long getLastAppliedTxId() { return lastAppliedTxId; } + } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java index c178e048b5..e029b24022 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java @@ -508,6 +508,7 @@ byte[][] getParent(byte[][] path) { * functions may be used to retrieve information about the file that was written. */ static class Saver { + private final SaveNamespaceContext context; /** Set to true once an image has been written */ private boolean saved = false; @@ -529,6 +530,11 @@ private void checkNotSaved() { throw new IllegalStateException("FSImageSaver has already saved an image"); } } + + + Saver(SaveNamespaceContext context) { + this.context = context; + } /** * Return the MD5 checksum of the image file that was saved. @@ -539,12 +545,11 @@ MD5Hash getSavedDigest() { } void save(File newFile, - long txid, - FSNamesystem sourceNamesystem, FSImageCompression compression) throws IOException { checkNotSaved(); + final FSNamesystem sourceNamesystem = context.getSourceNamesystem(); FSDirectory fsDir = sourceNamesystem.dir; long startTime = now(); // @@ -565,7 +570,7 @@ void save(File newFile, .getNamespaceID()); out.writeLong(fsDir.rootDir.numItemsInTree()); out.writeLong(sourceNamesystem.getGenerationStamp()); - out.writeLong(txid); + out.writeLong(context.getTxId()); // write compression info and set up compressed stream out = compression.writeHeaderAndWrapStream(fos); @@ -581,10 +586,12 @@ void save(File newFile, saveImage(strbuf, fsDir.rootDir, out); // save files under construction sourceNamesystem.saveFilesUnderConstruction(out); + context.checkCancelled(); sourceNamesystem.saveSecretManagerState(out); strbuf = null; - + context.checkCancelled(); out.flush(); + context.checkCancelled(); fout.getChannel().force(true); } finally { out.close(); @@ -603,9 +610,10 @@ void save(File newFile, * This is a recursive procedure, which first saves all children of * a current directory and then moves inside the sub-directories. */ - private static void saveImage(ByteBuffer currentDirName, + private void saveImage(ByteBuffer currentDirName, INodeDirectory current, DataOutputStream out) throws IOException { + context.checkCancelled(); List children = current.getChildrenRaw(); if (children == null || children.isEmpty()) return; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 4851796cea..f3383c0e37 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -2728,6 +2728,27 @@ void saveNamespace() throws AccessControlException, IOException { } } + /** + * Cancel an ongoing saveNamespace operation and wait for its + * threads to exit, if one is currently in progress. + * + * If no such operation is in progress, this call does nothing. + * + * @param reason a reason to be communicated to the caller saveNamespace + * @throws IOException + */ + void cancelSaveNamespace(String reason) throws IOException { + readLock(); + try { + checkSuperuserPrivilege(); + getFSImage().cancelSaveNamespace(reason); + } catch (InterruptedException e) { + throw new IOException(e); + } finally { + readUnlock(); + } + } + /** * Enables/Disables/Checks restoring failed storage replicas if the storage becomes available again. * Requires superuser privilege. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SaveNamespaceCancelledException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SaveNamespaceCancelledException.java new file mode 100644 index 0000000000..2731275f26 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SaveNamespaceCancelledException.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode; + +import java.io.IOException; + +class SaveNamespaceCancelledException extends IOException { + private static final long serialVersionUID = 1L; + + SaveNamespaceCancelledException(String cancelReason) { + super(cancelReason); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SaveNamespaceContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SaveNamespaceContext.java new file mode 100644 index 0000000000..c5c0c06d0e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SaveNamespaceContext.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; + +import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; + +import com.google.common.base.Preconditions; + +/** + * Context for an ongoing SaveNamespace operation. This class + * allows cancellation, and also is responsible for accumulating + * failed storage directories. + */ +class SaveNamespaceContext { + private final FSNamesystem sourceNamesystem; + private final long txid; + private final List errorSDs = + Collections.synchronizedList(new ArrayList()); + + /** + * If the operation has been canceled, set to the reason why + * it has been canceled (eg standby moving to active) + */ + private volatile String cancelReason = null; + + private CountDownLatch completionLatch = new CountDownLatch(1); + + SaveNamespaceContext( + FSNamesystem sourceNamesystem, + long txid) { + this.sourceNamesystem = sourceNamesystem; + this.txid = txid; + } + + FSNamesystem getSourceNamesystem() { + return sourceNamesystem; + } + + long getTxId() { + return txid; + } + + void reportErrorOnStorageDirectory(StorageDirectory sd) { + errorSDs.add(sd); + } + + List getErrorSDs() { + return errorSDs; + } + + /** + * Requests that the current saveNamespace operation be + * canceled if it is still running. + * @param reason the reason why cancellation is requested + * @throws InterruptedException + */ + void cancel(String reason) throws InterruptedException { + this.cancelReason = reason; + completionLatch.await(); + } + + void markComplete() { + Preconditions.checkState(completionLatch.getCount() == 1, + "Context already completed!"); + completionLatch.countDown(); + } + + void checkCancelled() throws SaveNamespaceCancelledException { + if (cancelReason != null) { + throw new SaveNamespaceCancelledException( + cancelReason); + } + } + + boolean isCancelled() { + return cancelReason != null; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/MD5FileUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/MD5FileUtils.java index 0d691378ba..c010e2730a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/MD5FileUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/MD5FileUtils.java @@ -42,7 +42,7 @@ public abstract class MD5FileUtils { private static final Log LOG = LogFactory.getLog( MD5FileUtils.class); - private static final String MD5_SUFFIX = ".md5"; + public static final String MD5_SUFFIX = ".md5"; private static final Pattern LINE_REGEX = Pattern.compile("([0-9a-f]{32}) [ \\*](.+)"); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSaveNamespace.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSaveNamespace.java index 9365c6ef04..13e256d78c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSaveNamespace.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSaveNamespace.java @@ -29,6 +29,10 @@ import java.io.File; import java.io.IOException; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -44,6 +48,9 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; +import org.apache.hadoop.hdfs.util.MD5FileUtils; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.test.GenericTestUtils.DelayAnswer; import org.apache.log4j.Level; import org.junit.Test; import org.mockito.Mockito; @@ -124,22 +131,25 @@ private void saveNamespaceWithInjectedFault(Fault fault) throws Exception { case SAVE_SECOND_FSIMAGE_RTE: // The spy throws a RuntimeException when writing to the second directory doAnswer(new FaultySaveImage(true)). - when(spyImage).saveFSImage(Mockito.eq(fsn), - (StorageDirectory)anyObject(), anyLong()); + when(spyImage).saveFSImage( + (SaveNamespaceContext)anyObject(), + (StorageDirectory)anyObject()); shouldFail = false; break; case SAVE_SECOND_FSIMAGE_IOE: // The spy throws an IOException when writing to the second directory doAnswer(new FaultySaveImage(false)). - when(spyImage).saveFSImage(Mockito.eq(fsn), - (StorageDirectory)anyObject(), anyLong()); + when(spyImage).saveFSImage( + (SaveNamespaceContext)anyObject(), + (StorageDirectory)anyObject()); shouldFail = false; break; case SAVE_ALL_FSIMAGES: // The spy throws IOException in all directories doThrow(new RuntimeException("Injected")). - when(spyImage).saveFSImage(Mockito.eq(fsn), - (StorageDirectory)anyObject(), anyLong()); + when(spyImage).saveFSImage( + (SaveNamespaceContext)anyObject(), + (StorageDirectory)anyObject()); shouldFail = true; break; case WRITE_STORAGE_ALL: @@ -363,9 +373,9 @@ public void doTestFailedSaveNamespace(boolean restoreStorageAfterFailure) FSNamesystem.getNamespaceEditsDirs(conf)); doThrow(new IOException("Injected fault: saveFSImage")). - when(spyImage).saveFSImage( - Mockito.eq(fsn), (StorageDirectory)anyObject(), - Mockito.anyLong()); + when(spyImage).saveFSImage( + (SaveNamespaceContext)anyObject(), + (StorageDirectory)anyObject()); try { doAnEdit(fsn, 1); @@ -479,6 +489,84 @@ public void testTxIdPersistence() throws Exception { } } + @Test(timeout=20000) + public void testCancelSaveNamespace() throws Exception { + Configuration conf = getConf(); + NameNode.initMetrics(conf, NamenodeRole.NAMENODE); + DFSTestUtil.formatNameNode(conf); + FSNamesystem fsn = FSNamesystem.loadFromDisk(conf); + + // Replace the FSImage with a spy + final FSImage image = fsn.dir.fsImage; + NNStorage storage = image.getStorage(); + storage.close(); // unlock any directories that FSNamesystem's initialization may have locked + storage.setStorageDirectories( + FSNamesystem.getNamespaceDirs(conf), + FSNamesystem.getNamespaceEditsDirs(conf)); + + FSNamesystem spyFsn = spy(fsn); + final FSNamesystem finalFsn = spyFsn; + DelayAnswer delayer = new GenericTestUtils.DelayAnswer(LOG); + doAnswer(delayer).when(spyFsn).getGenerationStamp(); + + ExecutorService pool = Executors.newFixedThreadPool(2); + + try { + doAnEdit(fsn, 1); + + // Save namespace + fsn.setSafeMode(SafeModeAction.SAFEMODE_ENTER); + try { + Future saverFuture = pool.submit(new Callable() { + @Override + public Void call() throws Exception { + image.saveNamespace(finalFsn); + return null; + } + }); + + // Wait until saveNamespace calls getGenerationStamp + delayer.waitForCall(); + // then cancel the saveNamespace + Future cancelFuture = pool.submit(new Callable() { + public Void call() throws Exception { + image.cancelSaveNamespace("cancelled"); + return null; + } + }); + // give the cancel call time to run + Thread.sleep(500); + + // allow saveNamespace to proceed - it should check the cancel flag after + // this point and throw an exception + delayer.proceed(); + + cancelFuture.get(); + saverFuture.get(); + fail("saveNamespace did not fail even though cancelled!"); + } catch (Throwable t) { + GenericTestUtils.assertExceptionContains( + "SaveNamespaceCancelledException", t); + } + LOG.info("Successfully cancelled a saveNamespace"); + + + // Check that we have only the original image and not any + // cruft left over from half-finished images + FSImageTestUtil.logStorageContents(LOG, storage); + for (StorageDirectory sd : storage.dirIterable(null)) { + File curDir = sd.getCurrentDir(); + GenericTestUtils.assertGlobEquals(curDir, "fsimage_.*", + NNStorage.getImageFileName(0), + NNStorage.getImageFileName(0) + MD5FileUtils.MD5_SUFFIX); + } + } finally { + if (fsn != null) { + fsn.close(); + } + } + } + private void doAnEdit(FSNamesystem fsn, int id) throws IOException { // Make an edit fsn.mkdirs(