diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 15585f3695..85b50fa505 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -217,6 +217,9 @@ Release 2.0.1-alpha - UNRELEASED
     HDFS-3434. InvalidProtocolBufferException when visiting DN
     browseDirectory.jsp (eli)
 
+    HDFS-2800. Fix cancellation of checkpoints in the standby node to be more
+    reliable. (todd)
+
 Release 2.0.0-alpha - UNRELEASED
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
index c5d419565d..1b81f942f8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
@@ -54,6 +54,7 @@
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.util.Canceler;
 import org.apache.hadoop.hdfs.util.MD5FileUtils;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -89,9 +90,6 @@ public class FSImage implements Closeable {
 
   private final NNStorageRetentionManager archivalManager;
 
-  private SaveNamespaceContext curSaveNamespaceContext = null;
-
-
   /**
    * Construct an FSImage
    * @param conf Configuration
@@ -804,17 +802,28 @@ private void waitForThreads(List<Thread> threads) {
       try {
         thread.join();
       } catch (InterruptedException iex) {
-        LOG.error("Caught exception while waiting for thread " +
+        LOG.error("Caught interrupted exception while waiting for thread " +
                   thread.getName() + " to finish. Retrying join");
       }
     }
   }
+  
+  /**
+   * @see #saveNamespace(FSNamesystem, Canceler)
+   */
+  public synchronized void saveNamespace(FSNamesystem source)
+      throws IOException {
+    saveNamespace(source, null);
+  }
   
   /**
    * Save the contents of the FS image to a new image file in each of the
    * current storage directories.
+   * @param canceler
    */
-  public synchronized void saveNamespace(FSNamesystem source) throws IOException {
+  public synchronized void saveNamespace(FSNamesystem source,
+      Canceler canceler) throws IOException {
     assert editLog != null : "editLog must be initialized";
     storage.attemptRestoreRemovedStorage();
 
@@ -825,7 +834,7 @@ public synchronized void saveNamespace(FSNamesystem source) throws IOException {
     }
     long imageTxId = getLastAppliedOrWrittenTxId();
     try {
-      saveFSImageInAllDirs(source, imageTxId);
+      saveFSImageInAllDirs(source, imageTxId, canceler);
       storage.writeAll();
     } finally {
       if (editLogWasOpen) {
@@ -837,27 +846,27 @@ public synchronized void saveNamespace(FSNamesystem source) throws IOException {
       storage.writeTransactionIdFileToStorage(imageTxId + 1);
     }
   }
-  
-  }
-
-  public void cancelSaveNamespace(String reason)
-      throws InterruptedException {
-    SaveNamespaceContext ctx = curSaveNamespaceContext;
-    if (ctx != null) {
-      ctx.cancel(reason); // waits until complete
-    }
   }
-
+  
+  /**
+   * @see #saveFSImageInAllDirs(FSNamesystem, long, Canceler)
+   */
   protected synchronized void saveFSImageInAllDirs(FSNamesystem source, long txid)
+      throws IOException {
+    saveFSImageInAllDirs(source, txid, null);
+  }
+  
+  protected synchronized void saveFSImageInAllDirs(FSNamesystem source, long txid,
+      Canceler canceler)
       throws IOException {
     if (storage.getNumStorageDirs(NameNodeDirType.IMAGE) == 0) {
       throw new IOException("No image directories available!");
     }
-    
+    if (canceler == null) {
+      canceler = new Canceler();
+    }
     SaveNamespaceContext ctx = new SaveNamespaceContext(
-        source, txid);
-    curSaveNamespaceContext = ctx;
+        source, txid, canceler);
     
     try {
       List<Thread> saveThreads = new ArrayList<Thread>();
@@ -878,7 +887,7 @@ protected synchronized void saveFSImageInAllDirs(FSNamesystem source, long txid)
       throw new IOException(
         "Failed to save in any storage directories while saving namespace.");
     }
-    if (ctx.isCancelled()) {
+    if (canceler.isCancelled()) {
       deleteCancelledCheckpoint(txid);
       ctx.checkCancelled(); // throws
       assert false : "should have thrown above!";
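
The core of the FSImage change is threading an optional Canceler down from the caller: the old no-argument saveNamespace() becomes a thin overload that passes null, and saveFSImageInAllDirs() substitutes a fresh, never-cancelled Canceler so downstream code needs no null checks. A minimal sketch of that pattern, with hypothetical names (Saver and doSave are not HDFS classes):

    import org.apache.hadoop.hdfs.util.Canceler;

    public class Saver {
      /** Convenience overload: saves without any external cancellation hook. */
      public synchronized void save() throws Exception {
        save(null);
      }

      public synchronized void save(Canceler canceler) throws Exception {
        if (canceler == null) {
          // A Canceler nobody else holds a reference to can never fire,
          // so the cancellation checks downstream become cheap no-ops.
          canceler = new Canceler();
        }
        doSave(canceler);
      }

      private void doSave(Canceler canceler) throws Exception {
        // long-running work polls canceler.isCancelled() periodically
      }
    }
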
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
index f666f35b74..a65f20e72f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
@@ -540,7 +540,6 @@ void save(File newFile,
     private void saveImage(ByteBuffer currentDirName,
                            INodeDirectory current,
                            DataOutputStream out) throws IOException {
-      context.checkCancelled();
       List<INode> children = current.getChildrenRaw();
       if (children == null || children.isEmpty())
         return;
@@ -554,9 +553,13 @@ private void saveImage(ByteBuffer currentDirName,
         out.write(currentDirName.array(), 0, prefixLen);
       }
       out.writeInt(children.size());
+      int i = 0;
       for(INode child : children) { // print all children first
         FSImageSerialization.saveINode2Image(child, out);
+        if (i++ % 50 == 0) {
+          context.checkCancelled();
+        }
       }
       for(INode child : children) {
         if(!child.isDirectory())
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 5d0070b410..e198b86814 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -702,7 +702,8 @@ void startStandbyServices(final Configuration conf) {
    */
   void prepareToStopStandbyServices() throws ServiceFailedException {
     if (standbyCheckpointer != null) {
-      standbyCheckpointer.cancelAndPreventCheckpoints();
+      standbyCheckpointer.cancelAndPreventCheckpoints(
+          "About to leave standby state");
     }
   }
 
@@ -3372,27 +3373,6 @@ void saveNamespace() throws AccessControlException, IOException {
     }
   }
   
-  /**
-   * Cancel an ongoing saveNamespace operation and wait for its
-   * threads to exit, if one is currently in progress.
-   *
-   * If no such operation is in progress, this call does nothing.
-   *
-   * @param reason  a reason to be communicated to the caller saveNamespace
-   * @throws IOException
-   */
-  void cancelSaveNamespace(String reason) throws IOException {
-    readLock();
-    try {
-      checkSuperuserPrivilege();
-      getFSImage().cancelSaveNamespace(reason);
-    } catch (InterruptedException e) {
-      throw new IOException(e);
-    } finally {
-      readUnlock();
-    }
-  }
-  
   /**
    * Enables/Disables/Checks restoring failed storage replicas if the storage becomes available again.
    * Requires superuser privilege.
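
In FSImageFormat, the single checkCancelled() call at the top of each directory is replaced by a check every 50 serialized inodes, so even a directory with millions of children notices cancellation promptly. The same poll-every-N shape in isolation (a sketch, not the HDFS serializer; writeAll and its record type are hypothetical):

    import java.io.IOException;
    import java.io.OutputStream;
    import java.util.List;

    import org.apache.hadoop.hdfs.util.Canceler;

    public class BatchWriter {
      // Poll the flag every N items: one volatile read per 50 writes is
      // negligible, yet cancellation latency stays bounded by 50 items.
      private static final int CHECK_INTERVAL = 50;

      public static void writeAll(List<byte[]> records, OutputStream out,
          Canceler canceler) throws IOException {
        int i = 0;
        for (byte[] record : records) {
          out.write(record);
          if (i++ % CHECK_INTERVAL == 0 && canceler.isCancelled()) {
            throw new IOException("Write cancelled: "
                + canceler.getCancellationReason());
          }
        }
      }
    }
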
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SaveNamespaceContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SaveNamespaceContext.java
index c5c0c06d0e..67ee88e11d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SaveNamespaceContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SaveNamespaceContext.java
@@ -23,6 +23,7 @@
 import java.util.concurrent.CountDownLatch;
 
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.util.Canceler;
 
 import com.google.common.base.Preconditions;
 
@@ -36,20 +37,17 @@ class SaveNamespaceContext {
   private final long txid;
   private final List<StorageDirectory> errorSDs =
     Collections.synchronizedList(new ArrayList<StorageDirectory>());
-
-  /**
-   * If the operation has been canceled, set to the reason why
-   * it has been canceled (eg standby moving to active)
-   */
-  private volatile String cancelReason = null;
+  private final Canceler canceller;
   
   private CountDownLatch completionLatch = new CountDownLatch(1);
-
+  
   SaveNamespaceContext(
       FSNamesystem sourceNamesystem,
-      long txid) {
+      long txid,
+      Canceler canceller) {
     this.sourceNamesystem = sourceNamesystem;
     this.txid = txid;
+    this.canceller = canceller;
   }
 
   FSNamesystem getSourceNamesystem() {
@@ -68,17 +66,6 @@ List<StorageDirectory> getErrorSDs() {
     return errorSDs;
   }
 
-  /**
-   * Requests that the current saveNamespace operation be
-   * canceled if it is still running.
-   * @param reason the reason why cancellation is requested
-   * @throws InterruptedException
-   */
-  void cancel(String reason) throws InterruptedException {
-    this.cancelReason = reason;
-    completionLatch.await();
-  }
-  
   void markComplete() {
     Preconditions.checkState(completionLatch.getCount() == 1,
         "Context already completed!");
@@ -86,13 +73,9 @@ void markComplete() {
   }
 
   void checkCancelled() throws SaveNamespaceCancelledException {
-    if (cancelReason != null) {
+    if (canceller.isCancelled()) {
       throw new SaveNamespaceCancelledException(
-          cancelReason);
+          canceller.getCancellationReason());
     }
   }
-
-  boolean isCancelled() {
-    return cancelReason != null;
-  }
 }
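
SaveNamespaceContext no longer owns the cancellation state; it holds a shared Canceler and converts the polled flag into a SaveNamespaceCancelledException at each safe point. The same poll-then-throw shape, reduced to its essentials (illustrative only; CancellationGuard and WorkCancelledException are made-up stand-ins):

    import org.apache.hadoop.hdfs.util.Canceler;

    public class CancellationGuard {
      public static class WorkCancelledException extends Exception {
        public WorkCancelledException(String reason) {
          super(reason);
        }
      }

      private final Canceler canceller;

      public CancellationGuard(Canceler canceller) {
        this.canceller = canceller;
      }

      /** Called at safe points in the worker; throws if cancel() has fired. */
      public void checkCancelled() throws WorkCancelledException {
        if (canceller.isCancelled()) {
          throw new WorkCancelledException(canceller.getCancellationReason());
        }
      }
    }
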
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java
index a648afa96e..d575dd2316 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java
@@ -34,6 +34,7 @@
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.SaveNamespaceCancelledException;
 import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
+import org.apache.hadoop.hdfs.util.Canceler;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -58,6 +59,9 @@ public class StandbyCheckpointer {
   private final CheckpointerThread thread;
   private String activeNNAddress;
   private InetSocketAddress myNNAddress;
+  
+  private Object cancelLock = new Object();
+  private Canceler canceler;
   
   // Keep track of how many checkpoints were canceled.
   // This is for use in tests.
@@ -123,6 +127,7 @@ public void start() {
   }
   
   public void stop() throws IOException {
+    cancelAndPreventCheckpoints("Stopping checkpointer");
     thread.setShouldRun(false);
     thread.interrupt();
     try {
@@ -134,6 +139,7 @@ public void stop() throws IOException {
   }
 
   private void doCheckpoint() throws InterruptedException, IOException {
+    assert canceler != null;
     long txid;
     
     namesystem.writeLockInterruptibly();
@@ -153,8 +159,8 @@ private void doCheckpoint() throws InterruptedException, IOException {
             thisCheckpointTxId + ". Skipping...");
         return;
       }
-      
-      img.saveNamespace(namesystem);
+      
+      img.saveNamespace(namesystem, canceler);
       txid = img.getStorage().getMostRecentCheckpointTxId();
       assert txid == thisCheckpointTxId : "expected to save checkpoint at txid=" +
         thisCheckpointTxId + " but instead saved at txid=" + txid;
@@ -173,16 +179,18 @@ private void doCheckpoint() throws InterruptedException, IOException {
    * and prevent any new checkpoints from starting for the next
    * minute or so.
    */
-  public void cancelAndPreventCheckpoints() throws ServiceFailedException {
-    try {
-      thread.preventCheckpointsFor(PREVENT_AFTER_CANCEL_MS);
-      // TODO(HA): there is a really narrow race here if we are just
-      // about to start a checkpoint - this won't cancel it!
-      namesystem.getFSImage().cancelSaveNamespace(
-          "About to exit standby state");
-    } catch (InterruptedException e) {
-      throw new ServiceFailedException(
-          "Interrupted while trying to cancel checkpoint");
+  public void cancelAndPreventCheckpoints(String msg) throws ServiceFailedException {
+    thread.preventCheckpointsFor(PREVENT_AFTER_CANCEL_MS);
+    synchronized (cancelLock) {
+      // Before beginning a checkpoint, the checkpointer thread
+      // takes this lock, and creates a canceler object.
+      // If the canceler is non-null, then a checkpoint is in
+      // progress and we need to cancel it. If it's null, then
+      // the operation has not started, meaning that the above
+      // time-based prevention will take effect.
+      if (canceler != null) {
+        canceler.cancel(msg);
+      }
     }
   }
   
@@ -272,10 +280,18 @@ private void doWork() {
               "exceeds the configured interval " + checkpointConf.getPeriod());
           needCheckpoint = true;
         }
-        if (needCheckpoint && now < preventCheckpointsUntil) {
-          LOG.info("But skipping this checkpoint since we are about to failover!");
-          canceledCount++;
-        } else if (needCheckpoint) {
+        
+        synchronized (cancelLock) {
+          if (now < preventCheckpointsUntil) {
+            LOG.info("But skipping this checkpoint since we are about to failover!");
+            canceledCount++;
+            continue;
+          }
+          assert canceler == null;
+          canceler = new Canceler();
+        }
+        
+        if (needCheckpoint) {
           doCheckpoint();
           lastCheckpointTime = now;
         }
@@ -287,6 +303,10 @@ private void doWork() {
           continue;
         } catch (Throwable t) {
           LOG.error("Exception in doCheckpoint", t);
+        } finally {
+          synchronized (cancelLock) {
+            canceler = null;
+          }
         }
       }
     }
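
The fix for the TODO'd race is the cancelLock handshake: the checkpointer thread publishes a fresh Canceler under the lock before starting (and honors the prevent-until timestamp under the same lock), while cancelAndPreventCheckpoints() first bumps the timestamp and then cancels any Canceler it can see. Either the canceller observes a non-null Canceler (a checkpoint is in flight and gets cancelled) or the worker observes the new timestamp (the checkpoint never starts). A stripped-down sketch of this discipline, using a hypothetical worker class rather than the HDFS code:

    import org.apache.hadoop.hdfs.util.Canceler;

    public class CancellableWorker {
      private final Object cancelLock = new Object();
      private Canceler canceler;   // non-null only while a run is active
      private long preventUntil;   // wall-clock millis; guarded by cancelLock

      public void cancelAndPrevent(String reason, long quietMs) {
        synchronized (cancelLock) {
          preventUntil = System.currentTimeMillis() + quietMs;
          if (canceler != null) {
            canceler.cancel(reason);  // a run is in flight: cancel it
          }
          // else: no run has started, and preventUntil blocks the next one
        }
      }

      public void runOnce() throws Exception {
        synchronized (cancelLock) {
          if (System.currentTimeMillis() < preventUntil) {
            return;                   // skipped, mirroring the "continue" above
          }
          canceler = new Canceler();
        }
        try {
          doWork(canceler);           // polls canceler.isCancelled()
        } finally {
          synchronized (cancelLock) {
            canceler = null;          // always cleared, even on failure
          }
        }
      }

      private void doWork(Canceler c) throws Exception {
      }
    }
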
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/Canceler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/Canceler.java
new file mode 100644
index 0000000000..f47c79d05f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/Canceler.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.util;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Provides a simple interface where one thread can mark an operation
+ * for cancellation, and another thread can poll for whether the
+ * cancellation has occurred.
+ */
+@InterfaceAudience.Private
+public class Canceler {
+  /**
+   * If the operation has been canceled, set to the reason why
+   * it has been canceled (eg standby moving to active)
+   */
+  private volatile String cancelReason = null;
+  
+  /**
+   * Requests that the current operation be canceled if it is still running.
+   * This does not block until the cancellation is successful.
+   * @param reason the reason why cancellation is requested
+   */
+  public void cancel(String reason) {
+    this.cancelReason = reason;
+  }
+
+  public boolean isCancelled() {
+    return cancelReason != null;
+  }
+
+  public String getCancellationReason() {
+    return cancelReason;
+  }
+}
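
Canceler itself is deliberately tiny: one volatile field, written once by the cancelling thread and polled by the worker, with no latching or blocking. Typical use between two threads might look like this (illustrative example, not from the patch):

    import org.apache.hadoop.hdfs.util.Canceler;

    public class CancelerExample {
      public static void main(String[] args) throws InterruptedException {
        final Canceler canceler = new Canceler();

        Thread worker = new Thread(new Runnable() {
          public void run() {
            while (!canceler.isCancelled()) {
              // do one bounded chunk of work, then re-check the flag
            }
            System.out.println("Stopped: " + canceler.getCancellationReason());
          }
        });
        worker.start();

        canceler.cancel("standby moving to active");  // returns immediately
        worker.join();  // worker notices the flag on its next poll and exits
      }
    }
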
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSaveNamespace.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSaveNamespace.java
index ed2162b7b9..5808ea4e90 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSaveNamespace.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSaveNamespace.java
@@ -51,6 +51,7 @@
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.util.Canceler;
 import org.apache.hadoop.hdfs.util.MD5FileUtils;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -517,14 +518,15 @@ public void testCancelSaveNamespace() throws Exception {
 
     try {
       doAnEdit(fsn, 1);
-
+      final Canceler canceler = new Canceler();
+      
       // Save namespace
       fsn.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
       try {
         Future<Void> saverFuture = pool.submit(new Callable<Void>() {
           @Override
           public Void call() throws Exception {
-            image.saveNamespace(finalFsn);
+            image.saveNamespace(finalFsn, canceler);
             return null;
           }
         });
@@ -534,7 +536,7 @@ public Void call() throws Exception {
         // then cancel the saveNamespace
         Future<Void> cancelFuture = pool.submit(new Callable<Void>() {
           public Void call() throws Exception {
-            image.cancelSaveNamespace("cancelled");
+            canceler.cancel("cancelled");
             return null;
           }
         });
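
The updated test no longer needs the InterruptedException plumbing of the old cancelSaveNamespace(): cancelling is now just a field write on the shared Canceler. The shape of the test, reduced to a self-contained sketch (longRunningSave is a hypothetical stand-in for image.saveNamespace):

    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    import org.apache.hadoop.hdfs.util.Canceler;

    public class CancelDuringSaveSketch {
      // Hypothetical stand-in for image.saveNamespace(fsn, canceler).
      static void longRunningSave(Canceler canceler) throws Exception {
        for (int i = 0; i < 1000; i++) {
          Thread.sleep(10);                 // simulate slow image writing
          if (canceler.isCancelled()) {
            throw new Exception("save cancelled: "
                + canceler.getCancellationReason());
          }
        }
      }

      public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        final Canceler canceler = new Canceler();
        Future<Void> saver = pool.submit(new Callable<Void>() {
          public Void call() throws Exception {
            longRunningSave(canceler);
            return null;
          }
        });
        Thread.sleep(100);             // let the save get underway
        canceler.cancel("cancelled");  // plain field write; nothing to interrupt
        try {
          saver.get();                 // expect the cancellation to surface here
        } catch (ExecutionException e) {
          System.out.println("saver failed as expected: " + e.getCause());
        }
        pool.shutdown();
      }
    }
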
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java
index 5440c38cc2..64108d0b25 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java
@@ -21,6 +21,7 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.net.URI;
 import java.util.List;
 
@@ -36,6 +37,11 @@
 import org.apache.hadoop.hdfs.server.namenode.NNStorage;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.hdfs.util.Canceler;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -52,12 +58,18 @@ public class TestStandbyCheckpoints {
   private NameNode nn0, nn1;
   private FileSystem fs;
   
+  @SuppressWarnings("rawtypes")
   @Before
   public void setupCluster() throws Exception {
     Configuration conf = new Configuration();
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY, 1);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, 5);
     conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+    conf.setBoolean(DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY, true);
+    conf.set(DFSConfigKeys.DFS_IMAGE_COMPRESSION_CODEC_KEY,
+        SlowCodec.class.getCanonicalName());
+    CompressionCodecFactory.setCodecClasses(conf,
+        ImmutableList.<Class>of(SlowCodec.class));
 
     MiniDFSNNTopology topology = new MiniDFSNNTopology()
       .addNameservice(new MiniDFSNNTopology.NSConf("ns1")
@@ -159,14 +171,15 @@ public void testCheckpointWhenNoNewTransactionsHappened()
     
     // We should make exactly one checkpoint at this new txid. 
     Mockito.verify(spyImage1, Mockito.times(1))
-      .saveNamespace((FSNamesystem) Mockito.anyObject());
+      .saveNamespace((FSNamesystem) Mockito.anyObject(),
+          (Canceler)Mockito.anyObject());
   }
   
   /**
    * Test cancellation of ongoing checkpoints when failover happens
    * mid-checkpoint.
    */
-  @Test
+  @Test(timeout=120000)
   public void testCheckpointCancellation() throws Exception {
     cluster.transitionToStandby(0);
     
@@ -191,16 +204,18 @@ public void testCheckpointCancellation() throws Exception {
 
     cluster.transitionToActive(0);
     
-    for (int i = 0; i < 10; i++) {
+    boolean canceledOne = false;
+    for (int i = 0; i < 10 && !canceledOne; i++) {
       doEdits(i*10, i*10 + 10);
       cluster.transitionToStandby(0);
       cluster.transitionToActive(1);
       cluster.transitionToStandby(1);
       cluster.transitionToActive(0);
+      canceledOne = StandbyCheckpointer.getCanceledCount() > 0;
     }
     
-    assertTrue(StandbyCheckpointer.getCanceledCount() > 0);
+    assertTrue(canceledOne);
   }
 
   private void doEdits(int start, int stop) throws IOException {
@@ -209,5 +224,22 @@ private void doEdits(int start, int stop) throws IOException {
       fs.mkdirs(p);
     }
   }
+  
+  /**
+   * A codec which just slows down the saving of the image significantly
+   * by sleeping a few milliseconds on every write. This makes it easy to
+   * catch the standby in the middle of saving a checkpoint.
+   */
+  public static class SlowCodec extends GzipCodec {
+    @Override
+    public CompressionOutputStream createOutputStream(OutputStream out)
+        throws IOException {
+      CompressionOutputStream ret = super.createOutputStream(out);
+      CompressionOutputStream spy = Mockito.spy(ret);
+      Mockito.doAnswer(new GenericTestUtils.SleepAnswer(2))
+        .when(spy).write(Mockito.<byte[]>any(), Mockito.anyInt(), Mockito.anyInt());
+      return spy;
+    }
+  }
 }
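
SlowCodec widens the race window by sleeping a couple of milliseconds per write, via a Mockito spy around the real gzip stream. The same effect can be had without Mockito by wrapping the stream directly; an illustrative alternative (not what the test uses):

    import java.io.FilterOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;

    // A FilterOutputStream that sleeps briefly on each buffered write keeps
    // a long save in flight long enough for a test to cancel it mid-stream.
    public class SlowOutputStream extends FilterOutputStream {
      private final long sleepMs;

      public SlowOutputStream(OutputStream out, long sleepMs) {
        super(out);
        this.sleepMs = sleepMs;
      }

      @Override
      public void write(byte[] b, int off, int len) throws IOException {
        try {
          Thread.sleep(sleepMs);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new IOException("interrupted while throttling write", e);
        }
        out.write(b, off, len);   // delegate to the wrapped stream in one call
      }
    }
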