YARN-1321. Changed NMTokenCache to support both singleton and an instance usage. Contributed by Alejandro Abdelnur.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1537334 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-10-30 22:37:50 +00:00
parent ff549faea0
commit b8f1d1350b
9 changed files with 230 additions and 20 deletions

View File

@ -123,6 +123,9 @@ Release 2.2.1 - UNRELEASED
YARN-1109. Demote NodeManager "Sending out status for container" logs to YARN-1109. Demote NodeManager "Sending out status for container" logs to
debug (haosdent via Sandy Ryza) debug (haosdent via Sandy Ryza)
YARN-1321. Changed NMTokenCache to support both singleton and an instance
usage. (Alejandro Abdelnur via vinodkv)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -59,9 +59,12 @@ public static <T extends ContainerRequest> AMRMClient<T> createAMRMClient() {
return client; return client;
} }
private NMTokenCache nmTokenCache;
@Private @Private
protected AMRMClient(String name) { protected AMRMClient(String name) {
super(name); super(name);
nmTokenCache = NMTokenCache.getSingleton();
} }
/** /**
@ -297,4 +300,33 @@ public abstract List<? extends Collection<T>> getMatchingRequests(
*/ */
public abstract void updateBlacklist(List<String> blacklistAdditions, public abstract void updateBlacklist(List<String> blacklistAdditions,
List<String> blacklistRemovals); List<String> blacklistRemovals);
/**
* Set the NM token cache for the <code>AMRMClient</code>. This cache must
* be shared with the {@link NMClient} used to manage containers for the
* <code>AMRMClient</code>
* <p/>
* If a NM token cache is not set, the {@link NMTokenCache#getSingleton()}
* singleton instance will be used.
*
* @param nmTokenCache the NM token cache to use.
*/
public void setNMTokenCache(NMTokenCache nmTokenCache) {
this.nmTokenCache = nmTokenCache;
}
/**
* Get the NM token cache of the <code>AMRMClient</code>. This cache must be
* shared with the {@link NMClient} used to manage containers for the
* <code>AMRMClient</code>.
* <p/>
* If a NM token cache is not set, the {@link NMTokenCache#getSingleton()}
* singleton instance will be used.
*
* @return the NM token cache.
*/
public NMTokenCache getNMTokenCache() {
return nmTokenCache;
}
} }

View File

