diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index dce0da1a99..5257950e85 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -436,6 +436,9 @@ Release 0.23.0 - Unreleased
     virtual, allowing for a ratio between the two to be configurable. (todd
     via acmurthy)
 
+    MAPREDUCE-2986. Fixed MiniYARNCluster to support multiple NodeManagers.
+    (Anupam Seth via vinodkv)
+
   OPTIMIZATIONS
 
     MAPREDUCE-2026. Make JobTracker.getJobCounters() and
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java
index 49a63db44b..2308e6381f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java
@@ -58,7 +58,11 @@ public class MiniMRYarnCluster extends MiniYARNCluster {
   private JobHistoryServerWrapper historyServerWrapper;
 
   public MiniMRYarnCluster(String testName) {
-    super(testName);
+    this(testName, 1);
+  }
+
+  public MiniMRYarnCluster(String testName, int noOfNMs) {
+    super(testName, noOfNMs);
     //TODO: add the history server
     historyServerWrapper = new JobHistoryServerWrapper();
     addService(historyServerWrapper);
@@ -80,7 +84,7 @@ public void init(Configuration conf) {
         Service.class);
 
     // Non-standard shuffle port
-    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 8083);
+    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
 
     conf.setClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
         DefaultContainerExecutor.class, ContainerExecutor.class);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
index d26a441c7a..562e865410 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
@@ -102,7 +102,7 @@ public static void setup() throws IOException {
     }
 
     if (mrCluster == null) {
-      mrCluster = new MiniMRYarnCluster(TestMRJobs.class.getName());
+      mrCluster = new MiniMRYarnCluster(TestMRJobs.class.getName(), 3);
       Configuration conf = new Configuration();
       mrCluster.init(conf);
       mrCluster.start();
@@ -322,7 +322,7 @@ protected Job runFailingMapperJob()
     return job;
   }
 
-//@Test
+  //@Test
   public void testSleepJobWithSecurityOn() throws IOException,
       InterruptedException, ClassNotFoundException {
 
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
index 44c41aa118..d39ed567d1 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
@@ -249,9 +249,14 @@ public synchronized void init(Configuration conf) {
   public synchronized void start() {
     Configuration conf = getConfig();
     ServerBootstrap bootstrap = new ServerBootstrap(selector);
-    bootstrap.setPipelineFactory(new HttpPipelineFactory(conf));
+    HttpPipelineFactory pipelineFact = new HttpPipelineFactory(conf);
+    bootstrap.setPipelineFactory(pipelineFact);
     port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT);
-    accepted.add(bootstrap.bind(new InetSocketAddress(port)));
+    Channel ch = bootstrap.bind(new InetSocketAddress(port));
+    accepted.add(ch);
+    port = ((InetSocketAddress)ch.getLocalAddress()).getPort();
+    conf.set(SHUFFLE_PORT_CONFIG_KEY, Integer.toString(port));
+    pipelineFact.SHUFFLE.setPort(port);
     LOG.info(getName() + " listening on port " + port);
     super.start();
   }
@@ -304,13 +309,17 @@ class Shuffle extends SimpleChannelUpstreamHandler {
     private final IndexCache indexCache;
     private final LocalDirAllocator lDirAlloc =
       new LocalDirAllocator(YarnConfiguration.NM_LOCAL_DIRS);
-    private final int port;
+    private int port;
 
     public Shuffle(Configuration conf) {
       this.conf = conf;
       indexCache = new IndexCache(new JobConf(conf));
       this.port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT);
     }
+
+    public void setPort(int port) {
+      this.port = port;
+    }
 
     private List<String> splitMaps(List<String> mapq) {
       if (null == mapq) {
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
index a525509771..f3a3a224fe 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
@@ -89,8 +89,9 @@ public void startLocalizer(Path nmPrivateContainerTokensPath,
     String tokenFn = String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT, locId);
     Path tokenDst = new Path(appStorageDir, tokenFn);
     lfs.util().copy(nmPrivateContainerTokensPath, tokenDst);
+    LOG.info("Copying from " + nmPrivateContainerTokensPath + " to " + tokenDst);
     lfs.setWorkingDirectory(appStorageDir);
-
+    LOG.info("CWD set to " + appStorageDir + " = " + lfs.getWorkingDirectory());
     // TODO: DO it over RPC for maintaining similarity?
     localizer.runLocalization(nmAddr);
   }
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
index ff55625b82..1b0fe8f538 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
@@ -235,8 +235,15 @@ public void start() {
     cacheCleanup.scheduleWithFixedDelay(new CacheCleanup(dispatcher),
         cacheCleanupPeriod, cacheCleanupPeriod, TimeUnit.MILLISECONDS);
     server = createServer();
-    LOG.info("Localizer started on port " + server.getPort());
     server.start();
+    String host = getConfig().get(YarnConfiguration.NM_LOCALIZER_ADDRESS)
+        .split(":")[0];
+    getConfig().set(YarnConfiguration.NM_LOCALIZER_ADDRESS, host + ":"
+        + server.getPort());
+    localizationServerAddress = NetUtils.createSocketAddr(
+        getConfig().get(YarnConfiguration.NM_LOCALIZER_ADDRESS,
+            YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS));
+    LOG.info("Localizer started on port " + server.getPort());
     super.start();
   }
 
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
index 741a01a4dd..fe7710bacb 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
@@ -147,7 +147,7 @@ public void testLocalizationInit() throws Exception {
   @Test
   @SuppressWarnings("unchecked") // mocked generics
   public void testResourceRelease() throws Exception {
-    Configuration conf = new Configuration();
+    Configuration conf = new YarnConfiguration();
     AbstractFileSystem spylfs =
       spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
     final FileContext lfs = FileContext.getFileContext(spylfs, conf);
@@ -331,7 +331,7 @@ public void testResourceRelease() throws Exception {
   @Test
   @SuppressWarnings("unchecked") // mocked generics
   public void testLocalizationHeartbeat() throws Exception {
-    Configuration conf = new Configuration();
+    Configuration conf = new YarnConfiguration();
     AbstractFileSystem spylfs =
       spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
     final FileContext lfs = FileContext.getFileContext(spylfs, conf);
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
index 9890b5fe84..175542cc2a 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
@@ -292,6 +292,7 @@ public AllocateResponse allocate(AllocateRequest request)
   public void registerAppAttempt(ApplicationAttemptId attemptId) {
     AMResponse response = recordFactory.newRecordInstance(AMResponse.class);
     response.setResponseId(0);
+    LOG.info("Registering " + attemptId);
     responseMap.put(attemptId, response);
   }
 
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
index 6ec44e6e51..53a891366f 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
@@ -24,6 +24,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.NodeHealthCheckerService;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@@ -50,6 +51,7 @@
 import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
 import org.apache.hadoop.yarn.service.AbstractService;
 import org.apache.hadoop.yarn.service.CompositeService;
+import org.apache.hadoop.yarn.service.Service.STATE;
 
 public class MiniYARNCluster extends CompositeService {
 
@@ -60,15 +62,19 @@ public class MiniYARNCluster extends CompositeService {
     DefaultMetricsSystem.setMiniClusterMode(true);
   }
 
-  private NodeManager nodeManager;
+  private NodeManager[] nodeManagers;
   private ResourceManager resourceManager;
 
   private ResourceManagerWrapper resourceManagerWrapper;
-  private NodeManagerWrapper nodeManagerWrapper;
 
   private File testWorkDir;
 
   public MiniYARNCluster(String testName) {
+    //default number of nodeManagers = 1
+    this(testName, 1);
+  }
+
+  public MiniYARNCluster(String testName, int noOfNodeManagers) {
     super(testName);
     this.testWorkDir = new File("target", testName);
     try {
@@ -80,8 +86,11 @@ public MiniYARNCluster(String testName) {
     }
     resourceManagerWrapper = new ResourceManagerWrapper();
     addService(resourceManagerWrapper);
-    nodeManagerWrapper = new NodeManagerWrapper();
-    addService(nodeManagerWrapper);
+    nodeManagers = new CustomNodeManager[noOfNodeManagers];
+    for(int index = 0; index < noOfNodeManagers; index++) {
+      addService(new NodeManagerWrapper(index));
+      nodeManagers[index] = new CustomNodeManager();
+    }
   }
 
   public File getTestWorkDir() {
@@ -92,10 +101,10 @@ public ResourceManager getResourceManager() {
     return this.resourceManager;
   }
 
-  public NodeManager getNodeManager() {
-    return this.nodeManager;
+  public NodeManager getNodeManager(int i) {
+    return this.nodeManagers[i];
   }
-  
+
   private class ResourceManagerWrapper extends AbstractService {
     public ResourceManagerWrapper() {
       super(ResourceManagerWrapper.class.getName());
@@ -145,106 +154,60 @@ public synchronized void stop() {
   }
 
   private class NodeManagerWrapper extends AbstractService {
-    public NodeManagerWrapper() {
-      super(NodeManagerWrapper.class.getName());
+    int index = 0;
+
+    public NodeManagerWrapper(int i) {
+      super(NodeManagerWrapper.class.getName() + "_" + i);
+      index = i;
     }
 
+    public synchronized void init(Configuration conf) {
+      Configuration config = new Configuration(conf);
+      super.init(config);
+    }
+
     public synchronized void start() {
       try {
-        File localDir =
-            new File(testWorkDir, MiniYARNCluster.this.getName() + "-localDir");
+        File localDir = new File(testWorkDir, MiniYARNCluster.this.getName()
+            + "-localDir-nm-" + index);
         localDir.mkdir();
         LOG.info("Created localDir in " + localDir.getAbsolutePath());
-        getConfig().set(YarnConfiguration.NM_LOCAL_DIRS, localDir.getAbsolutePath());
+        getConfig().set(YarnConfiguration.NM_LOCAL_DIRS,
+            localDir.getAbsolutePath());
         File logDir =
             new File(testWorkDir, MiniYARNCluster.this.getName()
-                + "-logDir");
+                + "-logDir-nm-" + index);
         File remoteLogDir =
-            new File(testWorkDir, MiniYARNCluster.this.getName()
-                + "-remoteLogDir");
+          new File(testWorkDir, MiniYARNCluster.this.getName()
+              + "-remoteLogDir-nm-" + index);
         logDir.mkdir();
         remoteLogDir.mkdir();
         LOG.info("Created logDir in " + logDir.getAbsolutePath());
-        getConfig().set(YarnConfiguration.NM_LOG_DIRS, logDir.getAbsolutePath());
+        getConfig().set(YarnConfiguration.NM_LOG_DIRS,
+            logDir.getAbsolutePath());
         getConfig().set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
-            remoteLogDir.getAbsolutePath());
-        getConfig().setInt(YarnConfiguration.NM_PMEM_MB, 4*1024); // By default AM + 2 containers
-        nodeManager = new NodeManager() {
-
-          @Override
-          protected void doSecureLogin() throws IOException {
-            // Don't try to login using keytab in the testcase.
-          };
-
-          @Override
-          protected NodeStatusUpdater createNodeStatusUpdater(Context context,
-              Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
-              ContainerTokenSecretManager containerTokenSecretManager) {
-            return new NodeStatusUpdaterImpl(context, dispatcher,
-                healthChecker, metrics, containerTokenSecretManager) {
-              @Override
-              protected ResourceTracker getRMClient() {
-                final ResourceTrackerService rt = resourceManager
-                    .getResourceTrackerService();
-                final RecordFactory recordFactory =
-                  RecordFactoryProvider.getRecordFactory(null);
-
-                // For in-process communication without RPC
-                return new ResourceTracker() {
-
-                  @Override
-                  public NodeHeartbeatResponse nodeHeartbeat(
-                      NodeHeartbeatRequest request) throws YarnRemoteException {
-                    NodeHeartbeatResponse response = recordFactory.newRecordInstance(
-                        NodeHeartbeatResponse.class);
-                    try {
-                      response.setHeartbeatResponse(rt.nodeHeartbeat(request)
-                          .getHeartbeatResponse());
-                    } catch (IOException ioe) {
-                      LOG.info("Exception in heartbeat from node " +
-                          request.getNodeStatus().getNodeId(), ioe);
-                      throw RPCUtil.getRemoteException(ioe);
-                    }
-                    return response;
-                  }
-
-                  @Override
-                  public RegisterNodeManagerResponse registerNodeManager(
-                      RegisterNodeManagerRequest request)
-                      throws YarnRemoteException {
-                    RegisterNodeManagerResponse response = recordFactory.newRecordInstance(
-                        RegisterNodeManagerResponse.class);
-                    try {
-                      response.setRegistrationResponse(rt
-                          .registerNodeManager(request)
-                          .getRegistrationResponse());
-                    } catch (IOException ioe) {
-                      LOG.info("Exception in node registration from "
-                          + request.getNodeId().toString(), ioe);
-                      throw RPCUtil.getRemoteException(ioe);
-                    }
-                    return response;
-                  }
-                };
-              };
-            };
-          };
-        };
-        nodeManager.init(getConfig());
+            remoteLogDir.getAbsolutePath());
+        // By default AM + 2 containers
+        getConfig().setInt(YarnConfiguration.NM_PMEM_MB, 4*1024);
+        getConfig().set(YarnConfiguration.NM_ADDRESS, "0.0.0.0:0");
+        getConfig().set(YarnConfiguration.NM_LOCALIZER_ADDRESS, "0.0.0.0:0");
+        getConfig().set(YarnConfiguration.NM_WEBAPP_ADDRESS, "0.0.0.0:0");
+        LOG.info("Starting NM: " + index);
+        nodeManagers[index].init(getConfig());
         new Thread() {
           public void run() {
-            nodeManager.start();
+            nodeManagers[index].start();
           };
         }.start();
         int waitCount = 0;
-        while (nodeManager.getServiceState() == STATE.INITED
+        while (nodeManagers[index].getServiceState() == STATE.INITED
            && waitCount++ < 60) {
-          LOG.info("Waiting for NM to start...");
+          LOG.info("Waiting for NM " + index + " to start...");
          Thread.sleep(1000);
        }
-        if (nodeManager.getServiceState() != STATE.STARTED) {
+        if (nodeManagers[index].getServiceState() != STATE.STARTED) {
          // RM could have failed.
-          throw new IOException("NodeManager failed to start");
+          throw new IOException("NodeManager " + index + " failed to start");
        }
        super.start();
      } catch (Throwable t) {
@@ -254,10 +217,71 @@ public void run() {
 
     @Override
     public synchronized void stop() {
-      if (nodeManager != null) {
-        nodeManager.stop();
+      if (nodeManagers[index] != null) {
+        nodeManagers[index].stop();
       }
       super.stop();
     }
   }
+
+  private class CustomNodeManager extends NodeManager {
+    @Override
+    protected void doSecureLogin() throws IOException {
+      // Don't try to login using keytab in the testcase.
+    };
+
+    @Override
+    protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+        Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
+        ContainerTokenSecretManager containerTokenSecretManager) {
+      return new NodeStatusUpdaterImpl(context, dispatcher,
+          healthChecker, metrics, containerTokenSecretManager) {
+        @Override
+        protected ResourceTracker getRMClient() {
+          final ResourceTrackerService rt = resourceManager
+              .getResourceTrackerService();
+          final RecordFactory recordFactory =
+              RecordFactoryProvider.getRecordFactory(null);
+
+          // For in-process communication without RPC
+          return new ResourceTracker() {
+
+            @Override
+            public NodeHeartbeatResponse nodeHeartbeat(
+                NodeHeartbeatRequest request) throws YarnRemoteException {
+              NodeHeartbeatResponse response = recordFactory.newRecordInstance(
+                  NodeHeartbeatResponse.class);
+              try {
+                response.setHeartbeatResponse(rt.nodeHeartbeat(request)
+                    .getHeartbeatResponse());
+              } catch (IOException ioe) {
+                LOG.info("Exception in heartbeat from node " +
+                    request.getNodeStatus().getNodeId(), ioe);
+                throw RPCUtil.getRemoteException(ioe);
+              }
+              return response;
+            }
+
+            @Override
+            public RegisterNodeManagerResponse registerNodeManager(
+                RegisterNodeManagerRequest request)
+                throws YarnRemoteException {
+              RegisterNodeManagerResponse response = recordFactory.
+                  newRecordInstance(RegisterNodeManagerResponse.class);
+              try {
+                response.setRegistrationResponse(rt
+                    .registerNodeManager(request)
+                    .getRegistrationResponse());
+              } catch (IOException ioe) {
+                LOG.info("Exception in node registration from "
+                    + request.getNodeId().toString(), ioe);
+                throw RPCUtil.getRemoteException(ioe);
+              }
+              return response;
+            }
+          };
+        };
+      };
+    };
+  }
 }
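
With this change a test can ask MiniMRYarnCluster (and the underlying MiniYARNCluster) for more than one NodeManager by passing the count to the new constructor, as TestMRJobs now does. A minimal usage sketch, assuming a JUnit-style test; the test name string and variable names below are illustrative only:

    // Bring up an in-process MR/YARN cluster backed by three NodeManagers.
    MiniMRYarnCluster mrCluster = new MiniMRYarnCluster("TestMultipleNMs", 3);
    mrCluster.init(new Configuration());
    mrCluster.start();

    // NodeManagers are now looked up by index instead of a single instance.
    NodeManager nm0 = mrCluster.getNodeManager(0);

    mrCluster.stop();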