From 248d9b6fff648cdb02581d458556b6f7c090ef1a Mon Sep 17 00:00:00 2001 From: Xiao Chen Date: Fri, 20 Oct 2017 16:24:33 -0700 Subject: [PATCH] HDFS-12518. Re-encryption should handle task cancellation and progress better. --- .../namenode/FSDirEncryptionZoneOp.java | 8 +- .../server/namenode/ReencryptionHandler.java | 110 +++++++++++------- .../server/namenode/ReencryptionUpdater.java | 29 +++-- .../server/namenode/TestReencryption.java | 59 +++++++++- 4 files changed, 152 insertions(+), 54 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirEncryptionZoneOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirEncryptionZoneOp.java index e284b1581d..7dcb8ab76b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirEncryptionZoneOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirEncryptionZoneOp.java @@ -380,10 +380,16 @@ final class FSDirEncryptionZoneOp { static void saveFileXAttrsForBatch(FSDirectory fsd, List batch) { assert fsd.getFSNamesystem().hasWriteLock(); + assert !fsd.hasWriteLock(); if (batch != null && !batch.isEmpty()) { for (FileEdekInfo entry : batch) { final INode inode = fsd.getInode(entry.getInodeId()); - Preconditions.checkNotNull(inode); + // no dir lock, so inode could be removed. no-op if so. + if (inode == null) { + NameNode.LOG.info("Cannot find inode {}, skip saving xattr for" + + " re-encryption", entry.getInodeId()); + continue; + } fsd.getEditLog().logSetXAttrs(inode.getFullPathName(), inode.getXAttrFeature().getXAttrs(), false); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java index 729b8946e9..01c20382f0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java @@ -45,11 +45,11 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.security.GeneralSecurityException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -112,8 +112,7 @@ public class ReencryptionHandler implements Runnable { private ExecutorCompletionService batchService; private BlockingQueue taskQueue; // protected by ReencryptionHandler object lock - private final Map submissions = - new ConcurrentHashMap<>(); + private final Map submissions = new HashMap<>(); // The current batch that the handler is working on. Handler is designed to // be single-threaded, see class javadoc for more details. @@ -132,8 +131,10 @@ public class ReencryptionHandler implements Runnable { */ void stopThreads() { assert dir.hasWriteLock(); - for (ZoneSubmissionTracker zst : submissions.values()) { - zst.cancelAllTasks(); + synchronized (this) { + for (ZoneSubmissionTracker zst : submissions.values()) { + zst.cancelAllTasks(); + } } if (updaterExecutor != null) { updaterExecutor.shutdownNow(); @@ -269,33 +270,34 @@ public class ReencryptionHandler implements Runnable { throw new IOException("Zone " + zoneName + " is not under re-encryption"); } zs.cancel(); - ZoneSubmissionTracker zst = submissions.get(zoneId); - if (zst != null) { - zst.cancelAllTasks(); - } + removeZoneTrackerStopTasks(zoneId); } void removeZone(final long zoneId) { assert dir.hasWriteLock(); LOG.info("Removing zone {} from re-encryption.", zoneId); - ZoneSubmissionTracker zst = submissions.get(zoneId); - if (zst != null) { - zst.cancelAllTasks(); - } - submissions.remove(zoneId); + removeZoneTrackerStopTasks(zoneId); getReencryptionStatus().removeZone(zoneId); } + synchronized private void removeZoneTrackerStopTasks(final long zoneId) { + final ZoneSubmissionTracker zst = submissions.get(zoneId); + if (zst != null) { + zst.cancelAllTasks(); + submissions.remove(zoneId); + } + } + ZoneSubmissionTracker getTracker(final long zoneId) { - dir.hasReadLock(); + assert dir.hasReadLock(); return unprotectedGetTracker(zoneId); } /** - * get the tracker without holding the FSDirectory lock. This is only used for - * testing, when updater checks about pausing. + * Get the tracker without holding the FSDirectory lock. + * The submissions object is protected by object lock. */ - ZoneSubmissionTracker unprotectedGetTracker(final long zoneId) { + synchronized ZoneSubmissionTracker unprotectedGetTracker(final long zoneId) { return submissions.get(zoneId); } @@ -308,16 +310,19 @@ public class ReencryptionHandler implements Runnable { * * @param zoneId */ - void addDummyTracker(final long zoneId) { + void addDummyTracker(final long zoneId, ZoneSubmissionTracker zst) { assert dir.hasReadLock(); - assert !submissions.containsKey(zoneId); - final ZoneSubmissionTracker zst = new ZoneSubmissionTracker(); + if (zst == null) { + zst = new ZoneSubmissionTracker(); + } zst.setSubmissionDone(); - Future future = batchService.submit( + final Future future = batchService.submit( new EDEKReencryptCallable(zoneId, new ReencryptionBatch(), this)); zst.addTask(future); - submissions.put(zoneId, zst); + synchronized (this) { + submissions.put(zoneId, zst); + } } /** @@ -351,6 +356,8 @@ public class ReencryptionHandler implements Runnable { } LOG.info("Executing re-encrypt commands on zone {}. Current zones:{}", zoneId, getReencryptionStatus()); + getReencryptionStatus().markZoneStarted(zoneId); + resetSubmissionTracker(zoneId); } finally { dir.readUnlock(); } @@ -392,7 +399,6 @@ public class ReencryptionHandler implements Runnable { readLock(); try { - getReencryptionStatus().markZoneStarted(zoneId); zoneNode = dir.getInode(zoneId); // start re-encrypting the zone from the beginning if (zoneNode == null) { @@ -428,6 +434,20 @@ public class ReencryptionHandler implements Runnable { } } + /** + * Reset the zone submission tracker for re-encryption. + * @param zoneId + */ + synchronized private void resetSubmissionTracker(final long zoneId) { + ZoneSubmissionTracker zst = submissions.get(zoneId); + if (zst == null) { + zst = new ZoneSubmissionTracker(); + submissions.put(zoneId, zst); + } else { + zst.reset(); + } + } + List completeReencryption(final INode zoneNode) throws IOException { assert dir.hasWriteLock(); assert dir.getFSNamesystem().hasWriteLock(); @@ -437,8 +457,9 @@ public class ReencryptionHandler implements Runnable { LOG.info("Re-encryption completed on zone {}. Re-encrypted {} files," + " failures encountered: {}.", zoneNode.getFullPathName(), zs.getFilesReencrypted(), zs.getNumReencryptionFailures()); - // This also removes the zone from reencryptionStatus - submissions.remove(zoneId); + synchronized (this) { + submissions.remove(zoneId); + } return FSDirEncryptionZoneOp .updateReencryptionFinish(dir, INodesInPath.fromINode(zoneNode), zs); } @@ -562,10 +583,13 @@ public class ReencryptionHandler implements Runnable { if (currentBatch.isEmpty()) { return; } - ZoneSubmissionTracker zst = submissions.get(zoneId); - if (zst == null) { - zst = new ZoneSubmissionTracker(); - submissions.put(zoneId, zst); + ZoneSubmissionTracker zst; + synchronized (this) { + zst = submissions.get(zoneId); + if (zst == null) { + zst = new ZoneSubmissionTracker(); + submissions.put(zoneId, zst); + } } Future future = batchService .submit(new EDEKReencryptCallable(zoneId, currentBatch, this)); @@ -821,19 +845,13 @@ public class ReencryptionHandler implements Runnable { // 2. if tasks are piling up on the updater, don't create new callables // until the queue size goes down. final int maxTasksPiled = Runtime.getRuntime().availableProcessors() * 2; - int totalTasks = 0; - for (ZoneSubmissionTracker zst : submissions.values()) { - totalTasks += zst.getTasks().size(); - } - if (totalTasks >= maxTasksPiled) { + int numTasks = numTasksSubmitted(); + if (numTasks >= maxTasksPiled) { LOG.debug("Re-encryption handler throttling because total tasks pending" - + " re-encryption updater is {}", totalTasks); - while (totalTasks >= maxTasksPiled) { + + " re-encryption updater is {}", numTasks); + while (numTasks >= maxTasksPiled) { Thread.sleep(500); - totalTasks = 0; - for (ZoneSubmissionTracker zst : submissions.values()) { - totalTasks += zst.getTasks().size(); - } + numTasks = numTasksSubmitted(); } } @@ -864,6 +882,14 @@ public class ReencryptionHandler implements Runnable { throttleTimerLocked.reset(); } + private synchronized int numTasksSubmitted() { + int ret = 0; + for (ZoneSubmissionTracker zst : submissions.values()) { + ret += zst.getTasks().size(); + } + return ret; + } + /** * Process an Inode for re-encryption. Add to current batch if it's a file, * no-op otherwise. @@ -877,7 +903,7 @@ public class ReencryptionHandler implements Runnable { */ private boolean reencryptINode(final INode inode, final String ezKeyVerName) throws IOException, InterruptedException { - dir.hasReadLock(); + assert dir.hasReadLock(); if (LOG.isTraceEnabled()) { LOG.trace("Processing {} for re-encryption", inode.getFullPathName()); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater.java index d641ea1408..3b7badbfe2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater.java @@ -93,6 +93,13 @@ public final class ReencryptionUpdater implements Runnable { numFutureDone = 0; } + void reset() { + submissionDone = false; + tasks.clear(); + numCheckpointed = 0; + numFutureDone = 0; + } + LinkedList getTasks() { return tasks; } @@ -238,12 +245,12 @@ public final class ReencryptionUpdater implements Runnable { void markZoneSubmissionDone(final long zoneId) throws IOException, InterruptedException { final ZoneSubmissionTracker tracker = handler.getTracker(zoneId); - if (tracker != null) { + if (tracker != null && !tracker.getTasks().isEmpty()) { tracker.submissionDone = true; } else { // Caller thinks submission is done, but no tasks submitted - meaning // no files in the EZ need to be re-encrypted. Complete directly. - handler.addDummyTracker(zoneId); + handler.addDummyTracker(zoneId, tracker); } } @@ -289,6 +296,7 @@ public final class ReencryptionUpdater implements Runnable { LOG.debug( "Updating file xattrs for re-encrypting zone {}," + " starting at {}", zoneNodePath, task.batch.getFirstFilePath()); + final int batchSize = task.batch.size(); for (Iterator it = task.batch.getBatch().iterator(); it.hasNext();) { FileEdekInfo entry = it.next(); @@ -342,7 +350,7 @@ public final class ReencryptionUpdater implements Runnable { } LOG.info("Updated xattrs on {}({}) files in zone {} for re-encryption," - + " starting:{}.", task.numFilesUpdated, task.batch.size(), + + " starting:{}.", task.numFilesUpdated, batchSize, zoneNodePath, task.batch.getFirstFilePath()); } task.processed = true; @@ -377,6 +385,9 @@ public final class ReencryptionUpdater implements Runnable { ListIterator iter = tasks.listIterator(); while (iter.hasNext()) { Future curr = iter.next(); + if (curr.isCancelled()) { + break; + } if (!curr.isDone() || !curr.get().processed) { // still has earlier tasks not completed, skip here. break; @@ -411,12 +422,12 @@ public final class ReencryptionUpdater implements Runnable { final Future completed = batchService.take(); throttle(); checkPauseForTesting(); - ReencryptionTask task = completed.get(); if (completed.isCancelled()) { - LOG.debug("Skipped canceled re-encryption task for zone {}, last: {}", - task.zoneId, task.lastFile); + // Ignore canceled zones. The cancellation is edit-logged by the handler. + LOG.debug("Skipped a canceled re-encryption task"); return; } + final ReencryptionTask task = completed.get(); boolean shouldRetry; do { @@ -465,7 +476,11 @@ public final class ReencryptionUpdater implements Runnable { task.batch.size(), task.batch.getFirstFilePath()); final ZoneSubmissionTracker tracker = handler.getTracker(zoneNode.getId()); - Preconditions.checkNotNull(tracker, "zone tracker not found " + zonePath); + if (tracker == null) { + // re-encryption canceled. + LOG.info("Re-encryption was canceled."); + return; + } tracker.numFutureDone++; EncryptionFaultInjector.getInstance().reencryptUpdaterProcessOneTask(); processTaskEntries(zonePath, task); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryption.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryption.java index 4bf6aa4ff2..aca9a73678 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryption.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryption.java @@ -1467,8 +1467,8 @@ public class TestReencryption { assertEquals(5, getZoneStatus(zone.toString()).getFilesReencrypted()); } - @Test - public void testCancelFuture() throws Exception { + private void cancelFutureDuringReencryption(final Path zone) + throws Exception { final AtomicBoolean callableRunning = new AtomicBoolean(false); class MyInjector extends EncryptionFaultInjector { private volatile int exceptionCount = 0; @@ -1498,8 +1498,6 @@ public class TestReencryption { * /dir/f */ final int len = 8196; - final Path zoneParent = new Path("/zones"); - final Path zone = new Path(zoneParent, "zone"); fsWrapper.mkdir(zone, FsPermission.getDirDefault(), true); dfsAdmin.createEncryptionZone(zone, TEST_KEY, NO_TRASH); for (int i = 0; i < 10; ++i) { @@ -1548,6 +1546,59 @@ public class TestReencryption { assertTrue(getUpdater().isRunning()); } + @Test + public void testCancelFutureThenReencrypt() throws Exception { + final Path zoneParent = new Path("/zones"); + final Path zone = new Path(zoneParent, "zone"); + cancelFutureDuringReencryption(zone); + + // make sure new re-encryption after cancellation works. + getEzManager().resumeReencryptForTesting(); + dfsAdmin.reencryptEncryptionZone(zone, ReencryptAction.START); + waitForZoneCompletes(zone.toString()); + final RemoteIterator it = + dfsAdmin.listReencryptionStatus(); + final ZoneReencryptionStatus zs = it.next(); + assertEquals(zone.toString(), zs.getZoneName()); + assertEquals(ZoneReencryptionStatus.State.Completed, zs.getState()); + assertFalse(zs.isCanceled()); + assertTrue(zs.getCompletionTime() > 0); + assertTrue(zs.getCompletionTime() > zs.getSubmissionTime()); + assertEquals(10, zs.getFilesReencrypted()); + } + + @Test + public void testCancelFutureThenRestart() throws Exception { + final Path zoneParent = new Path("/zones"); + final Path zone = new Path(zoneParent, "zone"); + cancelFutureDuringReencryption(zone); + + // restart, and check status. + restartClusterDisableReencrypt(); + RemoteIterator it = + dfsAdmin.listReencryptionStatus(); + ZoneReencryptionStatus zs = it.next(); + assertEquals(zone.toString(), zs.getZoneName()); + assertEquals(ZoneReencryptionStatus.State.Completed, zs.getState()); + assertTrue(zs.isCanceled()); + assertTrue(zs.getCompletionTime() > 0); + assertTrue(zs.getCompletionTime() > zs.getSubmissionTime()); + assertEquals(0, zs.getFilesReencrypted()); + + // verify re-encryption works after restart. + getEzManager().resumeReencryptForTesting(); + dfsAdmin.reencryptEncryptionZone(zone, ReencryptAction.START); + waitForZoneCompletes(zone.toString()); + it = dfsAdmin.listReencryptionStatus(); + zs = it.next(); + assertEquals(zone.toString(), zs.getZoneName()); + assertEquals(ZoneReencryptionStatus.State.Completed, zs.getState()); + assertFalse(zs.isCanceled()); + assertTrue(zs.getCompletionTime() > 0); + assertTrue(zs.getCompletionTime() > zs.getSubmissionTime()); + assertEquals(10, zs.getFilesReencrypted()); + } + @Test public void testReencryptCancelForUpdater() throws Exception { /* Setup test dir: