diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java index 8504b9d9a6..e85973258a 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java @@ -33,6 +33,9 @@ import java.util.Set; import java.util.Collections; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.databind.ObjectMapper; @@ -85,6 +88,7 @@ import org.apache.hadoop.yarn.util.UTCClock; import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.hadoop.yarn.util.resource.Resources; +import org.eclipse.jetty.util.ConcurrentHashSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -96,9 +100,10 @@ public class SLSRunner extends Configured implements Tool { private static TaskRunner runner = new TaskRunner(); private String[] inputTraces; private Map queueAppNumMap; + private int poolSize; // NM simulator - private HashMap nmMap; + private Map nmMap; private Resource nodeManagerResource; private String nodeFile; @@ -158,7 +163,7 @@ public void setConf(Configuration conf) { } private void init(Configuration tempConf) throws ClassNotFoundException { - nmMap = new HashMap<>(); + nmMap = new ConcurrentHashMap<>(); queueAppNumMap = new HashMap<>(); amMap = new ConcurrentHashMap<>(); amClassMap = new HashMap<>(); @@ -167,7 +172,7 @@ private void init(Configuration tempConf) throws ClassNotFoundException { setConf(tempConf); // runner - int poolSize = tempConf.getInt(SLSConfiguration.RUNNER_POOL_SIZE, + poolSize = tempConf.getInt(SLSConfiguration.RUNNER_POOL_SIZE, SLSConfiguration.RUNNER_POOL_SIZE_DEFAULT); SLSRunner.runner.setQueueSize(poolSize); // map @@ -283,7 +288,8 @@ protected ApplicationMasterLauncher createAMLauncher() { rm.start(); } - private void startNM() throws YarnException, IOException { + private void startNM() throws YarnException, IOException, + InterruptedException { // nm configuration int heartbeatInterval = getConf().getInt( SLSConfiguration.NM_HEARTBEAT_INTERVAL_MS, @@ -333,21 +339,36 @@ private void startNM() throws YarnException, IOException { // create NM simulators Random random = new Random(); - Set rackSet = new HashSet(); + Set rackSet = new ConcurrentHashSet<>(); + int threadPoolSize = Math.max(poolSize, + SLSConfiguration.RUNNER_POOL_SIZE_DEFAULT); + ExecutorService executorService = Executors. + newFixedThreadPool(threadPoolSize); for (Map.Entry entry : nodeResourceMap.entrySet()) { - // we randomize the heartbeat start time from zero to 1 interval - NMSimulator nm = new NMSimulator(); - Resource nmResource = nodeManagerResource; - String hostName = entry.getKey(); - if (entry.getValue() != null) { - nmResource = entry.getValue(); - } - nm.init(hostName, nmResource, random.nextInt(heartbeatInterval), - heartbeatInterval, rm, resourceUtilizationRatio); - nmMap.put(nm.getNode().getNodeID(), nm); - runner.schedule(nm); - rackSet.add(nm.getNode().getRackName()); + executorService.submit(new Runnable() { + @Override public void run() { + try { + // we randomize the heartbeat start time from zero to 1 interval + NMSimulator nm = new NMSimulator(); + Resource nmResource = nodeManagerResource; + String hostName = entry.getKey(); + if (entry.getValue() != null) { + nmResource = entry.getValue(); + } + nm.init(hostName, nmResource, + random.nextInt(heartbeatInterval), + heartbeatInterval, rm, resourceUtilizationRatio); + nmMap.put(nm.getNode().getNodeID(), nm); + runner.schedule(nm); + rackSet.add(nm.getNode().getRackName()); + } catch (IOException | YarnException e) { + LOG.error("Got an error while adding node", e); + } + } + }); } + executorService.shutdown(); + executorService.awaitTermination(10, TimeUnit.MINUTES); numRacks = rackSet.size(); numNMs = nmMap.size(); } @@ -839,7 +860,7 @@ private void printSimulationInfo() { (long)(Math.ceil(maxRuntime / 1000.0))); } - public HashMap getNmMap() { + public Map getNmMap() { return nmMap; }