diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java index 5af4eaa2de..2272e3ed91 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java @@ -63,6 +63,8 @@ import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.util.Records; import org.apache.log4j.Logger; @@ -133,8 +135,7 @@ public abstract class AMSimulator extends TaskRunner.Task { * register with RM */ @Override - public void firstStep() - throws YarnException, IOException, InterruptedException { + public void firstStep() throws Exception { simulateStartTimeMS = System.currentTimeMillis() - SLSRunner.getRunner().getStartTimeMS(); @@ -149,8 +150,7 @@ public abstract class AMSimulator extends TaskRunner.Task { } @Override - public void middleStep() - throws InterruptedException, YarnException, IOException { + public void middleStep() throws Exception { // process responses in the queue processResponseQueue(); @@ -162,7 +162,7 @@ public abstract class AMSimulator extends TaskRunner.Task { } @Override - public void lastStep() { + public void lastStep() throws Exception { LOG.info(MessageFormat.format("Application {0} is shutting down.", appId)); // unregister tracking if (isTracked) { @@ -173,26 +173,19 @@ public abstract class AMSimulator extends TaskRunner.Task { .newRecordInstance(FinishApplicationMasterRequest.class); finishAMRequest.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED); - try { - UserGroupInformation ugi = - UserGroupInformation.createRemoteUser(appAttemptId.toString()); - Token token = - rm.getRMContext().getRMApps().get(appAttemptId.getApplicationId()) - .getRMAppAttempt(appAttemptId).getAMRMToken(); - ugi.addTokenIdentifier(token.decodeIdentifier()); - ugi.doAs(new PrivilegedExceptionAction() { - @Override - public Object run() throws Exception { - rm.getApplicationMasterService() - .finishApplicationMaster(finishAMRequest); - return null; - } - }); - } catch (IOException e) { - e.printStackTrace(); - } catch (InterruptedException e) { - e.printStackTrace(); - } + UserGroupInformation ugi = + UserGroupInformation.createRemoteUser(appAttemptId.toString()); + Token token = rm.getRMContext().getRMApps().get(appId) + .getRMAppAttempt(appAttemptId).getAMRMToken(); + ugi.addTokenIdentifier(token.decodeIdentifier()); + ugi.doAs(new PrivilegedExceptionAction() { + @Override + public Object run() throws Exception { + rm.getApplicationMasterService() + .finishApplicationMaster(finishAMRequest); + return null; + } + }); simulateFinishTimeMS = System.currentTimeMillis() - SLSRunner.getRunner().getStartTimeMS(); @@ -230,11 +223,9 @@ public abstract class AMSimulator extends TaskRunner.Task { return createAllocateRequest(ask, new ArrayList()); } - protected abstract void processResponseQueue() - throws InterruptedException, YarnException, IOException; + protected abstract void processResponseQueue() throws Exception; - protected abstract void sendContainerRequest() - throws YarnException, IOException, InterruptedException; + protected abstract void sendContainerRequest() throws Exception; protected abstract void checkStop(); @@ -280,11 +271,18 @@ public abstract class AMSimulator extends TaskRunner.Task { // waiting until application ACCEPTED RMApp app = rm.getRMContext().getRMApps().get(appId); while(app.getState() != RMAppState.ACCEPTED) { - Thread.sleep(50); + Thread.sleep(10); } - appAttemptId = rm.getRMContext().getRMApps().get(appId) - .getCurrentAppAttempt().getAppAttemptId(); + // Waiting until application attempt reach LAUNCHED + // "Unmanaged AM must register after AM attempt reaches LAUNCHED state" + this.appAttemptId = rm.getRMContext().getRMApps().get(appId) + .getCurrentAppAttempt().getAppAttemptId(); + RMAppAttempt rmAppAttempt = rm.getRMContext().getRMApps().get(appId) + .getCurrentAppAttempt(); + while (rmAppAttempt.getAppAttemptState() != RMAppAttemptState.LAUNCHED) { + Thread.sleep(10); + } } private void registerAM() @@ -297,10 +295,9 @@ public abstract class AMSimulator extends TaskRunner.Task { amRegisterRequest.setTrackingUrl("localhost:1000"); UserGroupInformation ugi = - UserGroupInformation.createRemoteUser(appAttemptId.toString()); - Token token = - rm.getRMContext().getRMApps().get(appAttemptId.getApplicationId()) - .getRMAppAttempt(appAttemptId).getAMRMToken(); + UserGroupInformation.createRemoteUser(appAttemptId.toString()); + Token token = rm.getRMContext().getRMApps().get(appId) + .getRMAppAttempt(appAttemptId).getAMRMToken(); ugi.addTokenIdentifier(token.decodeIdentifier()); ugi.doAs( diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java index d24510ba6f..fb702059ad 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java @@ -145,8 +145,7 @@ public class MRAMSimulator extends AMSimulator { } @Override - public void firstStep() - throws YarnException, IOException, InterruptedException { + public void firstStep() throws Exception { super.firstStep(); requestAMContainer(); @@ -390,7 +389,7 @@ public class MRAMSimulator extends AMSimulator { } @Override - public void lastStep() { + public void lastStep() throws Exception { super.lastStep(); // clear data structures diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java index 4112685e15..0947ba8a18 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.DelayQueue; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -107,12 +108,12 @@ public class NMSimulator extends TaskRunner.Task { } @Override - public void firstStep() throws YarnException, IOException { + public void firstStep() { // do nothing } @Override - public void middleStep() { + public void middleStep() throws Exception { // we check the lifetime for each running containers ContainerSimulator cs = null; synchronized(completedContainerList) { @@ -136,37 +137,31 @@ public class NMSimulator extends TaskRunner.Task { ns.setResponseId(RESPONSE_ID ++); ns.setNodeHealthStatus(NodeHealthStatus.newInstance(true, "", 0)); beatRequest.setNodeStatus(ns); - try { - NodeHeartbeatResponse beatResponse = - rm.getResourceTrackerService().nodeHeartbeat(beatRequest); - if (! beatResponse.getContainersToCleanup().isEmpty()) { - // remove from queue - synchronized(releasedContainerList) { - for (ContainerId containerId : beatResponse.getContainersToCleanup()){ - if (amContainerList.contains(containerId)) { - // AM container (not killed?, only release) - synchronized(amContainerList) { - amContainerList.remove(containerId); - } - LOG.debug(MessageFormat.format("NodeManager {0} releases " + - "an AM ({1}).", node.getNodeID(), containerId)); - } else { - cs = runningContainers.remove(containerId); - containerQueue.remove(cs); - releasedContainerList.add(containerId); - LOG.debug(MessageFormat.format("NodeManager {0} releases a " + - "container ({1}).", node.getNodeID(), containerId)); + NodeHeartbeatResponse beatResponse = + rm.getResourceTrackerService().nodeHeartbeat(beatRequest); + if (! beatResponse.getContainersToCleanup().isEmpty()) { + // remove from queue + synchronized(releasedContainerList) { + for (ContainerId containerId : beatResponse.getContainersToCleanup()){ + if (amContainerList.contains(containerId)) { + // AM container (not killed?, only release) + synchronized(amContainerList) { + amContainerList.remove(containerId); } + LOG.debug(MessageFormat.format("NodeManager {0} releases " + + "an AM ({1}).", node.getNodeID(), containerId)); + } else { + cs = runningContainers.remove(containerId); + containerQueue.remove(cs); + releasedContainerList.add(containerId); + LOG.debug(MessageFormat.format("NodeManager {0} releases a " + + "container ({1}).", node.getNodeID(), containerId)); } } } - if (beatResponse.getNodeAction() == NodeAction.SHUTDOWN) { - lastStep(); - } - } catch (YarnException e) { - e.printStackTrace(); - } catch (IOException e) { - e.printStackTrace(); + } + if (beatResponse.getNodeAction() == NodeAction.SHUTDOWN) { + lastStep(); } } @@ -262,4 +257,19 @@ public class NMSimulator extends TaskRunner.Task { completedContainerList.add(containerId); } } + + @VisibleForTesting + Map getRunningContainers() { + return runningContainers; + } + + @VisibleForTesting + List getAMContainers() { + return amContainerList; + } + + @VisibleForTesting + List getCompletedContainers() { + return completedContainerList; + } } diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java index 0bd0c87d2d..6ccae98bd8 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java @@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; @@ -101,7 +102,6 @@ final public class ResourceSchedulerWrapper private static final String EOL = System.getProperty("line.separator"); private static final int SAMPLING_SIZE = 60; private ScheduledExecutorService pool; - private RMContext rmContext; // counters for scheduler allocate/handle operations private Counter schedulerAllocateCounter; private Counter schedulerHandleCounter; @@ -576,7 +576,7 @@ final public class ResourceSchedulerWrapper new Gauge() { @Override public Integer getValue() { - if(scheduler == null || scheduler.getRootQueueMetrics() == null) { + if (scheduler == null || scheduler.getRootQueueMetrics() == null) { return 0; } else { return scheduler.getRootQueueMetrics().getAppsRunning(); @@ -723,17 +723,18 @@ final public class ResourceSchedulerWrapper public void addAMRuntime(ApplicationId appId, long traceStartTimeMS, long traceEndTimeMS, long simulateStartTimeMS, long simulateEndTimeMS) { - - try { - // write job runtime information - StringBuilder sb = new StringBuilder(); - sb.append(appId).append(",").append(traceStartTimeMS).append(",") - .append(traceEndTimeMS).append(",").append(simulateStartTimeMS) - .append(",").append(simulateEndTimeMS); - jobRuntimeLogBW.write(sb.toString() + EOL); - jobRuntimeLogBW.flush(); - } catch (IOException e) { - e.printStackTrace(); + if (metricsON) { + try { + // write job runtime information + StringBuilder sb = new StringBuilder(); + sb.append(appId).append(",").append(traceStartTimeMS).append(",") + .append(traceEndTimeMS).append(",").append(simulateStartTimeMS) + .append(",").append(simulateEndTimeMS); + jobRuntimeLogBW.write(sb.toString() + EOL); + jobRuntimeLogBW.flush(); + } catch (IOException e) { + e.printStackTrace(); + } } } @@ -919,4 +920,17 @@ final public class ResourceSchedulerWrapper public Resource getClusterResource() { return null; } + + @Override + public synchronized List getTransferredContainers( + ApplicationAttemptId currentAttempt) { + return new ArrayList(); + } + + @Override + public Map> + getSchedulerApplications() { + return new HashMap>(); + } } diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java index 44a872198d..06addfb28f 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java @@ -729,16 +729,18 @@ public class SLSCapacityScheduler extends CapacityScheduler implements long traceStartTimeMS, long traceEndTimeMS, long simulateStartTimeMS, long simulateEndTimeMS) { - try { - // write job runtime information - StringBuilder sb = new StringBuilder(); - sb.append(appId).append(",").append(traceStartTimeMS).append(",") - .append(traceEndTimeMS).append(",").append(simulateStartTimeMS) - .append(",").append(simulateEndTimeMS); - jobRuntimeLogBW.write(sb.toString() + EOL); - jobRuntimeLogBW.flush(); - } catch (IOException e) { - e.printStackTrace(); + if (metricsON) { + try { + // write job runtime information + StringBuilder sb = new StringBuilder(); + sb.append(appId).append(",").append(traceStartTimeMS).append(",") + .append(traceEndTimeMS).append(",").append(simulateStartTimeMS) + .append(",").append(simulateEndTimeMS); + jobRuntimeLogBW.write(sb.toString() + EOL); + jobRuntimeLogBW.flush(); + } catch (IOException e) { + e.printStackTrace(); + } } } diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/TaskRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/TaskRunner.java index c936dd9318..d35290428c 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/TaskRunner.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/TaskRunner.java @@ -99,12 +99,10 @@ public class TaskRunner { } else { lastStep(); } - } catch (YarnException e) { - e.printStackTrace(); - } catch (IOException e) { - e.printStackTrace(); - } catch (InterruptedException e) { + } catch (Exception e) { e.printStackTrace(); + Thread.getDefaultUncaughtExceptionHandler() + .uncaughtException(Thread.currentThread(), e); } } @@ -124,13 +122,11 @@ public class TaskRunner { } - public abstract void firstStep() - throws YarnException, IOException, InterruptedException; + public abstract void firstStep() throws Exception; - public abstract void middleStep() - throws YarnException, InterruptedException, IOException; + public abstract void middleStep() throws Exception; - public abstract void lastStep() throws YarnException; + public abstract void lastStep() throws Exception; public void setEndTime(long et) { endTime = et; diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSRunner.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSRunner.java index b05972734f..9da8ef34a2 100644 --- a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSRunner.java +++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSRunner.java @@ -18,10 +18,13 @@ package org.apache.hadoop.yarn.sls; -import org.apache.commons.io.FileUtils; +import org.junit.Assert; import org.junit.Test; import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.UUID; public class TestSLSRunner { @@ -30,6 +33,15 @@ public class TestSLSRunner { @SuppressWarnings("all") public void testSimulatorRunning() throws Exception { File tempDir = new File("target", UUID.randomUUID().toString()); + final List exceptionList = + Collections.synchronizedList(new ArrayList()); + + Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread t, Throwable e) { + exceptionList.add(e); + } + }); // start the simulator File slsOutputDir = new File(tempDir.getAbsolutePath() + "/slsoutput/"); @@ -38,8 +50,20 @@ public class TestSLSRunner { "-output", slsOutputDir.getAbsolutePath()}; SLSRunner.main(args); - // wait for 45 seconds before stop - Thread.sleep(45 * 1000); + // wait for 20 seconds before stop + int count = 20; + while (count >= 0) { + Thread.sleep(1000); + + if (! exceptionList.isEmpty()) { + SLSRunner.getRunner().stop(); + Assert.fail("TestSLSRunner catched exception from child thread " + + "(TaskRunner.Task): " + exceptionList.get(0).getMessage()); + break; + } + count--; + } + SLSRunner.getRunner().stop(); } diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index e6fbea9350..3d3429a674 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -444,6 +444,9 @@ Release 2.5.0 - UNRELEASED YARN-2335. Annotate all hadoop-sls APIs as @Private. (Wei Yan via kasha) + YARN-1726. ResourceSchedulerWrapper broken due to AbstractYarnScheduler. + (Wei Yan via kasha) + Release 2.4.1 - 2014-06-23 INCOMPATIBLE CHANGES