HDFS-13731. ReencryptionUpdater fails with ConcurrentModificationException during processCheckpoints. Contributed by Zsolt Venczel.

This commit is contained in:
Zsolt Venczel 2018-08-28 15:11:58 -07:00 committed by Xiao Chen
parent c5629d546d
commit 3e18b957eb
2 changed files with 30 additions and 28 deletions

View File

@ -714,10 +714,10 @@ protected void submitCurrentBatch(final Long zoneId) throws IOException,
zst = new ZoneSubmissionTracker(); zst = new ZoneSubmissionTracker();
submissions.put(zoneId, zst); submissions.put(zoneId, zst);
} }
Future future = batchService.submit(new EDEKReencryptCallable(zoneId,
currentBatch, reencryptionHandler));
zst.addTask(future);
} }
Future future = batchService.submit(new EDEKReencryptCallable(zoneId,
currentBatch, reencryptionHandler));
zst.addTask(future);
LOG.info("Submitted batch (start:{}, size:{}) of zone {} to re-encrypt.", LOG.info("Submitted batch (start:{}, size:{}) of zone {} to re-encrypt.",
currentBatch.getFirstFilePath(), currentBatch.size(), zoneId); currentBatch.getFirstFilePath(), currentBatch.size(), zoneId);
currentBatch = new ReencryptionBatch(reencryptBatchSize); currentBatch = new ReencryptionBatch(reencryptBatchSize);

View File

@ -383,32 +383,34 @@ private List<XAttr> processCheckpoints(final INode zoneNode,
final LinkedList<Future> tasks = tracker.getTasks(); final LinkedList<Future> tasks = tracker.getTasks();
final List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1); final List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1);
ListIterator<Future> iter = tasks.listIterator(); ListIterator<Future> iter = tasks.listIterator();
while (iter.hasNext()) { synchronized (handler) {
Future<ReencryptionTask> curr = iter.next(); while (iter.hasNext()) {
if (curr.isCancelled()) { Future<ReencryptionTask> curr = iter.next();
break; if (curr.isCancelled()) {
break;
}
if (!curr.isDone() || !curr.get().processed) {
// still has earlier tasks not completed, skip here.
break;
}
ReencryptionTask task = curr.get();
LOG.debug("Updating re-encryption checkpoint with completed task."
+ " last: {} size:{}.", task.lastFile, task.batch.size());
assert zoneId == task.zoneId;
try {
final XAttr xattr = FSDirEncryptionZoneOp
.updateReencryptionProgress(dir, zoneNode, status, task.lastFile,
task.numFilesUpdated, task.numFailures);
xAttrs.clear();
xAttrs.add(xattr);
} catch (IOException ie) {
LOG.warn("Failed to update re-encrypted progress to xattr" +
" for zone {}", zonePath, ie);
++task.numFailures;
}
++tracker.numCheckpointed;
iter.remove();
} }
if (!curr.isDone() || !curr.get().processed) {
// still has earlier tasks not completed, skip here.
break;
}
ReencryptionTask task = curr.get();
LOG.debug("Updating re-encryption checkpoint with completed task."
+ " last: {} size:{}.", task.lastFile, task.batch.size());
assert zoneId == task.zoneId;
try {
final XAttr xattr = FSDirEncryptionZoneOp
.updateReencryptionProgress(dir, zoneNode, status, task.lastFile,
task.numFilesUpdated, task.numFailures);
xAttrs.clear();
xAttrs.add(xattr);
} catch (IOException ie) {
LOG.warn("Failed to update re-encrypted progress to xattr for zone {}",
zonePath, ie);
++task.numFailures;
}
++tracker.numCheckpointed;
iter.remove();
} }
if (tracker.isCompleted()) { if (tracker.isCompleted()) {
LOG.debug("Removed re-encryption tracker for zone {} because it completed" LOG.debug("Removed re-encryption tracker for zone {} because it completed"