From ebb6cc60c421286d9121ad907e6d68fa4ee3cf93 Mon Sep 17 00:00:00 2001
From: Todd Lipcon <todd@apache.org>
Date: Thu, 27 Oct 2011 22:10:58 +0000
Subject: [PATCH] HDFS-2507. Allow saveNamespace operations to be canceled.
 Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1190059 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt   |   2 +
 .../hadoop/hdfs/server/namenode/FSImage.java  | 138 ++++++++++++------
 .../hdfs/server/namenode/FSImageFormat.java   |  18 ++-
 .../hdfs/server/namenode/FSNamesystem.java    |  21 +++
 .../SaveNamespaceCancelledException.java      |  28 ++++
 .../server/namenode/SaveNamespaceContext.java |  98 +++++++++++++
 .../apache/hadoop/hdfs/util/MD5FileUtils.java |   2 +-
 .../server/namenode/TestSaveNamespace.java    | 106 ++++++++++++--
 8 files changed, 352 insertions(+), 61 deletions(-)
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SaveNamespaceCancelledException.java
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SaveNamespaceContext.java

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index b9bc4a22ab..a239cc99e1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -788,6 +788,8 @@ Release 0.23.0 - Unreleased
     HDFS-2363. Move datanodes size printing from FSNamesystem.metasave(..)
     to BlockManager.  (Uma Maheswara Rao G via szetszwo)
 
+    HDFS-2507. Allow saveNamespace operations to be canceled. (todd)
+
   OPTIMIZATIONS
 
     HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
index a942941713..a6af3eb8e8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
@@ -83,7 +83,9 @@ public class FSImage implements Closeable {
 
   final private Configuration conf;
 
-  private final NNStorageRetentionManager archivalManager; 
+  private final NNStorageRetentionManager archivalManager;
+  /** Latest save context; volatile as cancelSaveNamespace reads it without the lock. */
+  private volatile SaveNamespaceContext curSaveNamespaceContext = null;
 
 
   /**
@@ -715,14 +717,15 @@ public class FSImage implements Closeable {
   /**
    * Save the contents of the FS image to the file.
    */
-  void saveFSImage(FSNamesystem source, StorageDirectory sd, long txid)
+  void saveFSImage(SaveNamespaceContext context, StorageDirectory sd)
       throws IOException {
+    long txid = context.getTxId();
     File newFile = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE_NEW, txid);
     File dstFile = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE, txid);
     
-    FSImageFormat.Saver saver = new FSImageFormat.Saver();
+    FSImageFormat.Saver saver = new FSImageFormat.Saver(context);
     FSImageCompression compression = FSImageCompression.createCompression(conf);
-    saver.save(newFile, txid, source, compression);
+    saver.save(newFile, compression);
     
     MD5FileUtils.saveMD5File(dstFile, saver.getSavedDigest());
     storage.setMostRecentCheckpointTxId(txid);
