HDFS-12518. Re-encryption should handle task cancellation and progress better.

This commit is contained in:
Xiao Chen 2017-10-20 16:24:33 -07:00
parent 14b3c2695b
commit 248d9b6fff
4 changed files with 152 additions and 54 deletions

View File

@ -380,10 +380,16 @@ private static ZoneEncryptionInfoProto getZoneEncryptionInfoProto(
static void saveFileXAttrsForBatch(FSDirectory fsd,
List<FileEdekInfo> batch) {
assert fsd.getFSNamesystem().hasWriteLock();
assert !fsd.hasWriteLock();
if (batch != null && !batch.isEmpty()) {
for (FileEdekInfo entry : batch) {
final INode inode = fsd.getInode(entry.getInodeId());
Preconditions.checkNotNull(inode);
// no dir lock, so inode could be removed. no-op if so.
if (inode == null) {
NameNode.LOG.info("Cannot find inode {}, skip saving xattr for"
+ " re-encryption", entry.getInodeId());
continue;
}
fsd.getEditLog().logSetXAttrs(inode.getFullPathName(),
inode.getXAttrFeature().getXAttrs(), false);
}

View File

@ -45,11 +45,11 @@
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -112,8 +112,7 @@ public class ReencryptionHandler implements Runnable {
private ExecutorCompletionService<ReencryptionTask> batchService;
private BlockingQueue<Runnable> taskQueue;
// protected by ReencryptionHandler object lock
private final Map<Long, ZoneSubmissionTracker> submissions =
new ConcurrentHashMap<>();
private final Map<Long, ZoneSubmissionTracker> submissions = new HashMap<>();
// The current batch that the handler is working on. Handler is designed to
// be single-threaded, see class javadoc for more details.
@ -132,8 +131,10 @@ public class ReencryptionHandler implements Runnable {
*/
void stopThreads() {
assert dir.hasWriteLock();
for (ZoneSubmissionTracker zst : submissions.values()) {
zst.cancelAllTasks();
synchronized (this) {
for (ZoneSubmissionTracker zst : submissions.values()) {
zst.cancelAllTasks();
}
}
if (updaterExecutor != null) {
updaterExecutor.shutdownNow();
@ -269,33 +270,34 @@ void cancelZone(final long zoneId, final String zoneName) throws IOException {
throw new IOException("Zone " + zoneName + " is not under re-encryption");
}
zs.cancel();
ZoneSubmissionTracker zst = submissions.get(zoneId);
if (zst != null) {
zst.cancelAllTasks();
}
removeZoneTrackerStopTasks(zoneId);
}
void removeZone(final long zoneId) {
assert dir.hasWriteLock();
LOG.info("Removing zone {} from re-encryption.", zoneId);
ZoneSubmissionTracker zst = submissions.get(zoneId);
if (zst != null) {
zst.cancelAllTasks();
}
submissions.remove(zoneId);
removeZoneTrackerStopTasks(zoneId);
getReencryptionStatus().removeZone(zoneId);
}
synchronized private void removeZoneTrackerStopTasks(final long zoneId) {
final ZoneSubmissionTracker zst = submissions.get(zoneId);
if (zst != null) {
zst.cancelAllTasks();
submissions.remove(zoneId);
}
}
ZoneSubmissionTracker getTracker(final long zoneId) {
dir.hasReadLock();
assert dir.hasReadLock();
return unprotectedGetTracker(zoneId);
}
/**
* get the tracker without holding the FSDirectory lock. This is only used for
* testing, when updater checks about pausing.
* Get the tracker without holding the FSDirectory lock.
* The submissions object is protected by object lock.
*/
ZoneSubmissionTracker unprotectedGetTracker(final long zoneId) {
synchronized ZoneSubmissionTracker unprotectedGetTracker(final long zoneId) {
return submissions.get(zoneId);
}
@ -308,16 +310,19 @@ ZoneSubmissionTracker unprotectedGetTracker(final long zoneId) {
*
* @param zoneId
*/
void addDummyTracker(final long zoneId) {
void addDummyTracker(final long zoneId, ZoneSubmissionTracker zst) {
assert dir.hasReadLock();
assert !submissions.containsKey(zoneId);
final ZoneSubmissionTracker zst = new ZoneSubmissionTracker();
if (zst == null) {
zst = new ZoneSubmissionTracker();
}
zst.setSubmissionDone();
Future future = batchService.submit(
final Future future = batchService.submit(
new EDEKReencryptCallable(zoneId, new ReencryptionBatch(), this));
zst.addTask(future);
submissions.put(zoneId, zst);
synchronized (this) {
submissions.put(zoneId, zst);
}
}
/**
@ -351,6 +356,8 @@ public void run() {
}
LOG.info("Executing re-encrypt commands on zone {}. Current zones:{}",
zoneId, getReencryptionStatus());
getReencryptionStatus().markZoneStarted(zoneId);
resetSubmissionTracker(zoneId);
} finally {
dir.readUnlock();
}
@ -392,7 +399,6 @@ void reencryptEncryptionZone(final long zoneId)
readLock();
try {
getReencryptionStatus().markZoneStarted(zoneId);
zoneNode = dir.getInode(zoneId);
// start re-encrypting the zone from the beginning
if (zoneNode == null) {
@ -428,6 +434,20 @@ void reencryptEncryptionZone(final long zoneId)
}
}
/**
* Reset the zone submission tracker for re-encryption.
* @param zoneId
*/
synchronized private void resetSubmissionTracker(final long zoneId) {
ZoneSubmissionTracker zst = submissions.get(zoneId);
if (zst == null) {
zst = new ZoneSubmissionTracker();
submissions.put(zoneId, zst);
} else {
zst.reset();
}
}
List<XAttr> completeReencryption(final INode zoneNode) throws IOException {
assert dir.hasWriteLock();
assert dir.getFSNamesystem().hasWriteLock();
@ -437,8 +457,9 @@ List<XAttr> completeReencryption(final INode zoneNode) throws IOException {
LOG.info("Re-encryption completed on zone {}. Re-encrypted {} files,"
+ " failures encountered: {}.", zoneNode.getFullPathName(),
zs.getFilesReencrypted(), zs.getNumReencryptionFailures());
// This also removes the zone from reencryptionStatus
submissions.remove(zoneId);
synchronized (this) {
submissions.remove(zoneId);
}
return FSDirEncryptionZoneOp
.updateReencryptionFinish(dir, INodesInPath.fromINode(zoneNode), zs);
}
@ -562,10 +583,13 @@ private void submitCurrentBatch(final long zoneId)
if (currentBatch.isEmpty()) {
return;
}
ZoneSubmissionTracker zst = submissions.get(zoneId);
if (zst == null) {
zst = new ZoneSubmissionTracker();
submissions.put(zoneId, zst);
ZoneSubmissionTracker zst;
synchronized (this) {
zst = submissions.get(zoneId);
if (zst == null) {
zst = new ZoneSubmissionTracker();
submissions.put(zoneId, zst);
}
}
Future future = batchService
.submit(new EDEKReencryptCallable(zoneId, currentBatch, this));
@ -821,19 +845,13 @@ void throttle() throws InterruptedException {
// 2. if tasks are piling up on the updater, don't create new callables
// until the queue size goes down.
final int maxTasksPiled = Runtime.getRuntime().availableProcessors() * 2;
int totalTasks = 0;
for (ZoneSubmissionTracker zst : submissions.values()) {
totalTasks += zst.getTasks().size();
}
if (totalTasks >= maxTasksPiled) {
int numTasks = numTasksSubmitted();
if (numTasks >= maxTasksPiled) {
LOG.debug("Re-encryption handler throttling because total tasks pending"
+ " re-encryption updater is {}", totalTasks);
while (totalTasks >= maxTasksPiled) {
+ " re-encryption updater is {}", numTasks);
while (numTasks >= maxTasksPiled) {
Thread.sleep(500);
totalTasks = 0;
for (ZoneSubmissionTracker zst : submissions.values()) {
totalTasks += zst.getTasks().size();
}
numTasks = numTasksSubmitted();
}
}
@ -864,6 +882,14 @@ void throttle() throws InterruptedException {
throttleTimerLocked.reset();
}
private synchronized int numTasksSubmitted() {
int ret = 0;
for (ZoneSubmissionTracker zst : submissions.values()) {
ret += zst.getTasks().size();
}
return ret;
}
/**
* Process an Inode for re-encryption. Add to current batch if it's a file,
* no-op otherwise.
@ -877,7 +903,7 @@ void throttle() throws InterruptedException {
*/
private boolean reencryptINode(final INode inode, final String ezKeyVerName)
throws IOException, InterruptedException {
dir.hasReadLock();
assert dir.hasReadLock();
if (LOG.isTraceEnabled()) {
LOG.trace("Processing {} for re-encryption", inode.getFullPathName());
}

View File

@ -93,6 +93,13 @@ static final class ZoneSubmissionTracker {
numFutureDone = 0;
}
void reset() {
submissionDone = false;
tasks.clear();
numCheckpointed = 0;
numFutureDone = 0;
}
LinkedList<Future> getTasks() {
return tasks;
}
@ -238,12 +245,12 @@ boolean isRunning() {
void markZoneSubmissionDone(final long zoneId)
throws IOException, InterruptedException {
final ZoneSubmissionTracker tracker = handler.getTracker(zoneId);
if (tracker != null) {
if (tracker != null && !tracker.getTasks().isEmpty()) {
tracker.submissionDone = true;
} else {
// Caller thinks submission is done, but no tasks submitted - meaning
// no files in the EZ need to be re-encrypted. Complete directly.
handler.addDummyTracker(zoneId);
handler.addDummyTracker(zoneId, tracker);
}
}
@ -289,6 +296,7 @@ private void processTaskEntries(final String zoneNodePath,
LOG.debug(
"Updating file xattrs for re-encrypting zone {}," + " starting at {}",
zoneNodePath, task.batch.getFirstFilePath());
final int batchSize = task.batch.size();
for (Iterator<FileEdekInfo> it = task.batch.getBatch().iterator();
it.hasNext();) {
FileEdekInfo entry = it.next();
@ -342,7 +350,7 @@ private void processTaskEntries(final String zoneNodePath,
}
LOG.info("Updated xattrs on {}({}) files in zone {} for re-encryption,"
+ " starting:{}.", task.numFilesUpdated, task.batch.size(),
+ " starting:{}.", task.numFilesUpdated, batchSize,
zoneNodePath, task.batch.getFirstFilePath());
}
task.processed = true;
@ -377,6 +385,9 @@ private List<XAttr> processCheckpoints(final INode zoneNode,
ListIterator<Future> iter = tasks.listIterator();
while (iter.hasNext()) {
Future<ReencryptionTask> curr = iter.next();
if (curr.isCancelled()) {
break;
}
if (!curr.isDone() || !curr.get().processed) {
// still has earlier tasks not completed, skip here.
break;
@ -411,12 +422,12 @@ private void takeAndProcessTasks() throws Exception {
final Future<ReencryptionTask> completed = batchService.take();
throttle();
checkPauseForTesting();
ReencryptionTask task = completed.get();
if (completed.isCancelled()) {
LOG.debug("Skipped canceled re-encryption task for zone {}, last: {}",
task.zoneId, task.lastFile);
// Ignore canceled zones. The cancellation is edit-logged by the handler.
LOG.debug("Skipped a canceled re-encryption task");
return;
}
final ReencryptionTask task = completed.get();
boolean shouldRetry;
do {
@ -465,7 +476,11 @@ private void processTask(ReencryptionTask task)
task.batch.size(), task.batch.getFirstFilePath());
final ZoneSubmissionTracker tracker =
handler.getTracker(zoneNode.getId());
Preconditions.checkNotNull(tracker, "zone tracker not found " + zonePath);
if (tracker == null) {
// re-encryption canceled.
LOG.info("Re-encryption was canceled.");
return;
}
tracker.numFutureDone++;
EncryptionFaultInjector.getInstance().reencryptUpdaterProcessOneTask();
processTaskEntries(zonePath, task);

View File

@ -1467,8 +1467,8 @@ public void testReencryptCancel() throws Exception {
assertEquals(5, getZoneStatus(zone.toString()).getFilesReencrypted());
}
@Test
public void testCancelFuture() throws Exception {
private void cancelFutureDuringReencryption(final Path zone)
throws Exception {
final AtomicBoolean callableRunning = new AtomicBoolean(false);
class MyInjector extends EncryptionFaultInjector {
private volatile int exceptionCount = 0;
@ -1498,8 +1498,6 @@ public void reencryptEncryptedKeys() throws IOException {
* /dir/f
*/
final int len = 8196;
final Path zoneParent = new Path("/zones");
final Path zone = new Path(zoneParent, "zone");
fsWrapper.mkdir(zone, FsPermission.getDirDefault(), true);
dfsAdmin.createEncryptionZone(zone, TEST_KEY, NO_TRASH);
for (int i = 0; i < 10; ++i) {
@ -1548,6 +1546,59 @@ public Boolean get() {
assertTrue(getUpdater().isRunning());
}
@Test
public void testCancelFutureThenReencrypt() throws Exception {
final Path zoneParent = new Path("/zones");
final Path zone = new Path(zoneParent, "zone");
cancelFutureDuringReencryption(zone);
// make sure new re-encryption after cancellation works.
getEzManager().resumeReencryptForTesting();
dfsAdmin.reencryptEncryptionZone(zone, ReencryptAction.START);
waitForZoneCompletes(zone.toString());
final RemoteIterator<ZoneReencryptionStatus> it =
dfsAdmin.listReencryptionStatus();
final ZoneReencryptionStatus zs = it.next();
assertEquals(zone.toString(), zs.getZoneName());
assertEquals(ZoneReencryptionStatus.State.Completed, zs.getState());
assertFalse(zs.isCanceled());
assertTrue(zs.getCompletionTime() > 0);
assertTrue(zs.getCompletionTime() > zs.getSubmissionTime());
assertEquals(10, zs.getFilesReencrypted());
}
@Test
public void testCancelFutureThenRestart() throws Exception {
final Path zoneParent = new Path("/zones");
final Path zone = new Path(zoneParent, "zone");
cancelFutureDuringReencryption(zone);
// restart, and check status.
restartClusterDisableReencrypt();
RemoteIterator<ZoneReencryptionStatus> it =
dfsAdmin.listReencryptionStatus();
ZoneReencryptionStatus zs = it.next();
assertEquals(zone.toString(), zs.getZoneName());
assertEquals(ZoneReencryptionStatus.State.Completed, zs.getState());
assertTrue(zs.isCanceled());
assertTrue(zs.getCompletionTime() > 0);
assertTrue(zs.getCompletionTime() > zs.getSubmissionTime());
assertEquals(0, zs.getFilesReencrypted());
// verify re-encryption works after restart.
getEzManager().resumeReencryptForTesting();
dfsAdmin.reencryptEncryptionZone(zone, ReencryptAction.START);
waitForZoneCompletes(zone.toString());
it = dfsAdmin.listReencryptionStatus();
zs = it.next();
assertEquals(zone.toString(), zs.getZoneName());
assertEquals(ZoneReencryptionStatus.State.Completed, zs.getState());
assertFalse(zs.isCanceled());
assertTrue(zs.getCompletionTime() > 0);
assertTrue(zs.getCompletionTime() > zs.getSubmissionTime());
assertEquals(10, zs.getFilesReencrypted());
}
@Test
public void testReencryptCancelForUpdater() throws Exception {
/* Setup test dir: