diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java index aac0af9993..38d69a3c00 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java @@ -21,11 +21,8 @@ import static java.util.concurrent.TimeUnit.SECONDS; import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -38,86 +35,141 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileContext; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.concurrent.HadoopScheduledThreadPoolExecutor; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; -import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext; +import org.apache.hadoop.yarn.server.nodemanager.api.impl.pb.NMProtoUtils; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.recovery.DeletionTaskRecoveryInfo; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.DeletionTask; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; -import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredDeletionServiceState; -import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; public class DeletionService extends AbstractService { - static final Log LOG = LogFactory.getLog(DeletionService.class); - private int debugDelay; - private final ContainerExecutor exec; - private ScheduledThreadPoolExecutor sched; - private static final FileContext lfs = getLfs(); - private final NMStateStoreService stateStore; - private AtomicInteger nextTaskId = new AtomicInteger(0); - static final FileContext getLfs() { - try { - return FileContext.getLocalFSFileContext(); - } catch (UnsupportedFileSystemException e) { - throw new RuntimeException(e); - } - } + private static final Log LOG = LogFactory.getLog(DeletionService.class); + + private int debugDelay; + private final ContainerExecutor containerExecutor; + private final NMStateStoreService stateStore; + private ScheduledThreadPoolExecutor sched; + private AtomicInteger nextTaskId = new AtomicInteger(0); public DeletionService(ContainerExecutor exec) { this(exec, new NMNullStateStoreService()); } - public DeletionService(ContainerExecutor exec, + public DeletionService(ContainerExecutor containerExecutor, NMStateStoreService stateStore) { super(DeletionService.class.getName()); - this.exec = exec; + this.containerExecutor = containerExecutor; this.debugDelay = 0; this.stateStore = stateStore; } - - /** - * Delete the path(s) as this user. - * @param user The user to delete as, or the JVM user if null - * @param subDir the sub directory name - * @param baseDirs the base directories which contains the subDir's - */ - public void delete(String user, Path subDir, Path... baseDirs) { - // TODO if parent owned by NM, rename within parent inline + + public int getDebugDelay() { + return debugDelay; + } + + public ContainerExecutor getContainerExecutor() { + return containerExecutor; + } + + public NMStateStoreService getStateStore() { + return stateStore; + } + + public void delete(DeletionTask deletionTask) { if (debugDelay != -1) { - List baseDirList = null; - if (baseDirs != null && baseDirs.length != 0) { - baseDirList = Arrays.asList(baseDirs); + if (LOG.isDebugEnabled()) { + String msg = String.format("Scheduling DeletionTask (delay %d) : %s", + debugDelay, deletionTask.toString()); + LOG.debug(msg); } - FileDeletionTask task = - new FileDeletionTask(this, user, subDir, baseDirList); - recordDeletionTaskInStateStore(task); - sched.schedule(task, debugDelay, TimeUnit.SECONDS); + recordDeletionTaskInStateStore(deletionTask); + sched.schedule(deletionTask, debugDelay, TimeUnit.SECONDS); } } - - public void scheduleFileDeletionTask(FileDeletionTask fileDeletionTask) { - if (debugDelay != -1) { - recordDeletionTaskInStateStore(fileDeletionTask); - sched.schedule(fileDeletionTask, debugDelay, TimeUnit.SECONDS); + + private void recover(NMStateStoreService.RecoveredDeletionServiceState state) + throws IOException { + List taskProtos = state.getTasks(); + Map idToInfoMap = + new HashMap<>(taskProtos.size()); + Set successorTasks = new HashSet<>(); + for (DeletionServiceDeleteTaskProto proto : taskProtos) { + DeletionTaskRecoveryInfo info = + NMProtoUtils.convertProtoToDeletionTaskRecoveryInfo(proto, this); + idToInfoMap.put(info.getTask().getTaskId(), info); + nextTaskId.set(Math.max(nextTaskId.get(), info.getTask().getTaskId())); + successorTasks.addAll(info.getSuccessorTaskIds()); + } + + // restore the task dependencies and schedule the deletion tasks that + // have no predecessors + final long now = System.currentTimeMillis(); + for (DeletionTaskRecoveryInfo info : idToInfoMap.values()) { + for (Integer successorId : info.getSuccessorTaskIds()){ + DeletionTaskRecoveryInfo successor = idToInfoMap.get(successorId); + if (successor != null) { + info.getTask().addDeletionTaskDependency(successor.getTask()); + } else { + LOG.error("Unable to locate dependency task for deletion task " + + info.getTask().getTaskId()); + } + } + if (!successorTasks.contains(info.getTask().getTaskId())) { + long msecTilDeletion = info.getDeletionTimestamp() - now; + sched.schedule(info.getTask(), msecTilDeletion, TimeUnit.MILLISECONDS); + } } } - + + private int generateTaskId() { + // get the next ID but avoid an invalid ID + int taskId = nextTaskId.incrementAndGet(); + while (taskId == DeletionTask.INVALID_TASK_ID) { + taskId = nextTaskId.incrementAndGet(); + } + return taskId; + } + + private void recordDeletionTaskInStateStore(DeletionTask task) { + if (!stateStore.canRecover()) { + // optimize the case where we aren't really recording + return; + } + if (task.getTaskId() != DeletionTask.INVALID_TASK_ID) { + return; // task already recorded + } + + task.setTaskId(generateTaskId()); + + // store successors first to ensure task IDs have been generated for them + DeletionTask[] successors = task.getSuccessorTasks(); + for (DeletionTask successor : successors) { + recordDeletionTaskInStateStore(successor); + } + + try { + stateStore.storeDeletionTask(task.getTaskId(), + task.convertDeletionTaskToProto()); + } catch (IOException e) { + LOG.error("Unable to store deletion task " + task.getTaskId(), e); + } + } + @Override protected void serviceInit(Configuration conf) throws Exception { ThreadFactory tf = new ThreadFactoryBuilder() - .setNameFormat("DeletionService #%d") - .build(); + .setNameFormat("DeletionService #%d") + .build(); if (conf != null) { sched = new HadoopScheduledThreadPoolExecutor( conf.getInt(YarnConfiguration.NM_DELETE_THREAD_COUNT, - YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT), tf); + YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT), tf); debugDelay = conf.getInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0); } else { sched = new HadoopScheduledThreadPoolExecutor( @@ -132,15 +184,14 @@ protected void serviceInit(Configuration conf) throws Exception { } @Override - protected void serviceStop() throws Exception { + public void serviceStop() throws Exception { if (sched != null) { sched.shutdown(); boolean terminated = false; try { terminated = sched.awaitTermination(10, SECONDS); - } catch (InterruptedException e) { - } - if (terminated != true) { + } catch (InterruptedException e) { } + if (!terminated) { sched.shutdownNow(); } } @@ -156,343 +207,4 @@ protected void serviceStop() throws Exception { public boolean isTerminated() { return getServiceState() == STATE.STOPPED && sched.isTerminated(); } - - public static class FileDeletionTask implements Runnable { - public static final int INVALID_TASK_ID = -1; - private int taskId; - private final String user; - private final Path subDir; - private final List baseDirs; - private final AtomicInteger numberOfPendingPredecessorTasks; - private final Set successorTaskSet; - private final DeletionService delService; - // By default all tasks will start as success=true; however if any of - // the dependent task fails then it will be marked as false in - // fileDeletionTaskFinished(). - private boolean success; - - private FileDeletionTask(DeletionService delService, String user, - Path subDir, List baseDirs) { - this(INVALID_TASK_ID, delService, user, subDir, baseDirs); - } - - private FileDeletionTask(int taskId, DeletionService delService, - String user, Path subDir, List baseDirs) { - this.taskId = taskId; - this.delService = delService; - this.user = user; - this.subDir = subDir; - this.baseDirs = baseDirs; - this.successorTaskSet = new HashSet(); - this.numberOfPendingPredecessorTasks = new AtomicInteger(0); - success = true; - } - - /** - * increments and returns pending predecessor task count - */ - public int incrementAndGetPendingPredecessorTasks() { - return numberOfPendingPredecessorTasks.incrementAndGet(); - } - - /** - * decrements and returns pending predecessor task count - */ - public int decrementAndGetPendingPredecessorTasks() { - return numberOfPendingPredecessorTasks.decrementAndGet(); - } - - @VisibleForTesting - public String getUser() { - return this.user; - } - - @VisibleForTesting - public Path getSubDir() { - return this.subDir; - } - - @VisibleForTesting - public List getBaseDirs() { - return this.baseDirs; - } - - public synchronized void setSuccess(boolean success) { - this.success = success; - } - - public synchronized boolean getSucess() { - return this.success; - } - - public synchronized FileDeletionTask[] getSuccessorTasks() { - FileDeletionTask[] successors = - new FileDeletionTask[successorTaskSet.size()]; - return successorTaskSet.toArray(successors); - } - - @Override - public void run() { - if (LOG.isDebugEnabled()) { - LOG.debug(this); - } - boolean error = false; - if (null == user) { - if (baseDirs == null || baseDirs.size() == 0) { - if (LOG.isDebugEnabled()) { - LOG.debug("NM deleting absolute path : " + subDir); - } - try { - lfs.delete(subDir, true); - } catch (IOException e) { - error = true; - LOG.warn("Failed to delete " + subDir); - } - } else { - for (Path baseDir : baseDirs) { - Path del = subDir == null? baseDir : new Path(baseDir, subDir); - if (LOG.isDebugEnabled()) { - LOG.debug("NM deleting path : " + del); - } - try { - lfs.delete(del, true); - } catch (IOException e) { - error = true; - LOG.warn("Failed to delete " + subDir); - } - } - } - } else { - try { - if (LOG.isDebugEnabled()) { - LOG.debug( - "Deleting path: [" + subDir + "] as user: [" + user + "]"); - } - if (baseDirs == null || baseDirs.size() == 0) { - delService.exec.deleteAsUser(new DeletionAsUserContext.Builder() - .setUser(user) - .setSubDir(subDir) - .build()); - } else { - delService.exec.deleteAsUser(new DeletionAsUserContext.Builder() - .setUser(user) - .setSubDir(subDir) - .setBasedirs(baseDirs.toArray(new Path[0])) - .build()); - } - } catch (IOException e) { - error = true; - LOG.warn("Failed to delete as user " + user, e); - } catch (InterruptedException e) { - error = true; - LOG.warn("Failed to delete as user " + user, e); - } - } - if (error) { - setSuccess(!error); - } - fileDeletionTaskFinished(); - } - - @Override - public String toString() { - StringBuffer sb = new StringBuffer("\nFileDeletionTask : "); - sb.append(" user : ").append(this.user); - sb.append(" subDir : ").append( - subDir == null ? "null" : subDir.toString()); - sb.append(" baseDir : "); - if (baseDirs == null || baseDirs.size() == 0) { - sb.append("null"); - } else { - for (Path baseDir : baseDirs) { - sb.append(baseDir.toString()).append(','); - } - } - return sb.toString(); - } - - /** - * If there is a task dependency between say tasks 1,2,3 such that - * task2 and task3 can be started only after task1 then we should define - * task2 and task3 as successor tasks for task1. - * Note:- Task dependency should be defined prior to - * @param successorTask - */ - public synchronized void addFileDeletionTaskDependency( - FileDeletionTask successorTask) { - if (successorTaskSet.add(successorTask)) { - successorTask.incrementAndGetPendingPredecessorTasks(); - } - } - - /* - * This is called when - * 1) Current file deletion task ran and finished. - * 2) This can be even directly called by predecessor task if one of the - * dependent tasks of it has failed marking its success = false. - */ - private synchronized void fileDeletionTaskFinished() { - try { - delService.stateStore.removeDeletionTask(taskId); - } catch (IOException e) { - LOG.error("Unable to remove deletion task " + taskId - + " from state store", e); - } - Iterator successorTaskI = - this.successorTaskSet.iterator(); - while (successorTaskI.hasNext()) { - FileDeletionTask successorTask = successorTaskI.next(); - if (!success) { - successorTask.setSuccess(success); - } - int count = successorTask.decrementAndGetPendingPredecessorTasks(); - if (count == 0) { - if (successorTask.getSucess()) { - successorTask.delService.scheduleFileDeletionTask(successorTask); - } else { - successorTask.fileDeletionTaskFinished(); - } - } - } - } - } - - /** - * Helper method to create file deletion task. To be used only if we need - * a way to define dependencies between deletion tasks. - * @param user user on whose behalf this task is suppose to run - * @param subDir sub directory as required in - * {@link DeletionService#delete(String, Path, Path...)} - * @param baseDirs base directories as required in - * {@link DeletionService#delete(String, Path, Path...)} - */ - public FileDeletionTask createFileDeletionTask(String user, Path subDir, - Path[] baseDirs) { - return new FileDeletionTask(this, user, subDir, Arrays.asList(baseDirs)); - } - - private void recover(RecoveredDeletionServiceState state) - throws IOException { - List taskProtos = state.getTasks(); - Map idToInfoMap = - new HashMap(taskProtos.size()); - Set successorTasks = new HashSet(); - for (DeletionServiceDeleteTaskProto proto : taskProtos) { - DeletionTaskRecoveryInfo info = parseTaskProto(proto); - idToInfoMap.put(info.task.taskId, info); - nextTaskId.set(Math.max(nextTaskId.get(), info.task.taskId)); - successorTasks.addAll(info.successorTaskIds); - } - - // restore the task dependencies and schedule the deletion tasks that - // have no predecessors - final long now = System.currentTimeMillis(); - for (DeletionTaskRecoveryInfo info : idToInfoMap.values()) { - for (Integer successorId : info.successorTaskIds){ - DeletionTaskRecoveryInfo successor = idToInfoMap.get(successorId); - if (successor != null) { - info.task.addFileDeletionTaskDependency(successor.task); - } else { - LOG.error("Unable to locate dependency task for deletion task " - + info.task.taskId + " at " + info.task.getSubDir()); - } - } - if (!successorTasks.contains(info.task.taskId)) { - long msecTilDeletion = info.deletionTimestamp - now; - sched.schedule(info.task, msecTilDeletion, TimeUnit.MILLISECONDS); - } - } - } - - private DeletionTaskRecoveryInfo parseTaskProto( - DeletionServiceDeleteTaskProto proto) throws IOException { - int taskId = proto.getId(); - String user = proto.hasUser() ? proto.getUser() : null; - Path subdir = null; - List basePaths = null; - if (proto.hasSubdir()) { - subdir = new Path(proto.getSubdir()); - } - List basedirs = proto.getBasedirsList(); - if (basedirs != null && basedirs.size() > 0) { - basePaths = new ArrayList(basedirs.size()); - for (String basedir : basedirs) { - basePaths.add(new Path(basedir)); - } - } - - FileDeletionTask task = new FileDeletionTask(taskId, this, user, - subdir, basePaths); - return new DeletionTaskRecoveryInfo(task, - proto.getSuccessorIdsList(), - proto.getDeletionTime()); - } - - private int generateTaskId() { - // get the next ID but avoid an invalid ID - int taskId = nextTaskId.incrementAndGet(); - while (taskId == FileDeletionTask.INVALID_TASK_ID) { - taskId = nextTaskId.incrementAndGet(); - } - return taskId; - } - - private void recordDeletionTaskInStateStore(FileDeletionTask task) { - if (!stateStore.canRecover()) { - // optimize the case where we aren't really recording - return; - } - if (task.taskId != FileDeletionTask.INVALID_TASK_ID) { - return; // task already recorded - } - - task.taskId = generateTaskId(); - - FileDeletionTask[] successors = task.getSuccessorTasks(); - - // store successors first to ensure task IDs have been generated for them - for (FileDeletionTask successor : successors) { - recordDeletionTaskInStateStore(successor); - } - - DeletionServiceDeleteTaskProto.Builder builder = - DeletionServiceDeleteTaskProto.newBuilder(); - builder.setId(task.taskId); - if (task.getUser() != null) { - builder.setUser(task.getUser()); - } - if (task.getSubDir() != null) { - builder.setSubdir(task.getSubDir().toString()); - } - builder.setDeletionTime(System.currentTimeMillis() + - TimeUnit.MILLISECONDS.convert(debugDelay, TimeUnit.SECONDS)); - if (task.getBaseDirs() != null) { - for (Path dir : task.getBaseDirs()) { - builder.addBasedirs(dir.toString()); - } - } - for (FileDeletionTask successor : successors) { - builder.addSuccessorIds(successor.taskId); - } - - try { - stateStore.storeDeletionTask(task.taskId, builder.build()); - } catch (IOException e) { - LOG.error("Unable to store deletion task " + task.taskId + " for " - + task.getSubDir(), e); - } - } - - private static class DeletionTaskRecoveryInfo { - FileDeletionTask task; - List successorTaskIds; - long deletionTimestamp; - - public DeletionTaskRecoveryInfo(FileDeletionTask task, - List successorTaskIds, long deletionTimestamp) { - this.task = task; - this.successorTaskIds = successorTaskIds; - this.deletionTimestamp = deletionTimestamp; - } - } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/impl/pb/NMProtoUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/impl/pb/NMProtoUtils.java new file mode 100644 index 0000000000..e47b3eef00 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/impl/pb/NMProtoUtils.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.api.impl.pb; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; +import org.apache.hadoop.yarn.server.nodemanager.DeletionService; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.recovery.DeletionTaskRecoveryInfo; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.DeletionTask; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.DeletionTaskType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask; + +import java.util.ArrayList; +import java.util.List; + +/** + * Utilities for converting from PB representations. + */ +public final class NMProtoUtils { + + private static final Log LOG = LogFactory.getLog(NMProtoUtils.class); + + private NMProtoUtils() { } + + /** + * Convert the Protobuf representation into a {@link DeletionTask}. + * + * @param proto the Protobuf representation for the DeletionTask + * @param deletionService the {@link DeletionService} + * @return the converted {@link DeletionTask} + */ + public static DeletionTask convertProtoToDeletionTask( + DeletionServiceDeleteTaskProto proto, DeletionService deletionService) { + int taskId = proto.getId(); + if (proto.hasTaskType() && proto.getTaskType() != null) { + if (proto.getTaskType().equals(DeletionTaskType.FILE.name())) { + LOG.debug("Converting recovered FileDeletionTask"); + return convertProtoToFileDeletionTask(proto, deletionService, taskId); + } + } + LOG.debug("Unable to get task type, trying FileDeletionTask"); + return convertProtoToFileDeletionTask(proto, deletionService, taskId); + } + + /** + * Convert the Protobuf representation into the {@link FileDeletionTask}. + * + * @param proto the Protobuf representation of the {@link FileDeletionTask} + * @param deletionService the {@link DeletionService}. + * @param taskId the ID of the {@link DeletionTask}. + * @return the populated {@link FileDeletionTask}. + */ + public static FileDeletionTask convertProtoToFileDeletionTask( + DeletionServiceDeleteTaskProto proto, DeletionService deletionService, + int taskId) { + String user = proto.hasUser() ? proto.getUser() : null; + Path subdir = null; + if (proto.hasSubdir()) { + subdir = new Path(proto.getSubdir()); + } + List basePaths = null; + List basedirs = proto.getBasedirsList(); + if (basedirs != null && basedirs.size() > 0) { + basePaths = new ArrayList<>(basedirs.size()); + for (String basedir : basedirs) { + basePaths.add(new Path(basedir)); + } + } + return new FileDeletionTask(taskId, deletionService, user, subdir, + basePaths); + } + + /** + * Convert the Protobuf representation to the {@link DeletionTaskRecoveryInfo} + * representation. + * + * @param proto the Protobuf representation of the {@link DeletionTask} + * @param deletionService the {@link DeletionService} + * @return the populated {@link DeletionTaskRecoveryInfo} + */ + public static DeletionTaskRecoveryInfo convertProtoToDeletionTaskRecoveryInfo( + DeletionServiceDeleteTaskProto proto, DeletionService deletionService) { + DeletionTask deletionTask = + NMProtoUtils.convertProtoToDeletionTask(proto, deletionService); + List successorTaskIds = new ArrayList<>(); + if (proto.getSuccessorIdsList() != null && + !proto.getSuccessorIdsList().isEmpty()) { + successorTaskIds = proto.getSuccessorIdsList(); + } + long deletionTimestamp = proto.getDeletionTime(); + return new DeletionTaskRecoveryInfo(deletionTask, successorTaskIds, + deletionTimestamp); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/impl/pb/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/impl/pb/package-info.java new file mode 100644 index 0000000000..006f49f5df --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/impl/pb/package-info.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Package containing classes for working with Protobuf. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.nodemanager.api.impl.pb; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/recovery/DeletionTaskRecoveryInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/recovery/DeletionTaskRecoveryInfo.java new file mode 100644 index 0000000000..c62ea0298e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/recovery/DeletionTaskRecoveryInfo.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.recovery; + +import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.DeletionTask; + +import java.util.List; + +/** + * Encapsulates the recovery info needed to recover a DeletionTask from the NM + * state store. + */ +public class DeletionTaskRecoveryInfo { + + private DeletionTask task; + private List successorTaskIds; + private long deletionTimestamp; + + /** + * Information needed for recovering the DeletionTask. + * + * @param task the DeletionTask + * @param successorTaskIds the dependent DeletionTasks. + * @param deletionTimestamp the scheduled times of deletion. + */ + public DeletionTaskRecoveryInfo(DeletionTask task, + List successorTaskIds, long deletionTimestamp) { + this.task = task; + this.successorTaskIds = successorTaskIds; + this.deletionTimestamp = deletionTimestamp; + } + + /** + * Return the recovered DeletionTask. + * + * @return the recovered DeletionTask. + */ + public DeletionTask getTask() { + return task; + } + + /** + * Return all of the dependent DeletionTasks. + * + * @return the dependent DeletionTasks. + */ + public List getSuccessorTaskIds() { + return successorTaskIds; + } + + /** + * Return the deletion timestamp. + * + * @return the deletion timestamp. + */ + public long getDeletionTimestamp() { + return deletionTimestamp; + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/recovery/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/recovery/package-info.java new file mode 100644 index 0000000000..28d7f62db9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/recovery/package-info.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Package containing classes for recovering DeletionTasks. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.recovery; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/DeletionTask.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/DeletionTask.java new file mode 100644 index 0000000000..635d7a9ac5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/DeletionTask.java @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; +import org.apache.hadoop.yarn.server.nodemanager.DeletionService; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * DeletionTasks are supplied to the {@link DeletionService} for deletion. + */ +public abstract class DeletionTask implements Runnable { + + static final Log LOG = LogFactory.getLog(DeletionTask.class); + + public static final int INVALID_TASK_ID = -1; + + private int taskId; + private String user; + private DeletionTaskType deletionTaskType; + private DeletionService deletionService; + private final AtomicInteger numberOfPendingPredecessorTasks; + private final Set successorTaskSet; + // By default all tasks will start as success=true; however if any of + // the dependent task fails then it will be marked as false in + // deletionTaskFinished(). + private boolean success; + + /** + * Deletion task with taskId and default values. + * + * @param taskId the ID of the task, if previously set. + * @param deletionService the {@link DeletionService}. + * @param user the user associated with the delete. + * @param deletionTaskType the {@link DeletionTaskType}. + */ + public DeletionTask(int taskId, DeletionService deletionService, String user, + DeletionTaskType deletionTaskType) { + this(taskId, deletionService, user, new AtomicInteger(0), + new HashSet(), deletionTaskType); + } + + /** + * Deletion task with taskId and user supplied values. + * + * @param taskId the ID of the task, if previously set. + * @param deletionService the {@link DeletionService}. + * @param user the user associated with the delete. + * @param numberOfPendingPredecessorTasks Number of pending tasks. + * @param successorTaskSet the list of successor DeletionTasks + * @param deletionTaskType the {@link DeletionTaskType}. + */ + public DeletionTask(int taskId, DeletionService deletionService, String user, + AtomicInteger numberOfPendingPredecessorTasks, + Set successorTaskSet, DeletionTaskType deletionTaskType) { + this.taskId = taskId; + this.deletionService = deletionService; + this.user = user; + this.numberOfPendingPredecessorTasks = numberOfPendingPredecessorTasks; + this.successorTaskSet = successorTaskSet; + this.deletionTaskType = deletionTaskType; + success = true; + } + + /** + * Get the taskId for the DeletionTask. + * + * @return the taskId. + */ + public int getTaskId() { + return taskId; + } + + /** + * Set the taskId for the DeletionTask. + * + * @param taskId the taskId. + */ + public void setTaskId(int taskId) { + this.taskId = taskId; + } + + /** + * The the user assoicated with the DeletionTask. + * + * @return the user name. + */ + public String getUser() { + return user; + } + + /** + * Get the {@link DeletionService} for this DeletionTask. + * + * @return the {@link DeletionService}. + */ + public DeletionService getDeletionService() { + return deletionService; + } + + /** + * Get the {@link DeletionTaskType} for this DeletionTask. + * + * @return the {@link DeletionTaskType}. + */ + public DeletionTaskType getDeletionTaskType() { + return deletionTaskType; + } + + /** + * Set the DeletionTask run status. + * + * @param success the status of the running DeletionTask. + */ + public synchronized void setSuccess(boolean success) { + this.success = success; + } + + /** + * Return the DeletionTask run status. + * + * @return the status of the running DeletionTask. + */ + public synchronized boolean getSucess() { + return this.success; + } + + /** + * Return the list of successor tasks for the DeletionTask. + * + * @return the list of successor tasks. + */ + public synchronized DeletionTask[] getSuccessorTasks() { + DeletionTask[] successors = new DeletionTask[successorTaskSet.size()]; + return successorTaskSet.toArray(successors); + } + + /** + * Convert the DeletionTask to the Protobuf representation for storing in the + * state store and recovery. + * + * @return the protobuf representation of the DeletionTask. + */ + public abstract DeletionServiceDeleteTaskProto convertDeletionTaskToProto(); + + /** + * Add a dependent DeletionTask. + * + * If there is a task dependency between say tasks 1,2,3 such that + * task2 and task3 can be started only after task1 then we should define + * task2 and task3 as successor tasks for task1. + * Note:- Task dependency should be defined prior to calling delete. + * + * @param successorTask the DeletionTask the depends on this DeletionTask. + */ + public synchronized void addDeletionTaskDependency( + DeletionTask successorTask) { + if (successorTaskSet.add(successorTask)) { + successorTask.incrementAndGetPendingPredecessorTasks(); + } + } + + /** + * Increments and returns pending predecessor task count. + * + * @return the number of pending predecessor DeletionTasks. + */ + public int incrementAndGetPendingPredecessorTasks() { + return numberOfPendingPredecessorTasks.incrementAndGet(); + } + + /** + * Decrements and returns pending predecessor task count. + * + * @return the number of pending predecessor DeletionTasks. + */ + public int decrementAndGetPendingPredecessorTasks() { + return numberOfPendingPredecessorTasks.decrementAndGet(); + } + + /** + * Removes the DeletionTask from the state store and validates that successor + * tasks have been scheduled and completed. + * + * This is called when: + * 1) Current deletion task ran and finished. + * 2) When directly called by predecessor task if one of the + * dependent tasks of it has failed marking its success = false. + */ + synchronized void deletionTaskFinished() { + try { + NMStateStoreService stateStore = deletionService.getStateStore(); + stateStore.removeDeletionTask(taskId); + } catch (IOException e) { + LOG.error("Unable to remove deletion task " + taskId + + " from state store", e); + } + Iterator successorTaskI = this.successorTaskSet.iterator(); + while (successorTaskI.hasNext()) { + DeletionTask successorTask = successorTaskI.next(); + if (!success) { + successorTask.setSuccess(success); + } + int count = successorTask.decrementAndGetPendingPredecessorTasks(); + if (count == 0) { + if (successorTask.getSucess()) { + successorTask.deletionService.delete(successorTask); + } else { + successorTask.deletionTaskFinished(); + } + } + } + } + + /** + * Return the Protobuf builder with the base DeletionTask attributes. + * + * @return pre-populated Buidler with the base attributes. + */ + DeletionServiceDeleteTaskProto.Builder getBaseDeletionTaskProtoBuilder() { + DeletionServiceDeleteTaskProto.Builder builder = + DeletionServiceDeleteTaskProto.newBuilder(); + builder.setId(getTaskId()); + if (getUser() != null) { + builder.setUser(getUser()); + } + builder.setDeletionTime(System.currentTimeMillis() + + TimeUnit.MILLISECONDS.convert(getDeletionService().getDebugDelay(), + TimeUnit.SECONDS)); + for (DeletionTask successor : getSuccessorTasks()) { + builder.addSuccessorIds(successor.getTaskId()); + } + return builder; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/DeletionTaskType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/DeletionTaskType.java new file mode 100644 index 0000000000..676c71b365 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/DeletionTaskType.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task; + +/** + * Available types of {@link DeletionTask}s. + */ +public enum DeletionTaskType { + FILE +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/FileDeletionTask.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/FileDeletionTask.java new file mode 100644 index 0000000000..fd07f16c4c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/FileDeletionTask.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task; + +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.UnsupportedFileSystemException; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; +import org.apache.hadoop.yarn.server.nodemanager.DeletionService; +import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext; + +import java.io.IOException; +import java.util.List; + +/** + * {@link DeletionTask} handling the removal of files (and directories). + */ +public class FileDeletionTask extends DeletionTask implements Runnable { + + private final Path subDir; + private final List baseDirs; + private static final FileContext lfs = getLfs(); + + private static FileContext getLfs() { + try { + return FileContext.getLocalFSFileContext(); + } catch (UnsupportedFileSystemException e) { + throw new RuntimeException(e); + } + } + + /** + * Construct a FileDeletionTask with the default INVALID_TASK_ID. + * + * @param deletionService the {@link DeletionService}. + * @param user the user deleting the file. + * @param subDir the subdirectory to delete. + * @param baseDirs the base directories containing the subdir. + */ + public FileDeletionTask(DeletionService deletionService, String user, + Path subDir, List baseDirs) { + this(INVALID_TASK_ID, deletionService, user, subDir, baseDirs); + } + + /** + * Construct a FileDeletionTask with the default INVALID_TASK_ID. + * + * @param taskId the ID of the task, if previously set. + * @param deletionService the {@link DeletionService}. + * @param user the user deleting the file. + * @param subDir the subdirectory to delete. + * @param baseDirs the base directories containing the subdir. + */ + public FileDeletionTask(int taskId, DeletionService deletionService, + String user, Path subDir, List baseDirs) { + super(taskId, deletionService, user, DeletionTaskType.FILE); + this.subDir = subDir; + this.baseDirs = baseDirs; + } + + /** + * Get the subdirectory to delete. + * + * @return the subDir for the FileDeletionTask. + */ + public Path getSubDir() { + return this.subDir; + } + + /** + * Get the base directories containing the subdirectory. + * + * @return the base directories for the FileDeletionTask. + */ + public List getBaseDirs() { + return this.baseDirs; + } + + /** + * Delete the specified file/directory as the specified user. + */ + @Override + public void run() { + if (LOG.isDebugEnabled()) { + String msg = String.format("Running DeletionTask : %s", toString()); + LOG.debug(msg); + } + boolean error = false; + if (null == getUser()) { + if (baseDirs == null || baseDirs.size() == 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("NM deleting absolute path : " + subDir); + } + try { + lfs.delete(subDir, true); + } catch (IOException e) { + error = true; + LOG.warn("Failed to delete " + subDir); + } + } else { + for (Path baseDir : baseDirs) { + Path del = subDir == null? baseDir : new Path(baseDir, subDir); + if (LOG.isDebugEnabled()) { + LOG.debug("NM deleting path : " + del); + } + try { + lfs.delete(del, true); + } catch (IOException e) { + error = true; + LOG.warn("Failed to delete " + subDir); + } + } + } + } else { + try { + if (LOG.isDebugEnabled()) { + LOG.debug( + "Deleting path: [" + subDir + "] as user: [" + getUser() + "]"); + } + if (baseDirs == null || baseDirs.size() == 0) { + getDeletionService().getContainerExecutor().deleteAsUser( + new DeletionAsUserContext.Builder() + .setUser(getUser()) + .setSubDir(subDir) + .build()); + } else { + getDeletionService().getContainerExecutor().deleteAsUser( + new DeletionAsUserContext.Builder() + .setUser(getUser()) + .setSubDir(subDir) + .setBasedirs(baseDirs.toArray(new Path[0])) + .build()); + } + } catch (IOException|InterruptedException e) { + error = true; + LOG.warn("Failed to delete as user " + getUser(), e); + } + } + if (error) { + setSuccess(!error); + } + deletionTaskFinished(); + } + + /** + * Convert the FileDeletionTask to a String representation. + * + * @return String representation of the FileDeletionTask. + */ + @Override + public String toString() { + StringBuilder sb = new StringBuilder("FileDeletionTask :"); + sb.append(" id : ").append(getTaskId()); + sb.append(" user : ").append(getUser()); + sb.append(" subDir : ").append( + subDir == null ? "null" : subDir.toString()); + sb.append(" baseDir : "); + if (baseDirs == null || baseDirs.size() == 0) { + sb.append("null"); + } else { + for (Path baseDir : baseDirs) { + sb.append(baseDir.toString()).append(','); + } + } + return sb.toString().trim(); + } + + /** + * Convert the FileDeletionTask to the Protobuf representation for storing + * in the state store and recovery. + * + * @return the protobuf representation of the FileDeletionTask. + */ + public DeletionServiceDeleteTaskProto convertDeletionTaskToProto() { + DeletionServiceDeleteTaskProto.Builder builder = + getBaseDeletionTaskProtoBuilder(); + builder.setTaskType(DeletionTaskType.FILE.name()); + if (getSubDir() != null) { + builder.setSubdir(getSubDir().toString()); + } + if (getBaseDirs() != null) { + for (Path dir : getBaseDirs()) { + builder.addBasedirs(dir.toString()); + } + } + return builder.build(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/package-info.java new file mode 100644 index 0000000000..f1a3985648 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/package-info.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Package containing DeletionTasks for use with the DeletionService. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java index af34e92d6e..47e6a55fb2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java @@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRecoveredEvent; @@ -113,9 +114,9 @@ public LocalResourcesTrackerImpl(String user, ApplicationId appId, this.useLocalCacheDirectoryManager = useLocalCacheDirectoryManager; if (this.useLocalCacheDirectoryManager) { directoryManagers = - new ConcurrentHashMap(); + new ConcurrentHashMap<>(); inProgressLocalResourcesMap = - new ConcurrentHashMap(); + new ConcurrentHashMap<>(); } this.conf = conf; this.stateStore = stateStore; @@ -393,7 +394,9 @@ public boolean remove(LocalizedResource rem, DeletionService delService) { return false; } else { // ResourceState is LOCALIZED or INIT if (ResourceState.LOCALIZED.equals(rsrc.getState())) { - delService.delete(getUser(), getPathToDelete(rsrc.getLocalPath())); + FileDeletionTask deletionTask = new FileDeletionTask(delService, + getUser(), getPathToDelete(rsrc.getLocalPath()), null); + delService.delete(deletionTask); } removeResource(rem.getRequest()); LOG.info("Removed " + rsrc.getLocalPath() + " from localized cache"); @@ -488,7 +491,9 @@ public Path getPathForLocalization(LocalResourceRequest req, LOG.warn("Directory " + uniquePath + " already exists, " + "try next one."); if (delService != null) { - delService.delete(getUser(), uniquePath); + FileDeletionTask deletionTask = new FileDeletionTask(delService, + getUser(), uniquePath, null); + delService.delete(deletionTask); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index 663bad7892..5bc0da7cd1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -95,7 +95,6 @@ import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; -import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask; import org.apache.hadoop.yarn.server.nodemanager.DirectoryCollection.DirsChangeListener; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol; @@ -113,6 +112,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalCacheCleaner.LocalCacheCleanerStats; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent; @@ -604,7 +604,9 @@ private void handleCleanupContainerResources( private void submitDirForDeletion(String userName, Path dir) { try { lfs.getFileStatus(dir); - delService.delete(userName, dir, new Path[] {}); + FileDeletionTask deletionTask = new FileDeletionTask(delService, userName, + dir, null); + delService.delete(deletionTask); } catch (UnsupportedFileSystemException ue) { LOG.warn("Local dir " + dir + " is an unsupported filesystem", ue); } catch (IOException ie) { @@ -1234,10 +1236,13 @@ public void run() { event.getResource().unlock(); } if (!paths.isEmpty()) { - delService.delete(context.getUser(), - null, paths.toArray(new Path[paths.size()])); + FileDeletionTask deletionTask = new FileDeletionTask(delService, + context.getUser(), null, paths); + delService.delete(deletionTask); } - delService.delete(null, nmPrivateCTokensPath, new Path[] {}); + FileDeletionTask deletionTask = new FileDeletionTask(delService, null, + nmPrivateCTokensPath, null); + delService.delete(deletionTask); } } @@ -1456,7 +1461,9 @@ private void deleteAppLogDir(FileContext fs, DeletionService del, String appName = fileStatus.getPath().getName(); if (appName.matches("^application_\\d+_\\d+_DEL_\\d+$")) { LOG.info("delete app log dir," + appName); - del.delete(null, fileStatus.getPath()); + FileDeletionTask deletionTask = new FileDeletionTask(del, null, + fileStatus.getPath(), null); + del.delete(deletionTask); } } } @@ -1516,7 +1523,9 @@ private void deleteLocalDir(FileContext lfs, DeletionService del, || status.getPath().getName() .matches(".*" + ContainerLocalizer.FILECACHE + "_DEL_.*")) { - del.delete(null, status.getPath(), new Path[] {}); + FileDeletionTask deletionTask = new FileDeletionTask(del, null, + status.getPath(), null); + del.delete(deletionTask); } } catch (IOException ex) { // Do nothing, just give the warning @@ -1530,24 +1539,25 @@ private void deleteLocalDir(FileContext lfs, DeletionService del, private void cleanUpFilesPerUserDir(FileContext lfs, DeletionService del, Path userDirPath) throws IOException { RemoteIterator userDirStatus = lfs.listStatus(userDirPath); - FileDeletionTask dependentDeletionTask = - del.createFileDeletionTask(null, userDirPath, new Path[] {}); + FileDeletionTask dependentDeletionTask = new FileDeletionTask(del, null, + userDirPath, new ArrayList()); if (userDirStatus != null && userDirStatus.hasNext()) { List deletionTasks = new ArrayList(); while (userDirStatus.hasNext()) { FileStatus status = userDirStatus.next(); String owner = status.getOwner(); - FileDeletionTask deletionTask = - del.createFileDeletionTask(owner, null, - new Path[] { status.getPath() }); - deletionTask.addFileDeletionTaskDependency(dependentDeletionTask); + List pathList = new ArrayList<>(); + pathList.add(status.getPath()); + FileDeletionTask deletionTask = new FileDeletionTask(del, owner, null, + pathList); + deletionTask.addDeletionTaskDependency(dependentDeletionTask); deletionTasks.add(deletionTask); } for (FileDeletionTask task : deletionTasks) { - del.scheduleFileDeletionTask(task); + del.delete(task); } } else { - del.scheduleFileDeletionTask(dependentDeletionTask); + del.delete(dependentDeletionTask); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java index f465534b66..0d9e686dc9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java @@ -69,6 +69,8 @@ import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.DeletionTask; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Times; @@ -258,19 +260,7 @@ private void uploadLogsForContainers(boolean appFinished) { return; } - if (UserGroupInformation.isSecurityEnabled()) { - Credentials systemCredentials = - context.getSystemCredentialsForApps().get(appId); - if (systemCredentials != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Adding new framework-token for " + appId - + " for log-aggregation: " + systemCredentials.getAllTokens() - + "; userUgi=" + userUgi); - } - // this will replace old token - userUgi.addCredentials(systemCredentials); - } - } + addCredentials(); // Create a set of Containers whose logs will be uploaded in this cycle. // It includes: @@ -332,9 +322,12 @@ private void uploadLogsForContainers(boolean appFinished) { finishedContainers.contains(container)); if (uploadedFilePathsInThisCycle.size() > 0) { uploadedLogsInThisCycle = true; - this.delService.delete(this.userUgi.getShortUserName(), null, - uploadedFilePathsInThisCycle - .toArray(new Path[uploadedFilePathsInThisCycle.size()])); + List uploadedFilePathsInThisCycleList = new ArrayList<>(); + uploadedFilePathsInThisCycleList.addAll(uploadedFilePathsInThisCycle); + DeletionTask deletionTask = new FileDeletionTask(delService, + this.userUgi.getShortUserName(), null, + uploadedFilePathsInThisCycleList); + delService.delete(deletionTask); } // This container is finished, and all its logs have been uploaded, @@ -352,11 +345,7 @@ private void uploadLogsForContainers(boolean appFinished) { } long currentTime = System.currentTimeMillis(); - final Path renamedPath = this.rollingMonitorInterval <= 0 - ? remoteNodeLogFileForApp : new Path( - remoteNodeLogFileForApp.getParent(), - remoteNodeLogFileForApp.getName() + "_" - + currentTime); + final Path renamedPath = getRenamedPath(currentTime); final boolean rename = uploadedLogsInThisCycle; try { @@ -396,6 +385,28 @@ public Object run() throws Exception { } } + private Path getRenamedPath(long currentTime) { + return this.rollingMonitorInterval <= 0 ? remoteNodeLogFileForApp + : new Path(remoteNodeLogFileForApp.getParent(), + remoteNodeLogFileForApp.getName() + "_" + currentTime); + } + + private void addCredentials() { + if (UserGroupInformation.isSecurityEnabled()) { + Credentials systemCredentials = + context.getSystemCredentialsForApps().get(appId); + if (systemCredentials != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Adding new framework-token for " + appId + + " for log-aggregation: " + systemCredentials.getAllTokens() + + "; userUgi=" + userUgi); + } + // this will replace old token + userUgi.addCredentials(systemCredentials); + } + } + } + @VisibleForTesting protected LogWriter createLogWriter() { return new LogWriter(); @@ -561,8 +572,11 @@ private void doAppLogAggregationPostCleanUp() { } if (localAppLogDirs.size() > 0) { - this.delService.delete(this.userUgi.getShortUserName(), null, - localAppLogDirs.toArray(new Path[localAppLogDirs.size()])); + List localAppLogDirsList = new ArrayList<>(); + localAppLogDirsList.addAll(localAppLogDirs); + DeletionTask deletionTask = new FileDeletionTask(delService, + this.userUgi.getShortUserName(), null, localAppLogDirsList); + this.delService.delete(deletionTask); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java index 290174352f..9961748a38 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java @@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent; @@ -247,8 +248,10 @@ public void run() { new ApplicationEvent(this.applicationId, ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED)); if (localAppLogDirs.size() > 0) { - NonAggregatingLogHandler.this.delService.delete(user, null, - (Path[]) localAppLogDirs.toArray(new Path[localAppLogDirs.size()])); + FileDeletionTask deletionTask = new FileDeletionTask( + NonAggregatingLogHandler.this.delService, user, null, + localAppLogDirs); + NonAggregatingLogHandler.this.delService.delete(deletionTask); } try { NonAggregatingLogHandler.this.stateStore.removeLogDeleter( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto index 7831711d85..7212953268 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto @@ -41,6 +41,7 @@ message DeletionServiceDeleteTaskProto { optional int64 deletionTime = 4; repeated string basedirs = 5; repeated int32 successorIds = 6; + optional string taskType = 7; } message LocalizedResourceProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java index 2e0bbe0832..87f4a1c2fd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java @@ -33,13 +33,14 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask; import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService; import org.junit.AfterClass; import org.junit.Test; import org.mockito.Mockito; + public class TestDeletionService { private static final FileContext lfs = getLfs(); @@ -123,8 +124,9 @@ public void testAbsDelete() throws Exception { del.start(); try { for (Path p : dirs) { - del.delete((Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo", - p, null); + FileDeletionTask deletionTask = new FileDeletionTask(del, + (Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo", p, null); + del.delete(deletionTask); } int msecToWait = 20 * 1000; @@ -159,8 +161,10 @@ public void testRelativeDelete() throws Exception { del.start(); for (Path p : content) { assertTrue(lfs.util().exists(new Path(baseDirs.get(0), p))); - del.delete((Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo", - p, baseDirs.toArray(new Path[4])); + FileDeletionTask deletionTask = new FileDeletionTask(del, + (Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo", p, + baseDirs); + del.delete(deletionTask); } int msecToWait = 20 * 1000; @@ -196,8 +200,9 @@ public void testNoDelete() throws Exception { del.init(conf); del.start(); for (Path p : dirs) { - del.delete((Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo", p, - null); + FileDeletionTask deletionTask = new FileDeletionTask(del, + (Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo", p, null); + del.delete(deletionTask); } int msecToWait = 20 * 1000; for (Path p : dirs) { @@ -220,7 +225,9 @@ public void testStopWithDelayedTasks() throws Exception { try { del.init(conf); del.start(); - del.delete("dingo", new Path("/does/not/exist")); + FileDeletionTask deletionTask = new FileDeletionTask(del, "dingo", + new Path("/does/not/exist"), null); + del.delete(deletionTask); } finally { del.stop(); } @@ -247,18 +254,20 @@ public void testFileDeletionTaskDependency() throws Exception { // first we will try to delete sub directories which are present. This // should then trigger parent directory to be deleted. List subDirs = buildDirs(r, dirs.get(0), 2); - + FileDeletionTask dependentDeletionTask = - del.createFileDeletionTask(null, dirs.get(0), new Path[] {}); + new FileDeletionTask(del, null, dirs.get(0), new ArrayList()); List deletionTasks = new ArrayList(); for (Path subDir : subDirs) { + List subDirList = new ArrayList<>(); + subDirList.add(subDir); FileDeletionTask deletionTask = - del.createFileDeletionTask(null, null, new Path[] { subDir }); - deletionTask.addFileDeletionTaskDependency(dependentDeletionTask); + new FileDeletionTask(del, null, dirs.get(0), subDirList); + deletionTask.addDeletionTaskDependency(dependentDeletionTask); deletionTasks.add(deletionTask); } for (FileDeletionTask task : deletionTasks) { - del.scheduleFileDeletionTask(task); + del.delete(task); } int msecToWait = 20 * 1000; @@ -274,19 +283,21 @@ public void testFileDeletionTaskDependency() throws Exception { subDirs = buildDirs(r, dirs.get(1), 2); subDirs.add(new Path(dirs.get(1), "absentFile")); - dependentDeletionTask = - del.createFileDeletionTask(null, dirs.get(1), new Path[] {}); + dependentDeletionTask = new FileDeletionTask(del, null, dirs.get(1), + new ArrayList()); deletionTasks = new ArrayList(); for (Path subDir : subDirs) { - FileDeletionTask deletionTask = - del.createFileDeletionTask(null, null, new Path[] { subDir }); - deletionTask.addFileDeletionTaskDependency(dependentDeletionTask); + List subDirList = new ArrayList<>(); + subDirList.add(subDir); + FileDeletionTask deletionTask = new FileDeletionTask(del, null, null, + subDirList); + deletionTask.addDeletionTaskDependency(dependentDeletionTask); deletionTasks.add(deletionTask); } // marking one of the tasks as a failure. deletionTasks.get(2).setSuccess(false); for (FileDeletionTask task : deletionTasks) { - del.scheduleFileDeletionTask(task); + del.delete(task); } msecToWait = 20 * 1000; @@ -327,8 +338,10 @@ public void testRecovery() throws Exception { del.start(); for (Path p : content) { assertTrue(lfs.util().exists(new Path(baseDirs.get(0), p))); - del.delete((Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo", - p, baseDirs.toArray(new Path[4])); + FileDeletionTask deletionTask = new FileDeletionTask(del, + (Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo", p, + baseDirs); + del.delete(deletionTask); } // restart the deletion service @@ -341,8 +354,10 @@ public void testRecovery() throws Exception { // verify paths are still eventually deleted int msecToWait = 10 * 1000; for (Path p : baseDirs) { + System.out.println("TEST Basedir: " + p.getName()); for (Path q : content) { Path fp = new Path(p, q); + System.out.println("TEST Path: " + fp.toString()); while (msecToWait > 0 && lfs.util().exists(fp)) { Thread.sleep(100); msecToWait -= 100; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java index 5f9b8830a1..c1df56273e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java @@ -19,7 +19,6 @@ package org.apache.hadoop.yarn.server.nodemanager; import static org.mockito.Matchers.argThat; -import static org.mockito.Matchers.isNull; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -28,6 +27,7 @@ import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -58,19 +58,17 @@ import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.NMTokenIdentifier; -import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask; import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestContainerManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionMatcher; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; -import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Records; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import org.mockito.ArgumentMatcher; public class TestNodeManagerReboot { @@ -195,19 +193,18 @@ public Void run() throws YarnException, IOException { // restart the NodeManager restartNM(MAX_TRIES); checkNumOfLocalDirs(); - - verify(delService, times(1)).delete( - (String) isNull(), - argThat(new PathInclude(ResourceLocalizationService.NM_PRIVATE_DIR - + "_DEL_"))); - verify(delService, times(1)).delete((String) isNull(), - argThat(new PathInclude(ContainerLocalizer.FILECACHE + "_DEL_"))); - verify(delService, times(1)).scheduleFileDeletionTask( - argThat(new FileDeletionInclude(user, null, - new String[] { destinationFile }))); - verify(delService, times(1)).scheduleFileDeletionTask( - argThat(new FileDeletionInclude(null, ContainerLocalizer.USERCACHE - + "_DEL_", new String[] {}))); + + verify(delService, times(1)).delete(argThat(new FileDeletionMatcher( + delService, null, + new Path(ResourceLocalizationService.NM_PRIVATE_DIR + "_DEL_"), null))); + verify(delService, times(1)).delete(argThat(new FileDeletionMatcher( + delService, null, new Path(ContainerLocalizer.FILECACHE + "_DEL_"), + null))); + verify(delService, times(1)).delete(argThat(new FileDeletionMatcher( + delService, user, null, Arrays.asList(new Path(destinationFile))))); + verify(delService, times(1)).delete(argThat(new FileDeletionMatcher( + delService, null, new Path(ContainerLocalizer.USERCACHE + "_DEL_"), + new ArrayList()))); // restart the NodeManager again // this time usercache directory should be empty @@ -329,72 +326,4 @@ private YarnConfiguration createNMConfig() throws IOException { return conf; } } - - class PathInclude extends ArgumentMatcher { - - final String part; - - PathInclude(String part) { - this.part = part; - } - - @Override - public boolean matches(Object o) { - return ((Path) o).getName().indexOf(part) != -1; - } - } - - class FileDeletionInclude extends ArgumentMatcher { - final String user; - final String subDirIncludes; - final String[] baseDirIncludes; - - public FileDeletionInclude(String user, String subDirIncludes, - String [] baseDirIncludes) { - this.user = user; - this.subDirIncludes = subDirIncludes; - this.baseDirIncludes = baseDirIncludes; - } - - @Override - public boolean matches(Object o) { - FileDeletionTask fd = (FileDeletionTask)o; - if (fd.getUser() == null && user != null) { - return false; - } else if (fd.getUser() != null && user == null) { - return false; - } else if (fd.getUser() != null && user != null) { - return fd.getUser().equals(user); - } - if (!comparePaths(fd.getSubDir(), subDirIncludes)) { - return false; - } - if (baseDirIncludes == null && fd.getBaseDirs() != null) { - return false; - } else if (baseDirIncludes != null && fd.getBaseDirs() == null ) { - return false; - } else if (baseDirIncludes != null && fd.getBaseDirs() != null) { - if (baseDirIncludes.length != fd.getBaseDirs().size()) { - return false; - } - for (int i =0 ; i < baseDirIncludes.length; i++) { - if (!comparePaths(fd.getBaseDirs().get(i), baseDirIncludes[i])) { - return false; - } - } - } - return true; - } - - public boolean comparePaths(Path p1, String p2) { - if (p1 == null && p2 != null){ - return false; - } else if (p1 != null && p2 == null) { - return false; - } else if (p1 != null && p2 != null ){ - return p1.toUri().getPath().contains(p2.toString()); - } - return true; - } - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/api/impl/pb/TestNMProtoUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/api/impl/pb/TestNMProtoUtils.java new file mode 100644 index 0000000000..69e01bc72d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/api/impl/pb/TestNMProtoUtils.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.api.impl.pb; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; +import org.apache.hadoop.yarn.server.nodemanager.DeletionService; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.recovery.DeletionTaskRecoveryInfo; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.DeletionTask; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.DeletionTaskType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.mock; + +/** + * Test conversion to {@link DeletionTask}. + */ +public class TestNMProtoUtils { + + @Test + public void testConvertProtoToDeletionTask() throws Exception { + DeletionService deletionService = mock(DeletionService.class); + DeletionServiceDeleteTaskProto.Builder protoBuilder = + DeletionServiceDeleteTaskProto.newBuilder(); + int id = 0; + protoBuilder.setId(id); + DeletionServiceDeleteTaskProto proto = protoBuilder.build(); + DeletionTask deletionTask = + NMProtoUtils.convertProtoToDeletionTask(proto, deletionService); + assertEquals(DeletionTaskType.FILE, deletionTask.getDeletionTaskType()); + assertEquals(id, deletionTask.getTaskId()); + } + + @Test + public void testConvertProtoToFileDeletionTask() throws Exception { + DeletionService deletionService = mock(DeletionService.class); + int id = 0; + String user = "user"; + Path subdir = new Path("subdir"); + Path basedir = new Path("basedir"); + DeletionServiceDeleteTaskProto.Builder protoBuilder = + DeletionServiceDeleteTaskProto.newBuilder(); + protoBuilder + .setId(id) + .setUser("user") + .setSubdir(subdir.getName()) + .addBasedirs(basedir.getName()); + DeletionServiceDeleteTaskProto proto = protoBuilder.build(); + DeletionTask deletionTask = + NMProtoUtils.convertProtoToFileDeletionTask(proto, deletionService, id); + assertEquals(DeletionTaskType.FILE.name(), + deletionTask.getDeletionTaskType().name()); + assertEquals(id, deletionTask.getTaskId()); + assertEquals(subdir, ((FileDeletionTask) deletionTask).getSubDir()); + assertEquals(basedir, + ((FileDeletionTask) deletionTask).getBaseDirs().get(0)); + } + + @Test + public void testConvertProtoToDeletionTaskRecoveryInfo() throws Exception { + long delTime = System.currentTimeMillis(); + List successorTaskIds = Arrays.asList(1); + DeletionTask deletionTask = mock(DeletionTask.class); + DeletionTaskRecoveryInfo info = + new DeletionTaskRecoveryInfo(deletionTask, successorTaskIds, delTime); + assertEquals(deletionTask, info.getTask()); + assertEquals(successorTaskIds, info.getSuccessorTaskIds()); + assertEquals(delTime, info.getDeletionTimestamp()); + } + +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java index 2991c0cd7a..7980a80a2c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java @@ -75,6 +75,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.DeletionTask; import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; @@ -260,10 +261,10 @@ protected NMTokenIdentifier selectNMTokenIdentifier( protected DeletionService createDeletionService() { return new DeletionService(exec) { @Override - public void delete(String user, Path subDir, Path... baseDirs) { + public void delete(DeletionTask deletionTask) { // Don't do any deletions. - LOG.info("Psuedo delete: user - " + user + ", subDir - " + subDir - + ", baseDirs - " + Arrays.asList(baseDirs)); + LOG.info("Psuedo delete: user - " + user + + ", type - " + deletionTask.getDeletionTaskType()); }; }; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/FileDeletionMatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/FileDeletionMatcher.java new file mode 100644 index 0000000000..faad456429 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/FileDeletionMatcher.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.server.nodemanager.DeletionService; +import org.mockito.ArgumentMatcher; + +import java.util.List; + +/** + * ArgumentMatcher to check the arguments of the {@link FileDeletionTask}. + */ +public class FileDeletionMatcher extends ArgumentMatcher { + + private final DeletionService delService; + private final String user; + private final Path subDirIncludes; + private final List baseDirIncludes; + + public FileDeletionMatcher(DeletionService delService, String user, + Path subDirIncludes, List baseDirIncludes) { + this.delService = delService; + this.user = user; + this.subDirIncludes = subDirIncludes; + this.baseDirIncludes = baseDirIncludes; + } + + @Override + public boolean matches(Object o) { + FileDeletionTask fd = (FileDeletionTask) o; + if (fd.getUser() == null && user != null) { + return false; + } else if (fd.getUser() != null && user == null) { + return false; + } else if (fd.getUser() != null && user != null) { + return fd.getUser().equals(user); + } + if (!comparePaths(fd.getSubDir(), subDirIncludes.getName())) { + return false; + } + if (baseDirIncludes == null && fd.getBaseDirs() != null) { + return false; + } else if (baseDirIncludes != null && fd.getBaseDirs() == null) { + return false; + } else if (baseDirIncludes != null && fd.getBaseDirs() != null) { + if (baseDirIncludes.size() != fd.getBaseDirs().size()) { + return false; + } + for (int i = 0; i < baseDirIncludes.size(); i++) { + if (!comparePaths(fd.getBaseDirs().get(i), + baseDirIncludes.get(i).getName())) { + return false; + } + } + } + return true; + } + + public boolean comparePaths(Path p1, String p2) { + if (p1 == null && p2 != null) { + return false; + } else if (p1 != null && p2 == null) { + return false; + } else if (p1 != null && p2 != null) { + return p1.toUri().getPath().contains(p2.toString()); + } + return true; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/TestFileDeletionTask.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/TestFileDeletionTask.java new file mode 100644 index 0000000000..fd2e4fbb89 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/TestFileDeletionTask.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; +import org.apache.hadoop.yarn.server.nodemanager.DeletionService; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; + +/** + * Test the attributes of the {@link FileDeletionTask} class. + */ +public class TestFileDeletionTask { + + private static final int ID = 0; + private static final String USER = "user"; + private static final Path SUBDIR = new Path("subdir"); + private static final Path BASEDIR = new Path("basedir"); + + private List baseDirs = new ArrayList<>(); + private DeletionService deletionService; + private FileDeletionTask deletionTask; + + @Before + public void setUp() throws Exception { + deletionService = mock(DeletionService.class); + baseDirs.add(BASEDIR); + deletionTask = new FileDeletionTask(ID, deletionService, USER, SUBDIR, + baseDirs); + } + + @After + public void tearDown() throws Exception { + baseDirs.clear(); + } + + @Test + public void testGetUser() throws Exception { + assertEquals(USER, deletionTask.getUser()); + } + + @Test + public void testGetSubDir() throws Exception { + assertEquals(SUBDIR, deletionTask.getSubDir()); + } + + @Test + public void testGetBaseDirs() throws Exception { + assertEquals(1, deletionTask.getBaseDirs().size()); + assertEquals(baseDirs, deletionTask.getBaseDirs()); + } + + @Test + public void testConvertDeletionTaskToProto() throws Exception { + DeletionServiceDeleteTaskProto proto = + deletionTask.convertDeletionTaskToProto(); + assertEquals(ID, proto.getId()); + assertEquals(USER, proto.getUser()); + assertEquals(SUBDIR, new Path(proto.getSubdir())); + assertEquals(BASEDIR, new Path(proto.getBasedirs(0))); + assertEquals(1, proto.getBasedirsCount()); + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java index 2874acb144..6cab5934e8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer; import static org.mockito.Matchers.any; +import static org.mockito.Mockito.argThat; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isA; import static org.mockito.Mockito.mock; @@ -54,6 +55,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceLocalizedEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionMatcher; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent; @@ -823,7 +825,8 @@ public void testGetPathForLocalization() throws Exception { Path rPath = tracker.getPathForLocalization(req1, base_path, delService); Assert.assertFalse(lfs.util().exists(rPath)); - verify(delService, times(1)).delete(eq(user), eq(conflictPath)); + verify(delService, times(1)).delete(argThat(new FileDeletionMatcher( + delService, user, conflictPath, null))); } finally { lfs.delete(base_path, true); if (dispatcher != null) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java index 89cbeb4c91..d863c6ad4e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java @@ -31,7 +31,6 @@ import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isA; -import static org.mockito.Matchers.isNull; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -68,6 +67,7 @@ import org.apache.hadoop.fs.Options; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionMatcher; import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext; import org.junit.Assert; import org.apache.commons.io.FileUtils; @@ -1066,7 +1066,8 @@ public boolean matches(Object o) { verify(containerBus, times(3)).handle(argThat(matchesContainerLoc)); // Verify deletion of localization token. - verify(delService).delete((String)isNull(), eq(localizationTokenPath)); + verify(delService, times(1)).delete(argThat(new FileDeletionMatcher( + delService, null, localizationTokenPath, null))); } finally { spyService.stop(); dispatcher.stop(); @@ -1340,8 +1341,8 @@ public boolean matches(Object o) { Thread.sleep(50); } // Verify if downloading resources were submitted for deletion. - verify(delService).delete(eq(user), (Path) eq(null), - argThat(new DownloadingPathsMatcher(paths))); + verify(delService, times(2)).delete(argThat(new FileDeletionMatcher( + delService, user, null, new ArrayList<>(paths)))); LocalResourcesTracker tracker = spyService.getLocalResourcesTracker( LocalResourceVisibility.PRIVATE, "user0", appId); @@ -2753,15 +2754,19 @@ public void testFailedDirsResourceRelease() throws Exception { for (int i = 0; i < containerLocalDirs.size(); ++i) { if (i == 2) { try { - verify(delService).delete(user, containerLocalDirs.get(i)); - verify(delService).delete(null, nmLocalContainerDirs.get(i)); + verify(delService, times(1)).delete(argThat(new FileDeletionMatcher( + delService, user, containerLocalDirs.get(i), null))); + verify(delService, times(1)).delete(argThat(new FileDeletionMatcher( + delService, null, nmLocalContainerDirs.get(i), null))); Assert.fail("deletion attempts for invalid dirs"); } catch (Throwable e) { continue; } } else { - verify(delService).delete(user, containerLocalDirs.get(i)); - verify(delService).delete(null, nmLocalContainerDirs.get(i)); + verify(delService, times(1)).delete(argThat(new FileDeletionMatcher( + delService, user, containerLocalDirs.get(i), null))); + verify(delService, times(1)).delete(argThat(new FileDeletionMatcher( + delService, null, nmLocalContainerDirs.get(i), null))); } } @@ -2802,15 +2807,19 @@ public boolean matches(Object o) { for (int i = 0; i < containerLocalDirs.size(); ++i) { if (i == 3) { try { - verify(delService).delete(user, containerLocalDirs.get(i)); - verify(delService).delete(null, nmLocalContainerDirs.get(i)); + verify(delService, times(1)).delete(argThat(new FileDeletionMatcher( + delService, user, containerLocalDirs.get(i), null))); + verify(delService, times(1)).delete(argThat(new FileDeletionMatcher( + delService, null, nmLocalContainerDirs.get(i), null))); Assert.fail("deletion attempts for invalid dirs"); } catch (Throwable e) { continue; } } else { - verify(delService).delete(user, appLocalDirs.get(i)); - verify(delService).delete(null, nmLocalAppDirs.get(i)); + verify(delService, times(1)).delete(argThat(new FileDeletionMatcher( + delService, user, containerLocalDirs.get(i), null))); + verify(delService, times(1)).delete(argThat(new FileDeletionMatcher( + delService, null, nmLocalContainerDirs.get(i), null))); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java index 097146b1e9..b4bd9d7e88 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java @@ -42,17 +42,16 @@ import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.NodeManager; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; -import org.apache.hadoop.yarn.util.ConverterUtils; import org.junit.After; import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; -import org.mockito.Matchers; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -311,16 +310,18 @@ private static DeletionService createDeletionServiceWithExpectedFile2Delete( @Override public Void answer(InvocationOnMock invocationOnMock) throws Throwable { Set paths = new HashSet<>(); - Object[] args = invocationOnMock.getArguments(); - for(int i = 2; i < args.length; i++) { - Path path = (Path) args[i]; - paths.add(path.toUri().getRawPath()); + Object[] tasks = invocationOnMock.getArguments(); + for(int i = 0; i < tasks.length; i++) { + FileDeletionTask task = (FileDeletionTask) tasks[i]; + for (Path path: task.getBaseDirs()) { + paths.add(path.toUri().getRawPath()); + } } verifyFilesToDelete(expectedPathsForDeletion, paths); return null; } }).doNothing().when(deletionServiceWithExpectedFiles).delete( - any(String.class), any(Path.class), Matchers.anyVararg()); + any(FileDeletionTask.class)); return deletionServiceWithExpectedFiles; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java index bc1b4b0963..37fe77ac7c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java @@ -120,6 +120,8 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionMatcher; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.TestNonAggregatingLogHandler; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent; @@ -218,8 +220,10 @@ private void verifyLocalFileDeletion( // ensure filesystems were closed verify(logAggregationService).closeFileSystems( any(UserGroupInformation.class)); - verify(delSrvc).delete(eq(user), eq((Path) null), - eq(new Path(app1LogDir.getAbsolutePath()))); + List dirList = new ArrayList<>(); + dirList.add(new Path(app1LogDir.toURI())); + verify(delSrvc, times(2)).delete(argThat(new FileDeletionMatcher( + delSrvc, user, null, dirList))); String containerIdStr = container11.toString(); File containerLogDir = new File(app1LogDir, containerIdStr); @@ -333,7 +337,9 @@ public void testNoLogsUploadedOnAppFinish() throws Exception { logAggregationService.stop(); delSrvc.stop(); // Aggregated logs should not be deleted if not uploaded. - verify(delSrvc, times(0)).delete(user, null); + FileDeletionTask deletionTask = new FileDeletionTask(delSrvc, user, null, + null); + verify(delSrvc, times(0)).delete(deletionTask); } @Test @@ -815,8 +821,9 @@ public void testLogAggregationCreateDirsFailsWithoutKillingNM() assertEquals(0, logAggregationService.getNumAggregators()); // local log dir shouldn't be deleted given log aggregation cannot // continue due to aggregated log dir creation failure on remoteFS. - verify(spyDelSrvc, never()).delete(eq(user), any(Path.class), - Mockito.anyVararg()); + FileDeletionTask deletionTask = new FileDeletionTask(spyDelSrvc, user, + null, null); + verify(spyDelSrvc, never()).delete(deletionTask); verify(logAggregationService).closeFileSystems( any(UserGroupInformation.class)); // make sure local log dir is not deleted in case log aggregation diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java index ec3757e680..7a4ea88760 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java @@ -22,12 +22,14 @@ import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isA; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import java.io.File; @@ -36,6 +38,7 @@ import java.io.NotSerializableException; import java.io.ObjectInputStream; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.HashMap; @@ -66,6 +69,7 @@ import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionMatcher; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent; @@ -531,8 +535,8 @@ static void testDeletionServiceCall(DeletionService delService, String user, boolean matched = false; while (!matched && System.currentTimeMillis() < verifyStartTime + timeout) { try { - verify(delService).delete(eq(user), (Path) eq(null), - Mockito.argThat(new DeletePathsMatcher(matchPaths))); + verify(delService, times(1)).delete(argThat(new FileDeletionMatcher( + delService, user, null, Arrays.asList(matchPaths)))); matched = true; } catch (WantedButNotInvoked e) { notInvokedException = e;