@@ -740,25 +743,24 @@ public class FSImage implements Closeable {
    * and writing it out.
    */
   private class FSImageSaver implements Runnable {
+    private final SaveNamespaceContext context;
     private StorageDirectory sd;
-    private List<StorageDirectory> errorSDs;
-    private final long txid;
-    private final FSNamesystem source;
-    
-    FSImageSaver(FSNamesystem source, StorageDirectory sd,
-        List<StorageDirectory> errorSDs, long txid) {
-      this.source = source;
+
+    public FSImageSaver(SaveNamespaceContext context, StorageDirectory sd) {
+      this.context = context;
       this.sd = sd;
-      this.errorSDs = errorSDs;
-      this.txid = txid;
     }
-    
+
     public void run() {
       try {
-        saveFSImage(source, sd, txid);
+        saveFSImage(context, sd);
+      } catch (SaveNamespaceCancelledException snce) {
+        LOG.info("Cancelled image saving for " + sd.getRoot() +
+            ": " + snce.getMessage());
+        // A cancelled save is not a storage failure: do not report the dir as bad.
       } catch (Throwable t) {
         LOG.error("Unable to save image for " + sd.getRoot(), t);
-        errorSDs.add(sd);
+        context.reportErrorOnStorageDirectory(sd);
       }
     }
     
@@ -784,7 +786,7 @@ public class FSImage implements Closeable {
    * Save the contents of the FS image to a new image file in each of the
    * current storage directories.
    */
-  void saveNamespace(FSNamesystem source) throws IOException {
+  synchronized void saveNamespace(FSNamesystem source) throws IOException {
     assert editLog != null : "editLog must be initialized";
     storage.attemptRestoreRemovedStorage();
 
@@ -800,46 +802,71 @@ public class FSImage implements Closeable {
     } finally {
       if (editLogWasOpen) {
         editLog.startLogSegment(imageTxId + 1, true);
-        // Take this opportunity to note the current transaction
+        // Take this opportunity to note the current transaction.
+        // Even if the namespace save was cancelled, this marker
+        // is only used to determine what transaction ID is required
+        // for startup. So, it doesn't hurt to update it unnecessarily.
         storage.writeTransactionIdFileToStorage(imageTxId + 1);
       }
     }
     
   }
   
-  protected void saveFSImageInAllDirs(FSNamesystem source, long txid)
-      throws IOException {
+  /** Cancel the tracked saveNamespace, if any, and wait until it is marked complete. */
+  void cancelSaveNamespace(String reason) throws InterruptedException {
+    final SaveNamespaceContext inFlight = curSaveNamespaceContext;
+    if (inFlight != null) {
+      inFlight.cancel(reason); // blocks until the save is marked complete
+    }
+  }
+
+  /** Saves the image to every image dir in parallel; throws if the save is cancelled. */
+  protected synchronized void saveFSImageInAllDirs(FSNamesystem source, long txid)
+      throws IOException {
     if (storage.getNumStorageDirs(NameNodeDirType.IMAGE) == 0) {
       throw new IOException("No image directories available!");
     }
     
-    List<StorageDirectory> errorSDs =
-      Collections.synchronizedList(new ArrayList<StorageDirectory>());
-
-    List<Thread> saveThreads = new ArrayList<Thread>();
-    // save images into current
-    for (Iterator<StorageDirectory> it
-           = storage.dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
-      StorageDirectory sd = it.next();
-      FSImageSaver saver = new FSImageSaver(source, sd, errorSDs, txid);
-      Thread saveThread = new Thread(saver, saver.toString());
-      saveThreads.add(saveThread);
-      saveThread.start();
-    }
-    waitForThreads(saveThreads);
-    saveThreads.clear();
-    storage.reportErrorsOnDirectories(errorSDs);
-
-    if (storage.getNumStorageDirs(NameNodeDirType.IMAGE) == 0) {
-      throw new IOException(
-        "Failed to save in any storage directories while saving namespace.");
-    }
-
-    renameCheckpoint(txid);
+    SaveNamespaceContext ctx = new SaveNamespaceContext(
+        source, txid);
+    curSaveNamespaceContext = ctx;
     
-    // Since we now have a new checkpoint, we can clean up some
-    // old edit logs and checkpoints.
-    purgeOldStorage();
+    try {
+      List<Thread> saveThreads = new ArrayList<Thread>();
+      // save images into current
+      for (Iterator<StorageDirectory> it
+             = storage.dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
+        StorageDirectory sd = it.next();
+        FSImageSaver saver = new FSImageSaver(ctx, sd);
+        Thread saveThread = new Thread(saver, saver.toString());
+        saveThreads.add(saveThread);
+        saveThread.start();
+      }
+      waitForThreads(saveThreads);
+      saveThreads.clear();
+      storage.reportErrorsOnDirectories(ctx.getErrorSDs());
+
+      if (storage.getNumStorageDirs(NameNodeDirType.IMAGE) == 0) {
+        throw new IOException(
+          "Failed to save in any storage directories while saving namespace.");
+      }
+      if (ctx.isCancelled()) {
+        deleteCancelledCheckpoint(txid);
+        ctx.checkCancelled(); // throws
+        assert false : "should have thrown above!";
+      }
+
+      renameCheckpoint(txid);
+
+      // Since we now have a new checkpoint, we can clean up some
+      // old edit logs and checkpoints.
+      purgeOldStorage();
+    } finally {
+      // Notify any threads waiting on the checkpoint to be canceled
+      // that it is complete.
+      ctx.markComplete();
+      curSaveNamespaceContext = null; // finished: nothing left to cancel
+    }
   }
 
   /**
@@ -873,6 +900,24 @@ public class FSImage implements Closeable {
     }
     if(al != null) storage.reportErrorsOnDirectories(al);
   }
+  
+  /**
+   * Removes the in-progress (IMAGE_NEW) checkpoint file for the given txid
+   * from each image storage directory after a cancelled save.
+   */
+  private void deleteCancelledCheckpoint(long txid) throws IOException {
+    ArrayList<StorageDirectory> failedDirs = Lists.newArrayList();
+    for (StorageDirectory dir : storage.dirIterable(NameNodeDirType.IMAGE)) {
+      File partial = NNStorage.getStorageFile(dir, NameNodeFile.IMAGE_NEW, txid);
+      boolean stillThere = partial.exists() && !partial.delete();
+      if (stillThere) {
+        LOG.warn("Unable to delete cancelled checkpoint in " + dir);
+        failedDirs.add(dir);
+      }
+    }
+    storage.reportErrorsOnDirectories(failedDirs);
+  }
+
 
   private void renameCheckpointInDir(StorageDirectory sd, long txid)
       throws IOException {
@@ -1055,4 +1100,5 @@ public class FSImage implements Closeable {
   public synchronized long getLastAppliedTxId() {
     return lastAppliedTxId;
   }
+
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
index c178e048b5..e029b24022 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
@@ -508,6 +508,7 @@ class FSImageFormat {
    * functions may be used to retrieve information about the file that was written.
    */
   static class Saver {
+    private final SaveNamespaceContext context;
     /** Set to true once an image has been written */
     private boolean saved = false;
     
@@ -529,6 +530,11 @@ class FSImageFormat {
         throw new IllegalStateException("FSImageSaver has already saved an image");
       }
     }
+
+    /** Creates a saver that takes its namesystem, txid and cancel state from context. */
+    Saver(SaveNamespaceContext context) {
+      this.context = context;
+    }
 
     /**
      * Return the MD5 checksum of the image file that was saved.
@@ -539,12 +545,11 @@ class FSImageFormat {
     }
 
     void save(File newFile,
-              long txid,
-              FSNamesystem sourceNamesystem,
               FSImageCompression compression)
       throws IOException {
       checkNotSaved();
 
+      final FSNamesystem sourceNamesystem = context.getSourceNamesystem();
       FSDirectory fsDir = sourceNamesystem.dir;
       long startTime = now();
       //
@@ -565,7 +570,7 @@ class FSImageFormat {
             .getNamespaceID());
         out.writeLong(fsDir.rootDir.numItemsInTree());
         out.writeLong(sourceNamesystem.getGenerationStamp());
-        out.writeLong(txid);
+        out.writeLong(context.getTxId());
 
         // write compression info and set up compressed stream
         out = compression.writeHeaderAndWrapStream(fos);
@@ -581,10 +586,12 @@ class FSImageFormat {
         saveImage(strbuf, fsDir.rootDir, out);
         // save files under construction
         sourceNamesystem.saveFilesUnderConstruction(out);
+        context.checkCancelled();
         sourceNamesystem.saveSecretManagerState(out);
         strbuf = null;
-
+        context.checkCancelled();
         out.flush();
+        context.checkCancelled();
         fout.getChannel().force(true);
       } finally {
         out.close();
@@ -603,9 +610,10 @@ class FSImageFormat {
      * This is a recursive procedure, which first saves all children of
      * a current directory and then moves inside the sub-directories.
      */
-    private static void saveImage(ByteBuffer currentDirName,
+    private void saveImage(ByteBuffer currentDirName,
                                   INodeDirectory current,
                                   DataOutputStream out) throws IOException {
+      context.checkCancelled();
       List<INode> children = current.getChildrenRaw();
       if (children == null || children.isEmpty())
         return;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 29e76f7db0..ff590c1528 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -2785,6 +2785,27 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     }
   }
   
+  /**
+   * Cancel an ongoing saveNamespace operation and wait for its
+   * threads to exit, if one is currently in progress.
+   * If no such operation is in progress, this call does nothing.
+   *
+   * @param reason the reason for cancelling, reported to the saveNamespace caller
+   * @throws IOException if the thread is interrupted while waiting for the save to stop
+   */
+  void cancelSaveNamespace(String reason) throws IOException {
+    readLock();
+    try {
+      checkSuperuserPrivilege();
+      getFSImage().cancelSaveNamespace(reason);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt(); // keep the interrupt visible to callers
+      throw new IOException(e);
+    } finally {
+      readUnlock();
+    }
+  }
+  
   /**
    * Enables/Disables/Checks restoring failed storage replicas if the storage becomes available again.
    * Requires superuser privilege.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SaveNamespaceCancelledException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SaveNamespaceCancelledException.java
new file mode 100644
index 0000000000..2731275f26
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SaveNamespaceCancelledException.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+
+class SaveNamespaceCancelledException extends IOException {
+  private static final long serialVersionUID = 1L;
+  /** Creates the exception with the cancellation reason as its message. */
+  SaveNamespaceCancelledException(String cancelReason) {
+    super(cancelReason);
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SaveNamespaceContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SaveNamespaceContext.java
new file mode 100644
index 0000000000..c5c0c06d0e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SaveNamespaceContext.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Context for an ongoing SaveNamespace operation. This class
+ * allows cancellation, and also is responsible for accumulating
+ * failed storage directories.
+ */
+class SaveNamespaceContext {
+  private final FSNamesystem sourceNamesystem;
+  private final long txid;
+  private final List<StorageDirectory> errorSDs =
+    Collections.synchronizedList(new ArrayList<StorageDirectory>());
+
+  /**
+   * If the operation has been canceled, set to the reason why
+   * it has been canceled (eg standby moving to active)
+   */
+  private volatile String cancelReason = null;
+
+  /** Released by markComplete(); cancel() blocks on it. Final for safe publication. */
+  private final CountDownLatch completionLatch = new CountDownLatch(1);
+
+  SaveNamespaceContext(FSNamesystem sourceNamesystem, long txid) {
+    this.sourceNamesystem = sourceNamesystem;
+    this.txid = txid;
+  }
+
+  FSNamesystem getSourceNamesystem() {
+    return sourceNamesystem;
+  }
+
+  long getTxId() {
+    return txid;
+  }
+
+  void reportErrorOnStorageDirectory(StorageDirectory sd) {
+    errorSDs.add(sd);
+  }
+
+  List<StorageDirectory> getErrorSDs() {
+    return errorSDs;
+  }
+
+  /**
+   * Requests that the current saveNamespace operation be canceled
+   * if it is still running, then waits until it is marked complete.
+   * @param reason the reason why cancellation is requested
+   * @throws InterruptedException if interrupted while waiting for completion
+   */
+  void cancel(String reason) throws InterruptedException {
+    this.cancelReason = reason;
+    completionLatch.await();
+  }
+
+  /** Wakes every thread blocked in cancel(); a second call fails the state check. */
+  void markComplete() {
+    Preconditions.checkState(completionLatch.getCount() == 1,
+        "Context already completed!");
+    completionLatch.countDown();
+  }
+
+  /** Throws if cancel() has been requested, carrying the cancel reason. */
+  void checkCancelled() throws SaveNamespaceCancelledException {
+    if (cancelReason != null) {
+      throw new SaveNamespaceCancelledException(cancelReason);
+    }
+  }
+
+  boolean isCancelled() {
+    return cancelReason != null;
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/MD5FileUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/MD5FileUtils.java
index 0d691378ba..c010e2730a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/MD5FileUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/MD5FileUtils.java
@@ -42,7 +42,7 @@ public abstract class MD5FileUtils {
   private static final Log LOG = LogFactory.getLog(
       MD5FileUtils.class);
 
-  private static final String MD5_SUFFIX = ".md5";
+  public static final String MD5_SUFFIX = ".md5";
   private static final Pattern LINE_REGEX =
     Pattern.compile("([0-9a-f]{32}) [ \\*](.+)");
   
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSaveNamespace.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSaveNamespace.java
index 9365c6ef04..13e256d78c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSaveNamespace.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSaveNamespace.java
@@ -29,6 +29,10 @@ import static org.mockito.Mockito.spy;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -44,6 +48,9 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.util.MD5FileUtils;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.GenericTestUtils.DelayAnswer;
 import org.apache.log4j.Level;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -124,22 +131,25 @@ public class TestSaveNamespace {
     case SAVE_SECOND_FSIMAGE_RTE:
       // The spy throws a RuntimeException when writing to the second directory
       doAnswer(new FaultySaveImage(true)).
-        when(spyImage).saveFSImage(Mockito.eq(fsn),
-            (StorageDirectory)anyObject(), anyLong());
+        when(spyImage).saveFSImage(
+            (SaveNamespaceContext)anyObject(),
+            (StorageDirectory)anyObject());
       shouldFail = false;
       break;
     case SAVE_SECOND_FSIMAGE_IOE:
       // The spy throws an IOException when writing to the second directory
       doAnswer(new FaultySaveImage(false)).
-        when(spyImage).saveFSImage(Mockito.eq(fsn),
-            (StorageDirectory)anyObject(), anyLong());
+        when(spyImage).saveFSImage(
+            (SaveNamespaceContext)anyObject(),
+            (StorageDirectory)anyObject());
       shouldFail = false;
       break;
     case SAVE_ALL_FSIMAGES:
       // The spy throws IOException in all directories
       doThrow(new RuntimeException("Injected")).
-        when(spyImage).saveFSImage(Mockito.eq(fsn),
-            (StorageDirectory)anyObject(), anyLong());
+      when(spyImage).saveFSImage(
+          (SaveNamespaceContext)anyObject(),
+          (StorageDirectory)anyObject());
       shouldFail = true;
       break;
     case WRITE_STORAGE_ALL:
@@ -363,9 +373,9 @@ public class TestSaveNamespace {
         FSNamesystem.getNamespaceEditsDirs(conf));
 
     doThrow(new IOException("Injected fault: saveFSImage")).
-      when(spyImage).saveFSImage(
-          Mockito.eq(fsn), (StorageDirectory)anyObject(),
-          Mockito.anyLong());
+        when(spyImage).saveFSImage(
+            (SaveNamespaceContext)anyObject(),
+            (StorageDirectory)anyObject());
 
     try {
       doAnEdit(fsn, 1);
@@ -479,6 +489,84 @@ public class TestSaveNamespace {
     }
   }
   
+  @Test(timeout=20000)
+  public void testCancelSaveNamespace() throws Exception {
+    Configuration conf = getConf();
+    NameNode.initMetrics(conf, NamenodeRole.NAMENODE);
+    DFSTestUtil.formatNameNode(conf);
+    FSNamesystem fsn = FSNamesystem.loadFromDisk(conf);
+
+    // Use the real FSImage, re-pointing its storage at the configured dirs
+    final FSImage image = fsn.dir.fsImage;
+    NNStorage storage = image.getStorage();
+    storage.close(); // unlock any directories that FSNamesystem's initialization may have locked
+    storage.setStorageDirectories(
+        FSNamesystem.getNamespaceDirs(conf),
+        FSNamesystem.getNamespaceEditsDirs(conf));
+
+    FSNamesystem spyFsn = spy(fsn);
+    final FSNamesystem finalFsn = spyFsn;
+    DelayAnswer delayer = new GenericTestUtils.DelayAnswer(LOG);
+    doAnswer(delayer).when(spyFsn).getGenerationStamp();
+
+    ExecutorService pool = Executors.newFixedThreadPool(2);
+
+    try {
+      doAnEdit(fsn, 1);
+
+      // Save namespace
+      fsn.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+      try {
+        Future<Void> saverFuture = pool.submit(new Callable<Void>() {
+          @Override
+          public Void call() throws Exception {
+            image.saveNamespace(finalFsn);
+            return null;
+          }
+        });
+
+        // Wait until saveNamespace calls getGenerationStamp
+        delayer.waitForCall();
+        // then cancel the saveNamespace
+        Future<Void> cancelFuture = pool.submit(new Callable<Void>() {
+          public Void call() throws Exception {
+            image.cancelSaveNamespace("cancelled");
+            return null;
+          }
+        });
+        // give the cancel call time to run
+        Thread.sleep(500);
+
+        // allow saveNamespace to proceed - it should check the cancel flag after
+        // this point and throw an exception
+        delayer.proceed();
+
+        cancelFuture.get();
+        saverFuture.get();
+        fail("saveNamespace did not fail even though cancelled!");
+      } catch (Throwable t) {
+        GenericTestUtils.assertExceptionContains(
+            "SaveNamespaceCancelledException", t);
+      }
+      LOG.info("Successfully cancelled a saveNamespace");
+
+      // Check that we have only the original image and not any
+      // cruft left over from half-finished images
+      FSImageTestUtil.logStorageContents(LOG, storage);
+      for (StorageDirectory sd : storage.dirIterable(null)) {
+        File curDir = sd.getCurrentDir();
+        GenericTestUtils.assertGlobEquals(curDir, "fsimage_.*",
+            NNStorage.getImageFileName(0),
+            NNStorage.getImageFileName(0) + MD5FileUtils.MD5_SUFFIX);
+      }
+    } finally {
+      pool.shutdownNow(); // stop the worker threads even if the test failed early
+      if (fsn != null) {
+        fsn.close();
+      }
+    }
+  }
+  
   private void doAnEdit(FSNamesystem fsn, int id) throws IOException {
     // Make an edit
     fsn.mkdirs(