From 1a06175440eec7994d6b63b0e5ac8b6532870fb3 Mon Sep 17 00:00:00 2001
From: Vinod Kumar Vavilapalli
Date: Fri, 21 Jun 2013 00:08:13 +0000
Subject: [PATCH] YARN-851. Share NMTokens using NMTokenCache (api-based)
between AMRMClient and NMClient instead of memory based approach which is
used currently. Contributed by Omkar Vinit Joshi.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1495247 13f79535-47bb-0310-9956-ffa450edef68
---
.../hadoop/mapreduce/v2/app/AppContext.java | 3 -
.../hadoop/mapreduce/v2/app/MRAppMaster.java | 7 --
.../app/launcher/ContainerLauncherImpl.java | 3 +-
.../v2/app/rm/RMContainerAllocator.java | 3 +-
.../mapreduce/v2/app/MockAppContext.java | 9 +-
.../v2/app/TestRuntimeEstimators.java | 6 -
.../app/launcher/TestContainerLauncher.java | 2 +-
.../hadoop/mapreduce/v2/hs/JobHistory.java | 7 --
hadoop-yarn-project/CHANGES.txt | 4 +
.../RegisterApplicationMasterResponse.java | 6 +-
.../hadoop/yarn/api/records/Container.java | 2 -
.../distributedshell/ApplicationMaster.java | 3 +-
.../TestDistributedShell.java | 4 +-
.../hadoop/yarn/client/api/AMRMClient.java | 12 --
.../hadoop/yarn/client/api/NMClient.java | 23 +---
.../hadoop/yarn/client/api/NMTokenCache.java | 103 ++++++++++++++++++
.../client/api/async/AMRMClientAsync.java | 13 ---
.../yarn/client/api/async/NMClientAsync.java | 14 +--
.../api/async/impl/AMRMClientAsyncImpl.java | 15 ---
.../api/async/impl/NMClientAsyncImpl.java | 10 +-
.../yarn/client/api/impl/AMRMClientImpl.java | 14 +--
.../ContainerManagementProtocolProxy.java | 19 ++--
.../yarn/client/api/impl/NMClientImpl.java | 14 +--
.../yarn/client/api/impl/TestAMRMClient.java | 27 ++---
.../yarn/client/api/impl/TestNMClient.java | 10 +-
25 files changed, 163 insertions(+), 170 deletions(-)
create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMTokenCache.java
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java
index cbb8e72497..946d9c62c4 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java
@@ -26,7 +26,6 @@
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager;
import org.apache.hadoop.yarn.util.Clock;
@@ -62,6 +61,4 @@ public interface AppContext {
Set getBlacklistedNodes();
ClientToAMTokenSecretManager getClientToAMTokenSecretManager();
-
- Map getNMTokens();
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
index 71d5b52a06..8abd58d5ee 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
@@ -886,8 +886,6 @@ private class RunningAppContext implements AppContext {
private final Configuration conf;
private final ClusterInfo clusterInfo = new ClusterInfo();
private final ClientToAMTokenSecretManager clientToAMTokenSecretManager;
- private final ConcurrentHashMap nmTokens =
- new ConcurrentHashMap();
public RunningAppContext(Configuration config) {
this.conf = config;
@@ -954,11 +952,6 @@ public Set getBlacklistedNodes() {
public ClientToAMTokenSecretManager getClientToAMTokenSecretManager() {
return clientToAMTokenSecretManager;
}
-
- @Override
- public Map getNMTokens() {
- return this.nmTokens;
- }
}
@SuppressWarnings("unchecked")
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
index b2732c638f..28508a92ed 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
@@ -235,8 +235,7 @@ protected void serviceInit(Configuration conf) throws Exception {
MRJobConfig.DEFAULT_MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT);
LOG.info("Upper limit on the thread pool size is " + this.limitOnPoolSize);
super.serviceInit(conf);
- cmProxy =
- new ContainerManagementProtocolProxy(conf, context.getNMTokens());
+ cmProxy = new ContainerManagementProtocolProxy(conf);
}
protected void serviceStart() throws Exception {
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
index 1263582b6f..dc134ebbe2 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
@@ -68,6 +68,7 @@
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.client.api.NMTokenCache;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.util.RackResolver;
@@ -588,7 +589,7 @@ private List getResources() throws Exception {
// Setting NMTokens
if (response.getNMTokens() != null) {
for (NMToken nmToken : response.getNMTokens()) {
- getContext().getNMTokens().put(nmToken.getNodeId().toString(),
+ NMTokenCache.setNMToken(nmToken.getNodeId().toString(),
nmToken.getToken());
}
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java
index 521e28205d..4b07236705 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java
@@ -26,10 +26,9 @@
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager;
+import org.apache.hadoop.yarn.util.Clock;
import com.google.common.collect.Maps;
@@ -131,10 +130,4 @@ public ClientToAMTokenSecretManager getClientToAMTokenSecretManager() {
// Not implemented
return null;
}
-
- @Override
- public Map getNMTokens() {
- // Not Implemented
- return null;
- }
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
index 1742d90e00..762dd572f3 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
@@ -862,11 +862,5 @@ public Set getBlacklistedNodes() {
public ClientToAMTokenSecretManager getClientToAMTokenSecretManager() {
return null;
}
-
- @Override
- public Map getNMTokens() {
- // Not Implemented
- return null;
- }
}
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java
index 0033490f13..563c31b36e 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java
@@ -376,7 +376,7 @@ public ContainerManagementProtocolProxyData getCMProxy(
containerId.getApplicationAttemptId(),
NodeId.newInstance(addr.getHostName(), addr.getPort()), "user");
ContainerManagementProtocolProxy cmProxy =
- new ContainerManagementProtocolProxy(conf, context.getNMTokens());
+ new ContainerManagementProtocolProxy(conf);
ContainerManagementProtocolProxyData proxy =
cmProxy.new ContainerManagementProtocolProxyData(
YarnRPC.create(conf), containerManagerBindAddr, containerId,
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
index 8605bb4785..2c1f3a26ff 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
@@ -44,7 +44,6 @@
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -316,10 +315,4 @@ public ClientToAMTokenSecretManager getClientToAMTokenSecretManager() {
// Not implemented.
return null;
}
-
- @Override
- public Map getNMTokens() {
- // Not Implemented.
- return null;
- }
}
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 0035785124..e90155f675 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -204,6 +204,10 @@ Release 2.1.0-beta - UNRELEASED
ApplicationSubmissionContext to simplify the api. (Karthik Kambatla via
acmurthy)
+ YARN-851. Share NMTokens using NMTokenCache (api-based) between AMRMClient
+ and NMClient instead of memory based approach which is used currently. (Omkar
+ Vinit Joshi via vinodkv)
+
NEW FEATURES
YARN-482. FS: Extend SchedulingMode to intermediate queues.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java
index 0517486bd8..9c817b318c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java
@@ -36,8 +36,9 @@
*
* The response contains critical details such as:
*
- * - Minimum capability for allocated resources in the cluster.
* - Maximum capability for allocated resources in the cluster.
+ * ApplicationACL
s for the application.
+ * - ClientToAMToken master key.
*
*
*
@@ -50,11 +51,12 @@ public abstract class RegisterApplicationMasterResponse {
@Unstable
public static RegisterApplicationMasterResponse newInstance(
Resource minCapability, Resource maxCapability,
- Map acls) {
+ Map acls, ByteBuffer key) {
RegisterApplicationMasterResponse response =
Records.newRecord(RegisterApplicationMasterResponse.class);
response.setMaximumResourceCapability(maxCapability);
response.setApplicationACLs(acls);
+ response.setClientToAMTokenMasterKey(key);
return response;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java
index cb8d04bbda..5cff2ecbed 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java
@@ -44,12 +44,10 @@
* HTTP uri of the node.
* {@link Resource} allocated to the container.
* {@link Priority} at which the container was allocated.
- * {@link ContainerState} of the container.
*
* Container {@link Token} of the container, used to securely verify
* authenticity of the allocation.
*
- * {@link ContainerStatus} of the container.
*
*
*
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
index 638ce13f43..012af3ff9f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
@@ -448,8 +448,7 @@ public boolean run() throws YarnException, IOException {
resourceManager.start();
containerListener = new NMCallbackHandler();
- nmClientAsync =
- new NMClientAsyncImpl(containerListener, resourceManager.getNMTokens());
+ nmClientAsync = new NMClientAsyncImpl(containerListener);
nmClientAsync.init(conf);
nmClientAsync.start();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
index df333d230a..8b05aa1890 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
@@ -100,7 +100,7 @@ public static void tearDown() throws IOException {
}
}
- @Test(timeout=30000)
+ @Test(timeout=90000)
public void testDSShell() throws Exception {
String[] args = {
@@ -128,7 +128,7 @@ public void testDSShell() throws Exception {
}
- @Test(timeout=30000)
+ @Test(timeout=90000)
public void testDSShellWithInvalidArgs() throws Exception {
Client client = new Client(new Configuration(yarnCluster.getConfig()));
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
index f1890a8472..bd0f16b63e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
@@ -21,7 +21,6 @@
import java.io.IOException;
import java.util.Collection;
import java.util.List;
-import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -35,7 +34,6 @@
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -249,14 +247,4 @@ public abstract List extends Collection> getMatchingRequests(
Priority priority,
String resourceName,
Resource capability);
-
- /**
- * It returns the NMToken received on allocate call. It will not communicate
- * with RM to get NMTokens. On allocate call whenever we receive new token
- * along with container AMRMClient will cache this NMToken per node manager.
- * This map returned should be shared with any application which is
- * communicating with NodeManager (ex. NMClient) using NMTokens. If a new
- * NMToken is received for the same node manager then it will be replaced.
- */
- public abstract ConcurrentMap getNMTokens();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java
index 00e513dc2d..57e7db5cd3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java
@@ -22,21 +22,17 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
-import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.api.impl.NMClientImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -46,30 +42,19 @@ public abstract class NMClient extends AbstractService {
/**
* Create a new instance of NMClient.
- * @param nmTokens need to pass map of NMTokens which are received on
- * {@link AMRMClient#allocate(float)} call as a part of
- * {@link AllocateResponse}.
- * key :- NodeAddr (host:port)
- * Value :- Token {@link NMToken#getToken()}
*/
@Public
- public static NMClient createNMClient(ConcurrentMap nmTokens) {
- NMClient client = new NMClientImpl(nmTokens);
+ public static NMClient createNMClient() {
+ NMClient client = new NMClientImpl();
return client;
}
/**
* Create a new instance of NMClient.
- * @param nmTokens need to pass map of NMTokens which are received on
- * {@link AMRMClient#allocate(float)} call as a part of
- * {@link AllocateResponse}.
- * key :- NodeAddr (host:port)
- * Value :- Token {@link NMToken#getToken()}
*/
@Public
- public static NMClient createNMClient(String name,
- ConcurrentMap nmTokens) {
- NMClient client = new NMClientImpl(name, nmTokens);
+ public static NMClient createNMClient(String name) {
+ NMClient client = new NMClientImpl(name);
return client;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMTokenCache.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMTokenCache.java
new file mode 100644
index 0000000000..c14a12c091
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMTokenCache.java
@@ -0,0 +1,103 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.client.api;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.yarn.api.records.Token;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * It manages NMTokens required for communicating with Node manager. Its a
+ * static token cache.
+ */
+@Public
+@Evolving
+public class NMTokenCache {
+ private static ConcurrentHashMap nmTokens;
+
+
+ static {
+ nmTokens = new ConcurrentHashMap();
+ }
+
+ /**
+ * Returns NMToken, null if absent
+ * @param nodeAddr
+ * @return {@link Token} NMToken required for communicating with node
+ * manager
+ */
+ @Public
+ @Evolving
+ public static Token getNMToken(String nodeAddr) {
+ return nmTokens.get(nodeAddr);
+ }
+
+ /**
+ * Sets the NMToken for node address
+ * @param nodeAddr node address (host:port)
+ * @param token NMToken
+ */
+ @Public
+ @Evolving
+ public static void setNMToken(String nodeAddr, Token token) {
+ nmTokens.put(nodeAddr, token);
+ }
+
+ /**
+ * Returns true if NMToken is present in cache.
+ */
+ @Private
+ @VisibleForTesting
+ public static boolean containsNMToken(String nodeAddr) {
+ return nmTokens.containsKey(nodeAddr);
+ }
+
+ /**
+ * Returns the number of NMTokens present in cache.
+ */
+ @Private
+ @VisibleForTesting
+ public static int numberOfNMTokensInCache() {
+ return nmTokens.size();
+ }
+
+ /**
+ * Removes NMToken for specified node manager
+ * @param nodeAddr node address (host:port)
+ */
+ @Private
+ @VisibleForTesting
+ public static void removeNMToken(String nodeAddr) {
+ nmTokens.remove(nodeAddr);
+ }
+
+ /**
+ * It will remove all the nm tokens from its cache
+ */
+ @Private
+ @VisibleForTesting
+ public static void clearCache() {
+ nmTokens.clear();
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
index 8d551dcd0a..ae781b6003 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
@@ -21,7 +21,6 @@
import java.io.IOException;
import java.util.Collection;
import java.util.List;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -37,7 +36,6 @@
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
@@ -198,17 +196,6 @@ public abstract void unregisterApplicationMaster(
*/
public abstract int getClusterNodeCount();
- /**
- * It returns the NMToken received on allocate call. It will not communicate
- * with RM to get NMTokens. On allocate call whenever we receive new token
- * along with new container AMRMClientAsync will cache this NMToken per node
- * manager. This map returned should be shared with any application which is
- * communicating with NodeManager (ex. NMClient / NMClientAsync) using
- * NMTokens. If a new NMToken is received for the same node manager
- * then it will be replaced.
- */
- public abstract ConcurrentMap getNMTokens();
-
public interface CallbackHandler {
/**
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java
index 507f8d9906..5cb504d3dd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java
@@ -112,18 +112,16 @@ public abstract class NMClientAsync extends AbstractService {
protected CallbackHandler callbackHandler;
public static NMClientAsync createNMClientAsync(
- CallbackHandler callbackHandler, ConcurrentMap nmTokens) {
- return new NMClientAsyncImpl(callbackHandler, nmTokens);
+ CallbackHandler callbackHandler) {
+ return new NMClientAsyncImpl(callbackHandler);
}
- protected NMClientAsync(CallbackHandler callbackHandler,
- ConcurrentMap nmTokens) {
- this (NMClientAsync.class.getName(), callbackHandler, nmTokens);
+ protected NMClientAsync(CallbackHandler callbackHandler) {
+ this (NMClientAsync.class.getName(), callbackHandler);
}
- protected NMClientAsync(String name, CallbackHandler callbackHandler,
- ConcurrentMap nmTokens) {
- this (name, new NMClientImpl(nmTokens), callbackHandler);
+ protected NMClientAsync(String name, CallbackHandler callbackHandler) {
+ this (name, new NMClientImpl(), callbackHandler);
}
@Private
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
index e667e37c24..cc3969dbe3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
@@ -22,7 +22,6 @@
import java.util.Collection;
import java.util.List;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.logging.Log;
@@ -40,7 +39,6 @@
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
@@ -215,19 +213,6 @@ public Resource getAvailableResources() {
public int getClusterNodeCount() {
return client.getClusterNodeCount();
}
-
- /**
- * It returns the NMToken received on allocate call. It will not communicate
- * with RM to get NMTokens. On allocate call whenever we receive new token
- * along with new container AMRMClientAsync will cache this NMToken per node
- * manager. This map returned should be shared with any application which is
- * communicating with NodeManager (ex. NMClient / NMClientAsync) using
- * NMTokens. If a new NMToken is received for the same node manager
- * then it will be replaced.
- */
- public ConcurrentMap getNMTokens() {
- return client.getNMTokens();
- }
private class HeartbeatThread extends Thread {
public HeartbeatThread() {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java
index 7f7df1a856..700a509b65 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java
@@ -82,14 +82,12 @@ public class NMClientAsyncImpl extends NMClientAsync {
protected ConcurrentMap containers =
new ConcurrentHashMap();
- public NMClientAsyncImpl(CallbackHandler callbackHandler,
- ConcurrentMap nmTokens) {
- this(NMClientAsync.class.getName(), callbackHandler, nmTokens);
+ public NMClientAsyncImpl(CallbackHandler callbackHandler) {
+ this(NMClientAsync.class.getName(), callbackHandler);
}
- public NMClientAsyncImpl(String name, CallbackHandler callbackHandler,
- ConcurrentMap nmTokens) {
- this(name, new NMClientImpl(nmTokens), callbackHandler);
+ public NMClientAsyncImpl(String name, CallbackHandler callbackHandler) {
+ this(name, new NMClientImpl(), callbackHandler);
}
@Private
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
index 74c86b9826..68cc2870d3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
@@ -34,7 +34,6 @@
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
-import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -56,8 +55,8 @@
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.NMTokenCache;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -82,7 +81,6 @@ public class AMRMClientImpl extends AMRMClient {
RecordFactoryProvider.getRecordFactory(null);
private int lastResponseId = 0;
- private ConcurrentHashMap nmTokens;
protected ApplicationMasterProtocol rmClient;
protected final ApplicationAttemptId appAttemptId;
@@ -158,7 +156,6 @@ static boolean canFit(Resource arg0, Resource arg1) {
public AMRMClientImpl(ApplicationAttemptId appAttemptId) {
super(AMRMClientImpl.class.getName());
this.appAttemptId = appAttemptId;
- this.nmTokens = new ConcurrentHashMap();
}
@Override
@@ -285,12 +282,12 @@ public AllocateResponse allocate(float progressIndicator)
protected void populateNMTokens(AllocateResponse allocateResponse) {
for (NMToken token : allocateResponse.getNMTokens()) {
String nodeId = token.getNodeId().toString();
- if (nmTokens.containsKey(nodeId)) {
+ if (NMTokenCache.containsNMToken(nodeId)) {
LOG.debug("Replacing token for : " + nodeId);
} else {
LOG.debug("Received new token for : " + nodeId);
}
- nmTokens.put(nodeId, token.getToken());
+ NMTokenCache.setNMToken(nodeId, token.getToken());
}
}
@@ -577,9 +574,4 @@ private void decResourceRequest(Priority priority,
+ " #asks=" + ask.size());
}
}
-
- @Override
- public ConcurrentHashMap getNMTokens() {
- return nmTokens;
- }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/ContainerManagementProtocolProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/ContainerManagementProtocolProxy.java
index a22e200fcb..4ca44e12a4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/ContainerManagementProtocolProxy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/ContainerManagementProtocolProxy.java
@@ -23,7 +23,6 @@
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
-import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -36,6 +35,7 @@
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.api.NMTokenCache;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
@@ -54,13 +54,10 @@ public class ContainerManagementProtocolProxy {
private final int maxConnectedNMs;
private final LinkedHashMap cmProxy;
- private Map nmTokens;
private final Configuration conf;
private final YarnRPC rpc;
- public ContainerManagementProtocolProxy(Configuration conf,
- Map nmTokens) {
- this.nmTokens = nmTokens;
+ public ContainerManagementProtocolProxy(Configuration conf) {
this.conf = conf;
maxConnectedNMs =
@@ -86,10 +83,10 @@ public synchronized ContainerManagementProtocolProxyData getProxy(
// This get call will update the map which is working as LRU cache.
ContainerManagementProtocolProxyData proxy =
cmProxy.get(containerManagerBindAddr);
-
+
while (proxy != null
&& !proxy.token.getIdentifier().equals(
- nmTokens.get(containerManagerBindAddr).getIdentifier())) {
+ NMTokenCache.getNMToken(containerManagerBindAddr).getIdentifier())) {
LOG.info("Refreshing proxy as NMToken got updated for node : "
+ containerManagerBindAddr);
// Token is updated. check if anyone has already tried closing it.
@@ -112,7 +109,7 @@ public synchronized ContainerManagementProtocolProxyData getProxy(
if (proxy == null) {
proxy =
new ContainerManagementProtocolProxyData(rpc, containerManagerBindAddr,
- containerId, nmTokens.get(containerManagerBindAddr));
+ containerId, NMTokenCache.getNMToken(containerManagerBindAddr));
if (cmProxy.size() > maxConnectedNMs) {
// Number of existing proxy exceed the limit.
String cmAddr = cmProxy.keySet().iterator().next();
@@ -172,10 +169,6 @@ public synchronized void stopAllProxies() {
cmProxy.clear();
}
- public synchronized void setNMTokens(Map nmTokens) {
- this.nmTokens = nmTokens;
- }
-
public class ContainerManagementProtocolProxyData {
private final String containerManagerBindAddr;
private final ContainerManagementProtocol proxy;
@@ -201,10 +194,12 @@ public ContainerManagementProtocolProxyData(YarnRPC rpc,
protected ContainerManagementProtocol newProxy(final YarnRPC rpc,
String containerManagerBindAddr, ContainerId containerId, Token token)
throws InvalidToken {
+
if (token == null) {
throw new InvalidToken("No NMToken sent for "
+ containerManagerBindAddr);
}
+
final InetSocketAddress cmAddr =
NetUtils.createSocketAddr(containerManagerBindAddr);
LOG.info("Opening proxy : " + containerManagerBindAddr);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java
index 02cfbfb953..54a73faf74 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java
@@ -81,18 +81,15 @@ public class NMClientImpl extends NMClient {
new ConcurrentHashMap();
//enabled by default
- private final AtomicBoolean cleanupRunningContainers = new AtomicBoolean(true);
- private ContainerManagementProtocolProxy cmProxy;
- private ConcurrentMap nmTokens;
+ private final AtomicBoolean cleanupRunningContainers = new AtomicBoolean(true);
+ private ContainerManagementProtocolProxy cmProxy;
- public NMClientImpl(ConcurrentMap nmTokens) {
+ public NMClientImpl() {
super(NMClientImpl.class.getName());
- this.nmTokens = nmTokens;
}
- public NMClientImpl(String name, ConcurrentMap nmTokens) {
+ public NMClientImpl(String name) {
super(name);
- this.nmTokens = nmTokens;
}
@Override
@@ -126,8 +123,7 @@ protected synchronized void cleanupRunningContainers() {
@Override
protected void serviceInit(Configuration conf) throws Exception {
super.serviceInit(conf);
- cmProxy =
- new ContainerManagementProtocolProxy(conf, nmTokens);
+ cmProxy = new ContainerManagementProtocolProxy(conf);
}
@Override
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
index 5955f26b0b..4c034ae021 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
@@ -26,11 +26,9 @@
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
-import java.util.concurrent.ConcurrentHashMap;
import junit.framework.Assert;
@@ -50,6 +48,7 @@
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -57,6 +56,7 @@
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.NMTokenCache;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.AMRMClient.StoredContainerRequest;
@@ -488,8 +488,8 @@ private void testAllocation(final AMRMClientImpl amClient)
int iterationsLeft = 2;
Set releases = new TreeSet();
- ConcurrentHashMap nmTokens = amClient.getNMTokens();
- Assert.assertEquals(0, nmTokens.size());
+ NMTokenCache.clearCache();
+ Assert.assertEquals(0, NMTokenCache.numberOfNMTokensInCache());
HashMap receivedNMTokens = new HashMap();
while (allocatedContainerCount < containersRequestedAny
@@ -505,19 +505,13 @@ private void testAllocation(final AMRMClientImpl amClient)
releases.add(rejectContainerId);
amClient.releaseAssignedContainer(rejectContainerId);
}
- Assert.assertEquals(nmTokens.size(), amClient.getNMTokens().size());
- Iterator nodeI = nmTokens.keySet().iterator();
- while (nodeI.hasNext()) {
- String nodeId = nodeI.next();
- if (!receivedNMTokens.containsKey(nodeId)) {
- receivedNMTokens.put(nodeId, nmTokens.get(nodeId));
- } else {
- Assert.fail("Received token again for : " + nodeId);
+
+ for (NMToken token : allocResponse.getNMTokens()) {
+ String nodeID = token.getNodeId().toString();
+ if (receivedNMTokens.containsKey(nodeID)) {
+ Assert.fail("Received token again for : " + nodeID);
}
- }
- nodeI = receivedNMTokens.keySet().iterator();
- while (nodeI.hasNext()) {
- nmTokens.remove(nodeI.next());
+ receivedNMTokens.put(nodeID, token.getToken());
}
if(allocatedContainerCount < containersRequestedAny) {
@@ -526,7 +520,6 @@ private void testAllocation(final AMRMClientImpl amClient)
}
}
- Assert.assertEquals(0, amClient.getNMTokens().size());
// Should receive atleast 1 token
Assert.assertTrue(receivedNMTokens.size() > 0
&& receivedNMTokens.size() <= nodeCount);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
index 5bcb428c3e..dc6367b10b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
@@ -55,6 +55,7 @@
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.client.api.NMTokenCache;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -74,7 +75,6 @@ public class TestNMClient {
List nodeReports = null;
ApplicationAttemptId attemptId = null;
int nodeCount = 3;
- ConcurrentHashMap nmTokens;
@Before
public void setup() throws YarnException, IOException {
@@ -136,7 +136,6 @@ public void setup() throws YarnException, IOException {
if (iterationsLeft == 0) {
fail("Application hasn't bee started");
}
- nmTokens = new ConcurrentHashMap();
// start am rm client
rmClient =
@@ -148,7 +147,7 @@ public void setup() throws YarnException, IOException {
assertEquals(STATE.STARTED, rmClient.getServiceState());
// start am nm client
- nmClient = (NMClientImpl) NMClient.createNMClient(nmTokens);
+ nmClient = (NMClientImpl) NMClient.createNMClient();
nmClient.init(conf);
nmClient.start();
assertNotNull(nmClient);
@@ -173,7 +172,7 @@ private void stopNmClient(boolean stopContainers) {
nmClient.stop();
}
- @Test (timeout = 60000)
+ @Test (timeout = 180000)
public void testNMClientNoCleanupOnStop()
throws YarnException, IOException {
@@ -241,7 +240,8 @@ private Set allocateContainers(
}
if (!allocResponse.getNMTokens().isEmpty()) {
for (NMToken token : allocResponse.getNMTokens()) {
- nmTokens.put(token.getNodeId().toString(), token.getToken());
+ NMTokenCache.setNMToken(token.getNodeId().toString(),
+ token.getToken());
}
}
if(allocatedContainerCount < containersRequestedAny) {