@ -58,6 +58,8 @@ public static NMClient createNMClient(String name) {
return client; return client;
} }
private NMTokenCache nmTokenCache = NMTokenCache.getSingleton();
@Private @Private
protected NMClient(String name) { protected NMClient(String name) {
super(name); super(name);
@ -118,4 +120,33 @@ public abstract ContainerStatus getContainerStatus(ContainerId containerId,
* @param enabled whether the feature is enabled or not * @param enabled whether the feature is enabled or not
*/ */
public abstract void cleanupRunningContainersOnStop(boolean enabled); public abstract void cleanupRunningContainersOnStop(boolean enabled);
/**
* Set the NM Token cache of the <code>NMClient</code>. This cache must be
* shared with the {@link AMRMClient} that requested the containers managed
* by this <code>NMClient</code>
* <p/>
* If a NM token cache is not set, the {@link NMTokenCache#getSingleton()}
* singleton instance will be used.
*
* @param nmTokenCache the NM token cache to use.
*/
public void setNMTokenCache(NMTokenCache nmTokenCache) {
this.nmTokenCache = nmTokenCache;
}
/**
* Get the NM token cache of the <code>NMClient</code>. This cache must be
* shared with the {@link AMRMClient} that requested the containers managed
* by this <code>NMClient</code>
* <p/>
* If a NM token cache is not set, the {@link NMTokenCache#getSingleton()}
* singleton instance will be used.
*
* @return the NM token cache
*/
public NMTokenCache getNMTokenCache() {
return nmTokenCache;
}
} }

View File

@ -23,21 +23,139 @@
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
/** /**
* It manages NMTokens required for communicating with Node manager. Its a * NMTokenCache manages NMTokens required for an Application Master
* static token cache. * communicating with individual NodeManagers.
* <p/>
* By default Yarn client libraries {@link AMRMClient} and {@link NMClient} use
* {@link #getSingleton()} instance of the cache.
* <ul>
* <li>Using the singleton instance of the cache is appropriate when running a
* single ApplicationMaster in the same JVM.</li>
* <li>When using the singleton, users don't need to do anything special,
* {@link AMRMClient} and {@link NMClient} are already set up to use the default
* singleton {@link NMTokenCache}</li>
* </ul>
* <p/>
* If running multiple Application Masters in the same JVM, a different cache
* instance should be used for each Application Master.
* <p/>
* <ul>
* <li>
* If using the {@link AMRMClient} and the {@link NMClient}, setting up and using
* an instance cache is as follows:
* <p/>
*
* <pre>
* NMTokenCache nmTokenCache = new NMTokenCache();
* AMRMClient rmClient = AMRMClient.createAMRMClient();
* NMClient nmClient = NMClient.createNMClient();
* nmClient.setNMTokenCache(nmTokenCache);
* ...
* </pre>
* </li>
* <li>
* If using the {@link AMRMClientAsync} and the {@link NMClientAsync}, setting up
* and using an instance cache is as follows:
* <p/>
*
* <pre>
* NMTokenCache nmTokenCache = new NMTokenCache();
* AMRMClient rmClient = AMRMClient.createAMRMClient();
* NMClient nmClient = NMClient.createNMClient();
* nmClient.setNMTokenCache(nmTokenCache);
* AMRMClientAsync rmClientAsync = new AMRMClientAsync(rmClient, 1000, [AMRM_CALLBACK]);
* NMClientAsync nmClientAsync = new NMClientAsync("nmClient", nmClient, [NM_CALLBACK]);
* ...
* </pre>
* </li>
* <li>
* If using {@link ApplicationMasterProtocol} and
* {@link ContainerManagementProtocol} directly, setting up and using an
* instance cache is as follows:
* <p/>
*
* <pre>
* NMTokenCache nmTokenCache = new NMTokenCache();
* ...
* ApplicationMasterProtocol amPro = ClientRMProxy.createRMProxy(conf, ApplicationMasterProtocol.class);
* ...
* AllocateRequest allocateRequest = ...
* ...
* AllocateResponse allocateResponse = rmClient.allocate(allocateRequest);
* for (NMToken token : allocateResponse.getNMTokens()) {
* nmTokenCache.setToken(token.getNodeId().toString(), token.getToken());
* }
* ...
* ContainerManagementProtocolProxy nmPro = ContainerManagementProtocolProxy(conf, nmTokenCache);
* ...
* nmPro.startContainer(container, containerContext);
* ...
* </pre>
* </li>
* </ul>
* It is also possible to mix the usage of a client (<code>AMRMClient</code> or
* <code>NMClient</code>, or the async versions of them) with a protocol proxy (
* <code>ContainerManagementProtocolProxy</code> or
* <code>ApplicationMasterProtocol</code>).
*/ */
@Public @Public
@Evolving @Evolving
public class NMTokenCache { public class NMTokenCache {
private static ConcurrentHashMap<String, Token> nmTokens; private static final NMTokenCache NM_TOKEN_CACHE = new NMTokenCache();
/**
* Returns the singleton NM token cache.
*
* @return the singleton NM token cache.
*/
public static NMTokenCache getSingleton() {
return NM_TOKEN_CACHE;
}
static { /**
* Returns NMToken, null if absent. Only the singleton obtained from
* {@link #getSingleton()} is looked at for the tokens. If you are using your
* own NMTokenCache that is different from the singleton, use
* {@link #getToken(String) }
*
* @param nodeAddr
* @return {@link Token} NMToken required for communicating with node manager
*/
@Public
public static Token getNMToken(String nodeAddr) {
return NM_TOKEN_CACHE.getToken(nodeAddr);
}
/**
* Sets the NMToken for node address only in the singleton obtained from
* {@link #getSingleton()}. If you are using your own NMTokenCache that is
* different from the singleton, use {@link #setToken(String, Token) }
*
* @param nodeAddr
* node address (host:port)
* @param token
* NMToken
*/
@Public
public static void setNMToken(String nodeAddr, Token token) {
NM_TOKEN_CACHE.setToken(nodeAddr, token);
}
private ConcurrentHashMap<String, Token> nmTokens;
/**
* Creates a NM token cache instance.
*/
public NMTokenCache() {
nmTokens = new ConcurrentHashMap<String, Token>(); nmTokens = new ConcurrentHashMap<String, Token>();
} }
@ -45,11 +163,11 @@ public class NMTokenCache {
* Returns NMToken, null if absent * Returns NMToken, null if absent
* @param nodeAddr * @param nodeAddr
* @return {@link Token} NMToken required for communicating with node * @return {@link Token} NMToken required for communicating with node
* manager * manager
*/ */
@Public @Public
@Evolving @Evolving
public static Token getNMToken(String nodeAddr) { public Token getToken(String nodeAddr) {
return nmTokens.get(nodeAddr); return nmTokens.get(nodeAddr);
} }
@ -60,7 +178,7 @@ public static Token getNMToken(String nodeAddr) {
*/ */
@Public @Public
@Evolving @Evolving
public static void setNMToken(String nodeAddr, Token token) { public void setToken(String nodeAddr, Token token) {
nmTokens.put(nodeAddr, token); nmTokens.put(nodeAddr, token);
} }
@ -69,7 +187,7 @@ public static void setNMToken(String nodeAddr, Token token) {
*/ */
@Private @Private
@VisibleForTesting @VisibleForTesting
public static boolean containsNMToken(String nodeAddr) { public boolean containsToken(String nodeAddr) {
return nmTokens.containsKey(nodeAddr); return nmTokens.containsKey(nodeAddr);
} }
@ -78,7 +196,7 @@ public static boolean containsNMToken(String nodeAddr) {
*/ */
@Private @Private
@VisibleForTesting @VisibleForTesting
public static int numberOfNMTokensInCache() { public int numberOfTokensInCache() {
return nmTokens.size(); return nmTokens.size();
} }
@ -88,7 +206,7 @@ public static int numberOfNMTokensInCache() {
*/ */
@Private @Private
@VisibleForTesting @VisibleForTesting
public static void removeNMToken(String nodeAddr) { public void removeToken(String nodeAddr) {
nmTokens.remove(nodeAddr); nmTokens.remove(nodeAddr);
} }
@ -97,7 +215,7 @@ public static void removeNMToken(String nodeAddr) {
*/ */
@Private @Private
@VisibleForTesting @VisibleForTesting
public static void clearCache() { public void clearCache() {
nmTokens.clear(); nmTokens.clear();
} }
} }

View File

@ -58,7 +58,6 @@
import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.InvalidContainerRequestException; import org.apache.hadoop.yarn.client.api.InvalidContainerRequestException;
import org.apache.hadoop.yarn.client.api.NMTokenCache;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@ -288,12 +287,12 @@ public AllocateResponse allocate(float progressIndicator)
protected void populateNMTokens(AllocateResponse allocateResponse) { protected void populateNMTokens(AllocateResponse allocateResponse) {
for (NMToken token : allocateResponse.getNMTokens()) { for (NMToken token : allocateResponse.getNMTokens()) {
String nodeId = token.getNodeId().toString(); String nodeId = token.getNodeId().toString();
if (NMTokenCache.containsNMToken(nodeId)) { if (getNMTokenCache().containsToken(nodeId)) {
LOG.debug("Replacing token for : " + nodeId); LOG.debug("Replacing token for : " + nodeId);
} else { } else {
LOG.debug("Received new token for : " + nodeId); LOG.debug("Received new token for : " + nodeId);
} }
NMTokenCache.setNMToken(nodeId, token.getToken()); getNMTokenCache().setToken(nodeId, token.getToken());
} }
} }

View File

@ -56,9 +56,16 @@ public class ContainerManagementProtocolProxy {
private final LinkedHashMap<String, ContainerManagementProtocolProxyData> cmProxy; private final LinkedHashMap<String, ContainerManagementProtocolProxyData> cmProxy;
private final Configuration conf; private final Configuration conf;
private final YarnRPC rpc; private final YarnRPC rpc;
private NMTokenCache nmTokenCache;
public ContainerManagementProtocolProxy(Configuration conf) { public ContainerManagementProtocolProxy(Configuration conf) {
this(conf, NMTokenCache.getSingleton());
}
public ContainerManagementProtocolProxy(Configuration conf,
NMTokenCache nmTokenCache) {
this.conf = conf; this.conf = conf;
this.nmTokenCache = nmTokenCache;
maxConnectedNMs = maxConnectedNMs =
conf.getInt(YarnConfiguration.NM_CLIENT_MAX_NM_PROXIES, conf.getInt(YarnConfiguration.NM_CLIENT_MAX_NM_PROXIES,
@ -86,7 +93,7 @@ public synchronized ContainerManagementProtocolProxyData getProxy(
while (proxy != null while (proxy != null
&& !proxy.token.getIdentifier().equals( && !proxy.token.getIdentifier().equals(
NMTokenCache.getNMToken(containerManagerBindAddr).getIdentifier())) { nmTokenCache.getToken(containerManagerBindAddr).getIdentifier())) {
LOG.info("Refreshing proxy as NMToken got updated for node : " LOG.info("Refreshing proxy as NMToken got updated for node : "
+ containerManagerBindAddr); + containerManagerBindAddr);
// Token is updated. check if anyone has already tried closing it. // Token is updated. check if anyone has already tried closing it.
@ -109,7 +116,7 @@ public synchronized ContainerManagementProtocolProxyData getProxy(
if (proxy == null) { if (proxy == null) {
proxy = proxy =
new ContainerManagementProtocolProxyData(rpc, containerManagerBindAddr, new ContainerManagementProtocolProxyData(rpc, containerManagerBindAddr,
containerId, NMTokenCache.getNMToken(containerManagerBindAddr)); containerId, nmTokenCache.getToken(containerManagerBindAddr));
if (cmProxy.size() > maxConnectedNMs) { if (cmProxy.size() > maxConnectedNMs) {
// Number of existing proxy exceed the limit. // Number of existing proxy exceed the limit.
String cmAddr = cmProxy.keySet().iterator().next(); String cmAddr = cmProxy.keySet().iterator().next();

View File

@ -130,7 +130,10 @@ protected synchronized void cleanupRunningContainers() {
@Override @Override
protected void serviceInit(Configuration conf) throws Exception { protected void serviceInit(Configuration conf) throws Exception {
super.serviceInit(conf); super.serviceInit(conf);
cmProxy = new ContainerManagementProtocolProxy(conf); if (getNMTokenCache() == null) {
throw new IllegalStateException("NMTokenCache has not been set");
}
cmProxy = new ContainerManagementProtocolProxy(conf, getNMTokenCache());
} }
@Override @Override

View File

@ -626,6 +626,13 @@ public void testAMRMClient() throws YarnException, IOException {
try { try {
// start am rm client // start am rm client
amClient = AMRMClient.<ContainerRequest>createAMRMClient(); amClient = AMRMClient.<ContainerRequest>createAMRMClient();
//setting an instance NMTokenCache
amClient.setNMTokenCache(new NMTokenCache());
//asserting we are not using the singleton instance cache
Assert.assertNotSame(NMTokenCache.getSingleton(),
amClient.getNMTokenCache());
amClient.init(conf); amClient.init(conf);
amClient.start(); amClient.start();
@ -681,8 +688,8 @@ private void testAllocation(final AMRMClientImpl<ContainerRequest> amClient)
int iterationsLeft = 3; int iterationsLeft = 3;
Set<ContainerId> releases = new TreeSet<ContainerId>(); Set<ContainerId> releases = new TreeSet<ContainerId>();
NMTokenCache.clearCache(); amClient.getNMTokenCache().clearCache();
Assert.assertEquals(0, NMTokenCache.numberOfNMTokensInCache()); Assert.assertEquals(0, amClient.getNMTokenCache().numberOfTokensInCache());
HashMap<String, Token> receivedNMTokens = new HashMap<String, Token>(); HashMap<String, Token> receivedNMTokens = new HashMap<String, Token>();
while (allocatedContainerCount < containersRequestedAny while (allocatedContainerCount < containersRequestedAny

View File

@ -78,6 +78,7 @@ public class TestNMClient {
List<NodeReport> nodeReports = null; List<NodeReport> nodeReports = null;
ApplicationAttemptId attemptId = null; ApplicationAttemptId attemptId = null;
int nodeCount = 3; int nodeCount = 3;
NMTokenCache nmTokenCache = null;
@Before @Before
public void setup() throws YarnException, IOException { public void setup() throws YarnException, IOException {
@ -155,10 +156,16 @@ public void setup() throws YarnException, IOException {
.createRemoteUser(UserGroupInformation.getCurrentUser().getUserName())); .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName()));
UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken()); UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken());
//creating an instance NMTokenCase
nmTokenCache = new NMTokenCache();
// start am rm client // start am rm client
rmClient = rmClient =
(AMRMClientImpl<ContainerRequest>) AMRMClient (AMRMClientImpl<ContainerRequest>) AMRMClient
.<ContainerRequest> createAMRMClient(); .<ContainerRequest> createAMRMClient();
//setting an instance NMTokenCase
rmClient.setNMTokenCache(nmTokenCache);
rmClient.init(conf); rmClient.init(conf);
rmClient.start(); rmClient.start();
assertNotNull(rmClient); assertNotNull(rmClient);
@ -166,6 +173,9 @@ public void setup() throws YarnException, IOException {
// start am nm client // start am nm client
nmClient = (NMClientImpl) NMClient.createNMClient(); nmClient = (NMClientImpl) NMClient.createNMClient();
//propagating the AMRMClient NMTokenCache instance
nmClient.setNMTokenCache(rmClient.getNMTokenCache());
nmClient.init(conf); nmClient.init(conf);
nmClient.start(); nmClient.start();
assertNotNull(nmClient); assertNotNull(nmClient);
@ -258,7 +268,7 @@ private Set<Container> allocateContainers(
} }
if (!allocResponse.getNMTokens().isEmpty()) { if (!allocResponse.getNMTokens().isEmpty()) {
for (NMToken token : allocResponse.getNMTokens()) { for (NMToken token : allocResponse.getNMTokens()) {
NMTokenCache.setNMToken(token.getNodeId().toString(), rmClient.getNMTokenCache().setToken(token.getNodeId().toString(),
token.getToken()); token.getToken());
} }
} }