YARN-851. Share NMTokens using NMTokenCache (api-based) between AMRMClient and NMClient instead of memory based approach which is used currently. Contributed by Omkar Vinit Joshi.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1495247 13f79535-47bb-0310-9956-ffa450edef68
parent 575495b5b1
commit 1a06175440
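As an overview of the API change recorded in this diff, the sketch below contrasts the old token plumbing with the new one. It is only an illustration assembled from the hunks that follow; `amRMClient`, `conf`, and the progress value are assumed to exist in the surrounding ApplicationMaster code and are not part of this commit.

    // Before YARN-851: the AM pulled the NMToken map out of AMRMClient and
    // handed it to NMClient explicitly, e.g.
    //   NMClient nmClient = NMClient.createNMClient(amRMClient.getNMTokens());
    //
    // After YARN-851: AMRMClientImpl.populateNMTokens() stores every NMToken
    // from an allocate response in the static NMTokenCache, and NMClient
    // resolves tokens from that same cache, so nothing is passed between them.
    AllocateResponse response = amRMClient.allocate(0.1f);
    // Done internally by AMRMClientImpl; spelled out here only to show the cache:
    for (NMToken nmToken : response.getNMTokens()) {
      NMTokenCache.setNMToken(nmToken.getNodeId().toString(), nmToken.getToken());
    }

    NMClient nmClient = NMClient.createNMClient();  // no token map argument any more
    nmClient.init(conf);
    nmClient.start();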
@@ -26,7 +26,6 @@
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager;
 import org.apache.hadoop.yarn.util.Clock;
@@ -62,6 +61,4 @@ public interface AppContext {
   Set<String> getBlacklistedNodes();

   ClientToAMTokenSecretManager getClientToAMTokenSecretManager();
-
-  Map<String, Token> getNMTokens();
 }

@@ -886,8 +886,6 @@ private class RunningAppContext implements AppContext {
     private final Configuration conf;
     private final ClusterInfo clusterInfo = new ClusterInfo();
     private final ClientToAMTokenSecretManager clientToAMTokenSecretManager;
-    private final ConcurrentHashMap<String, org.apache.hadoop.yarn.api.records.Token> nmTokens =
-        new ConcurrentHashMap<String, org.apache.hadoop.yarn.api.records.Token>();

     public RunningAppContext(Configuration config) {
       this.conf = config;
@@ -954,11 +952,6 @@ public Set<String> getBlacklistedNodes() {
     public ClientToAMTokenSecretManager getClientToAMTokenSecretManager() {
       return clientToAMTokenSecretManager;
     }
-
-    @Override
-    public Map<String, org.apache.hadoop.yarn.api.records.Token> getNMTokens() {
-      return this.nmTokens;
-    }
   }

   @SuppressWarnings("unchecked")

@@ -235,8 +235,7 @@ protected void serviceInit(Configuration conf) throws Exception {
         MRJobConfig.DEFAULT_MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT);
     LOG.info("Upper limit on the thread pool size is " + this.limitOnPoolSize);
     super.serviceInit(conf);
-    cmProxy =
-        new ContainerManagementProtocolProxy(conf, context.getNMTokens());
+    cmProxy = new ContainerManagementProtocolProxy(conf);
   }

   protected void serviceStart() throws Exception {

@@ -68,6 +68,7 @@
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.client.api.NMTokenCache;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.util.RackResolver;
@@ -588,7 +589,7 @@ private List<Container> getResources() throws Exception {
     // Setting NMTokens
     if (response.getNMTokens() != null) {
       for (NMToken nmToken : response.getNMTokens()) {
-        getContext().getNMTokens().put(nmToken.getNodeId().toString(),
+        NMTokenCache.setNMToken(nmToken.getNodeId().toString(),
             nmToken.getToken());
       }
     }

@@ -26,10 +26,9 @@
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager;
 import org.apache.hadoop.yarn.util.Clock;

 import com.google.common.collect.Maps;
@@ -131,10 +130,4 @@ public ClientToAMTokenSecretManager getClientToAMTokenSecretManager() {
     // Not implemented
     return null;
   }
-
-  @Override
-  public Map<String, Token> getNMTokens() {
-    // Not Implemented
-    return null;
-  }
 }

@@ -862,11 +862,5 @@ public Set<String> getBlacklistedNodes() {
     public ClientToAMTokenSecretManager getClientToAMTokenSecretManager() {
       return null;
     }
-
-    @Override
-    public Map<String, Token> getNMTokens() {
-      // Not Implemented
-      return null;
-    }
   }
 }

@@ -376,7 +376,7 @@ public ContainerManagementProtocolProxyData getCMProxy(
           containerId.getApplicationAttemptId(),
           NodeId.newInstance(addr.getHostName(), addr.getPort()), "user");
     ContainerManagementProtocolProxy cmProxy =
-        new ContainerManagementProtocolProxy(conf, context.getNMTokens());
+        new ContainerManagementProtocolProxy(conf);
     ContainerManagementProtocolProxyData proxy =
         cmProxy.new ContainerManagementProtocolProxyData(
             YarnRPC.create(conf), containerManagerBindAddr, containerId,

@@ -44,7 +44,6 @@
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -316,10 +315,4 @@ public ClientToAMTokenSecretManager getClientToAMTokenSecretManager() {
     // Not implemented.
     return null;
   }
-
-  @Override
-  public Map<String, Token> getNMTokens() {
-    // Not Implemented.
-    return null;
-  }
 }

@@ -204,6 +204,10 @@ Release 2.1.0-beta - UNRELEASED
     ApplicationSubmissionContext to simplify the api. (Karthik Kambatla via
     acmurthy)

+    YARN-851. Share NMTokens using NMTokenCache (api-based) between AMRMClient
+    and NMClient instead of memory based approach which is used currently. (Omkar
+    Vinit Joshi via vinodkv)
+
   NEW FEATURES

     YARN-482. FS: Extend SchedulingMode to intermediate queues.

@@ -36,8 +36,9 @@
 *
 * <p>The response contains critical details such as:
 * <ul>
 *   <li>Minimum capability for allocated resources in the cluster.</li>
 *   <li>Maximum capability for allocated resources in the cluster.</li>
 *   <li><code>ApplicationACL</code>s for the application.</li>
+*   <li>ClientToAMToken master key.</li>
 * </ul>
 * </p>
 *
@@ -50,11 +51,12 @@ public abstract class RegisterApplicationMasterResponse {
   @Unstable
   public static RegisterApplicationMasterResponse newInstance(
       Resource minCapability, Resource maxCapability,
-      Map<ApplicationAccessType, String> acls) {
+      Map<ApplicationAccessType, String> acls, ByteBuffer key) {
     RegisterApplicationMasterResponse response =
         Records.newRecord(RegisterApplicationMasterResponse.class);
     response.setMaximumResourceCapability(maxCapability);
     response.setApplicationACLs(acls);
+    response.setClientToAMTokenMasterKey(key);
     return response;
   }

@@ -44,12 +44,10 @@
 *   <li>HTTP uri of the node.</li>
 *   <li>{@link Resource} allocated to the container.</li>
 *   <li>{@link Priority} at which the container was allocated.</li>
-*   <li>{@link ContainerState} of the container.</li>
 *   <li>
 *     Container {@link Token} of the container, used to securely verify
 *     authenticity of the allocation.
 *   </li>
-*   <li>{@link ContainerStatus} of the container.</li>
 * </ul>
 * </p>
 *

@@ -448,8 +448,7 @@ public boolean run() throws YarnException, IOException {
     resourceManager.start();

     containerListener = new NMCallbackHandler();
-    nmClientAsync =
-        new NMClientAsyncImpl(containerListener, resourceManager.getNMTokens());
+    nmClientAsync = new NMClientAsyncImpl(containerListener);
     nmClientAsync.init(conf);
     nmClientAsync.start();

@@ -100,7 +100,7 @@ public static void tearDown() throws IOException {
     }
   }

-  @Test(timeout=30000)
+  @Test(timeout=90000)
   public void testDSShell() throws Exception {

     String[] args = {
@@ -128,7 +128,7 @@ public void testDSShell() throws Exception {

   }

-  @Test(timeout=30000)
+  @Test(timeout=90000)
   public void testDSShellWithInvalidArgs() throws Exception {
     Client client = new Client(new Configuration(yarnCluster.getConfig()));

@@ -21,7 +21,6 @@
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
-import java.util.concurrent.ConcurrentMap;

 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -35,7 +34,6 @@
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
 import org.apache.hadoop.yarn.exceptions.YarnException;

@@ -249,14 +247,4 @@ public abstract List<? extends Collection<T>> getMatchingRequests(
                                            Priority priority,
                                            String resourceName,
                                            Resource capability);
-
-  /**
-   * It returns the NMToken received on allocate call. It will not communicate
-   * with RM to get NMTokens. On allocate call whenever we receive new token
-   * along with container AMRMClient will cache this NMToken per node manager.
-   * This map returned should be shared with any application which is
-   * communicating with NodeManager (ex. NMClient) using NMTokens. If a new
-   * NMToken is received for the same node manager then it will be replaced.
-   */
-  public abstract ConcurrentMap<String, Token> getNMTokens();
 }

@@ -22,21 +22,17 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Map;
-import java.util.concurrent.ConcurrentMap;

 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.client.api.impl.NMClientImpl;
 import org.apache.hadoop.yarn.exceptions.YarnException;

@@ -46,30 +42,19 @@ public abstract class NMClient extends AbstractService {

   /**
    * Create a new instance of NMClient.
-   * @param nmTokens need to pass map of NMTokens which are received on
-   *                 {@link AMRMClient#allocate(float)} call as a part of
-   *                 {@link AllocateResponse}.
-   *                 key :- NodeAddr (host:port)
-   *                 Value :- Token {@link NMToken#getToken()}
    */
   @Public
-  public static NMClient createNMClient(ConcurrentMap<String, Token> nmTokens) {
-    NMClient client = new NMClientImpl(nmTokens);
+  public static NMClient createNMClient() {
+    NMClient client = new NMClientImpl();
     return client;
   }

   /**
    * Create a new instance of NMClient.
-   * @param nmTokens need to pass map of NMTokens which are received on
-   *                 {@link AMRMClient#allocate(float)} call as a part of
-   *                 {@link AllocateResponse}.
-   *                 key :- NodeAddr (host:port)
-   *                 Value :- Token {@link NMToken#getToken()}
    */
   @Public
-  public static NMClient createNMClient(String name,
-      ConcurrentMap<String, Token> nmTokens) {
-    NMClient client = new NMClientImpl(name, nmTokens);
+  public static NMClient createNMClient(String name) {
+    NMClient client = new NMClientImpl(name);
     return client;
   }

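For comparison, a minimal usage sketch of the parameterless factory methods introduced above; `conf`, `container`, and `launchContext` are assumed to come from the surrounding application code and are not defined in this diff.

    NMClient nmClient = NMClient.createNMClient();   // NMTokens now come from NMTokenCache
    nmClient.init(conf);
    nmClient.start();
    // startContainer() looks the NMToken up via the cache that AMRMClient
    // populated during allocate(); no ConcurrentMap<String, Token> is needed here.
    nmClient.startContainer(container, launchContext);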
@@ -0,0 +1,103 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.client.api;

import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.yarn.api.records.Token;

import com.google.common.annotations.VisibleForTesting;

/**
 * It manages NMTokens required for communicating with Node manager. Its a
 * static token cache.
 */
@Public
@Evolving
public class NMTokenCache {
  private static ConcurrentHashMap<String, Token> nmTokens;

  static {
    nmTokens = new ConcurrentHashMap<String, Token>();
  }

  /**
   * Returns NMToken, null if absent
   * @param nodeAddr
   * @return {@link Token} NMToken required for communicating with node
   *         manager
   */
  @Public
  @Evolving
  public static Token getNMToken(String nodeAddr) {
    return nmTokens.get(nodeAddr);
  }

  /**
   * Sets the NMToken for node address
   * @param nodeAddr node address (host:port)
   * @param token NMToken
   */
  @Public
  @Evolving
  public static void setNMToken(String nodeAddr, Token token) {
    nmTokens.put(nodeAddr, token);
  }

  /**
   * Returns true if NMToken is present in cache.
   */
  @Private
  @VisibleForTesting
  public static boolean containsNMToken(String nodeAddr) {
    return nmTokens.containsKey(nodeAddr);
  }

  /**
   * Returns the number of NMTokens present in cache.
   */
  @Private
  @VisibleForTesting
  public static int numberOfNMTokensInCache() {
    return nmTokens.size();
  }

  /**
   * Removes NMToken for specified node manager
   * @param nodeAddr node address (host:port)
   */
  @Private
  @VisibleForTesting
  public static void removeNMToken(String nodeAddr) {
    nmTokens.remove(nodeAddr);
  }

  /**
   * It will remove all the nm tokens from its cache
   */
  @Private
  @VisibleForTesting
  public static void clearCache() {
    nmTokens.clear();
  }
}

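The cache above is deliberately small: getNMToken and setNMToken are the public surface, while the remaining helpers are @Private/@VisibleForTesting. A hedged usage sketch follows; the node address and the `token` value are illustrative only (a real token would come from NMToken.getToken() in an allocate response).

    // Store a token for a node manager address (host:port), then read it back.
    NMTokenCache.setNMToken("nm-host-1:45454", token);

    if (NMTokenCache.containsNMToken("nm-host-1:45454")) {     // test-only helper
      Token cached = NMTokenCache.getNMToken("nm-host-1:45454");
      // ContainerManagementProtocolProxy performs exactly this lookup when it
      // opens a proxy to the node manager (see the hunks further below).
    }

    NMTokenCache.clearCache();   // test-only helper: drops every cached NMToken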
@@ -21,7 +21,6 @@
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;

 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -37,7 +36,6 @@
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
 import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
@@ -198,17 +196,6 @@ public abstract void unregisterApplicationMaster(
    */
   public abstract int getClusterNodeCount();

-  /**
-   * It returns the NMToken received on allocate call. It will not communicate
-   * with RM to get NMTokens. On allocate call whenever we receive new token
-   * along with new container AMRMClientAsync will cache this NMToken per node
-   * manager. This map returned should be shared with any application which is
-   * communicating with NodeManager (ex. NMClient / NMClientAsync) using
-   * NMTokens. If a new NMToken is received for the same node manager
-   * then it will be replaced.
-   */
-  public abstract ConcurrentMap<String, Token> getNMTokens();
-
   public interface CallbackHandler {

     /**

@@ -112,18 +112,16 @@ public abstract class NMClientAsync extends AbstractService {
   protected CallbackHandler callbackHandler;

   public static NMClientAsync createNMClientAsync(
-      CallbackHandler callbackHandler, ConcurrentMap<String, Token> nmTokens) {
-    return new NMClientAsyncImpl(callbackHandler, nmTokens);
+      CallbackHandler callbackHandler) {
+    return new NMClientAsyncImpl(callbackHandler);
   }

-  protected NMClientAsync(CallbackHandler callbackHandler,
-      ConcurrentMap<String, Token> nmTokens) {
-    this (NMClientAsync.class.getName(), callbackHandler, nmTokens);
+  protected NMClientAsync(CallbackHandler callbackHandler) {
+    this (NMClientAsync.class.getName(), callbackHandler);
   }

-  protected NMClientAsync(String name, CallbackHandler callbackHandler,
-      ConcurrentMap<String, Token> nmTokens) {
-    this (name, new NMClientImpl(nmTokens), callbackHandler);
+  protected NMClientAsync(String name, CallbackHandler callbackHandler) {
+    this (name, new NMClientImpl(), callbackHandler);
   }

   @Private

@@ -22,7 +22,6 @@
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.LinkedBlockingQueue;

 import org.apache.commons.logging.Log;
@@ -40,7 +39,6 @@
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
@@ -215,19 +213,6 @@ public Resource getAvailableResources() {
   public int getClusterNodeCount() {
     return client.getClusterNodeCount();
   }
-
-  /**
-   * It returns the NMToken received on allocate call. It will not communicate
-   * with RM to get NMTokens. On allocate call whenever we receive new token
-   * along with new container AMRMClientAsync will cache this NMToken per node
-   * manager. This map returned should be shared with any application which is
-   * communicating with NodeManager (ex. NMClient / NMClientAsync) using
-   * NMTokens. If a new NMToken is received for the same node manager
-   * then it will be replaced.
-   */
-  public ConcurrentMap<String, Token> getNMTokens() {
-    return client.getNMTokens();
-  }

   private class HeartbeatThread extends Thread {
     public HeartbeatThread() {

@@ -82,14 +82,12 @@ public class NMClientAsyncImpl extends NMClientAsync {
   protected ConcurrentMap<ContainerId, StatefulContainer> containers =
       new ConcurrentHashMap<ContainerId, StatefulContainer>();

-  public NMClientAsyncImpl(CallbackHandler callbackHandler,
-      ConcurrentMap<String, Token> nmTokens) {
-    this(NMClientAsync.class.getName(), callbackHandler, nmTokens);
+  public NMClientAsyncImpl(CallbackHandler callbackHandler) {
+    this(NMClientAsync.class.getName(), callbackHandler);
   }

-  public NMClientAsyncImpl(String name, CallbackHandler callbackHandler,
-      ConcurrentMap<String, Token> nmTokens) {
-    this(name, new NMClientImpl(nmTokens), callbackHandler);
+  public NMClientAsyncImpl(String name, CallbackHandler callbackHandler) {
+    this(name, new NMClientImpl(), callbackHandler);
   }

   @Private

@@ -34,7 +34,6 @@
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.TreeSet;
-import java.util.concurrent.ConcurrentHashMap;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -56,8 +55,8 @@
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.NMTokenCache;
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -82,7 +81,6 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
       RecordFactoryProvider.getRecordFactory(null);

   private int lastResponseId = 0;
-  private ConcurrentHashMap<String, Token> nmTokens;

   protected ApplicationMasterProtocol rmClient;
   protected final ApplicationAttemptId appAttemptId;
@@ -158,7 +156,6 @@ static boolean canFit(Resource arg0, Resource arg1) {
   public AMRMClientImpl(ApplicationAttemptId appAttemptId) {
     super(AMRMClientImpl.class.getName());
     this.appAttemptId = appAttemptId;
-    this.nmTokens = new ConcurrentHashMap<String, Token>();
   }

   @Override
@@ -285,12 +282,12 @@ public AllocateResponse allocate(float progressIndicator)
   protected void populateNMTokens(AllocateResponse allocateResponse) {
     for (NMToken token : allocateResponse.getNMTokens()) {
       String nodeId = token.getNodeId().toString();
-      if (nmTokens.containsKey(nodeId)) {
+      if (NMTokenCache.containsNMToken(nodeId)) {
         LOG.debug("Replacing token for : " + nodeId);
       } else {
         LOG.debug("Received new token for : " + nodeId);
       }
-      nmTokens.put(nodeId, token.getToken());
+      NMTokenCache.setNMToken(nodeId, token.getToken());
     }
   }

@@ -577,9 +574,4 @@ private void decResourceRequest(Priority priority,
           + " #asks=" + ask.size());
     }
   }
-
-  @Override
-  public ConcurrentHashMap<String, Token> getNMTokens() {
-    return nmTokens;
-  }
 }

@@ -23,7 +23,6 @@
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Map;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -36,6 +35,7 @@
 import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.api.NMTokenCache;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
@@ -54,13 +54,10 @@ public class ContainerManagementProtocolProxy {

   private final int maxConnectedNMs;
   private final LinkedHashMap<String, ContainerManagementProtocolProxyData> cmProxy;
-  private Map<String, Token> nmTokens;
   private final Configuration conf;
   private final YarnRPC rpc;

-  public ContainerManagementProtocolProxy(Configuration conf,
-      Map<String, Token> nmTokens) {
-    this.nmTokens = nmTokens;
+  public ContainerManagementProtocolProxy(Configuration conf) {
     this.conf = conf;

     maxConnectedNMs =
@@ -86,10 +83,10 @@ public synchronized ContainerManagementProtocolProxyData getProxy(
     // This get call will update the map which is working as LRU cache.
     ContainerManagementProtocolProxyData proxy =
         cmProxy.get(containerManagerBindAddr);

     while (proxy != null
         && !proxy.token.getIdentifier().equals(
-            nmTokens.get(containerManagerBindAddr).getIdentifier())) {
+            NMTokenCache.getNMToken(containerManagerBindAddr).getIdentifier())) {
       LOG.info("Refreshing proxy as NMToken got updated for node : "
           + containerManagerBindAddr);
       // Token is updated. check if anyone has already tried closing it.
@@ -112,7 +109,7 @@ public synchronized ContainerManagementProtocolProxyData getProxy(
     if (proxy == null) {
       proxy =
           new ContainerManagementProtocolProxyData(rpc, containerManagerBindAddr,
-              containerId, nmTokens.get(containerManagerBindAddr));
+              containerId, NMTokenCache.getNMToken(containerManagerBindAddr));
       if (cmProxy.size() > maxConnectedNMs) {
         // Number of existing proxy exceed the limit.
         String cmAddr = cmProxy.keySet().iterator().next();
@@ -172,10 +169,6 @@ public synchronized void stopAllProxies() {
     cmProxy.clear();
   }

-  public synchronized void setNMTokens(Map<String, Token> nmTokens) {
-    this.nmTokens = nmTokens;
-  }
-
   public class ContainerManagementProtocolProxyData {
     private final String containerManagerBindAddr;
     private final ContainerManagementProtocol proxy;
@@ -201,10 +194,12 @@ public ContainerManagementProtocolProxyData(YarnRPC rpc,
     protected ContainerManagementProtocol newProxy(final YarnRPC rpc,
         String containerManagerBindAddr, ContainerId containerId, Token token)
         throws InvalidToken {

       if (token == null) {
         throw new InvalidToken("No NMToken sent for "
             + containerManagerBindAddr);
       }

       final InetSocketAddress cmAddr =
           NetUtils.createSocketAddr(containerManagerBindAddr);
       LOG.info("Opening proxy : " + containerManagerBindAddr);

@@ -81,18 +81,15 @@ public class NMClientImpl extends NMClient {
       new ConcurrentHashMap<ContainerId, StartedContainer>();

   //enabled by default
-  private final AtomicBoolean cleanupRunningContainers = new AtomicBoolean(true);
-  private ContainerManagementProtocolProxy cmProxy;
-  private ConcurrentMap<String, Token> nmTokens;
+  private final AtomicBoolean cleanupRunningContainers = new AtomicBoolean(true);
+  private ContainerManagementProtocolProxy cmProxy;

-  public NMClientImpl(ConcurrentMap<String, Token> nmTokens) {
+  public NMClientImpl() {
     super(NMClientImpl.class.getName());
-    this.nmTokens = nmTokens;
   }

-  public NMClientImpl(String name, ConcurrentMap<String, Token> nmTokens) {
+  public NMClientImpl(String name) {
     super(name);
-    this.nmTokens = nmTokens;
   }

   @Override
@@ -126,8 +123,7 @@ protected synchronized void cleanupRunningContainers() {
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
     super.serviceInit(conf);
-    cmProxy =
-        new ContainerManagementProtocolProxy(conf, nmTokens);
+    cmProxy = new ContainerManagementProtocolProxy(conf);
   }

   @Override

@@ -26,11 +26,9 @@
 import java.io.IOException;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
-import java.util.concurrent.ConcurrentHashMap;

 import junit.framework.Assert;

@@ -50,6 +48,7 @@
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -57,6 +56,7 @@
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.NMTokenCache;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
 import org.apache.hadoop.yarn.client.api.AMRMClient.StoredContainerRequest;
@@ -488,8 +488,8 @@ private void testAllocation(final AMRMClientImpl<ContainerRequest> amClient)
     int iterationsLeft = 2;
     Set<ContainerId> releases = new TreeSet<ContainerId>();

-    ConcurrentHashMap<String, Token> nmTokens = amClient.getNMTokens();
-    Assert.assertEquals(0, nmTokens.size());
+    NMTokenCache.clearCache();
+    Assert.assertEquals(0, NMTokenCache.numberOfNMTokensInCache());
     HashMap<String, Token> receivedNMTokens = new HashMap<String, Token>();

     while (allocatedContainerCount < containersRequestedAny
@@ -505,19 +505,13 @@ private void testAllocation(final AMRMClientImpl<ContainerRequest> amClient)
         releases.add(rejectContainerId);
         amClient.releaseAssignedContainer(rejectContainerId);
       }
-      Assert.assertEquals(nmTokens.size(), amClient.getNMTokens().size());
-      Iterator<String> nodeI = nmTokens.keySet().iterator();
-      while (nodeI.hasNext()) {
-        String nodeId = nodeI.next();
-        if (!receivedNMTokens.containsKey(nodeId)) {
-          receivedNMTokens.put(nodeId, nmTokens.get(nodeId));
-        } else {
-          Assert.fail("Received token again for : " + nodeId);
-        }
-      }
-      nodeI = receivedNMTokens.keySet().iterator();
-      while (nodeI.hasNext()) {
-        nmTokens.remove(nodeI.next());
+
+      for (NMToken token : allocResponse.getNMTokens()) {
+        String nodeID = token.getNodeId().toString();
+        if (receivedNMTokens.containsKey(nodeID)) {
+          Assert.fail("Received token again for : " + nodeID);
+        }
+        receivedNMTokens.put(nodeID, token.getToken());
       }

       if(allocatedContainerCount < containersRequestedAny) {
@@ -526,7 +520,6 @@ private void testAllocation(final AMRMClientImpl<ContainerRequest> amClient)
       }
     }

-    Assert.assertEquals(0, amClient.getNMTokens().size());
     // Should receive atleast 1 token
    Assert.assertTrue(receivedNMTokens.size() > 0
        && receivedNMTokens.size() <= nodeCount);

@@ -55,6 +55,7 @@
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.client.api.NMTokenCache;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -74,7 +75,6 @@ public class TestNMClient {
   List<NodeReport> nodeReports = null;
   ApplicationAttemptId attemptId = null;
   int nodeCount = 3;
-  ConcurrentHashMap<String, Token> nmTokens;

   @Before
   public void setup() throws YarnException, IOException {
@@ -136,7 +136,6 @@ public void setup() throws YarnException, IOException {
     if (iterationsLeft == 0) {
       fail("Application hasn't bee started");
     }
-    nmTokens = new ConcurrentHashMap<String, Token>();

     // start am rm client
     rmClient =
@@ -148,7 +147,7 @@ public void setup() throws YarnException, IOException {
     assertEquals(STATE.STARTED, rmClient.getServiceState());

     // start am nm client
-    nmClient = (NMClientImpl) NMClient.createNMClient(nmTokens);
+    nmClient = (NMClientImpl) NMClient.createNMClient();
     nmClient.init(conf);
     nmClient.start();
     assertNotNull(nmClient);
@@ -173,7 +172,7 @@ private void stopNmClient(boolean stopContainers) {
     nmClient.stop();
   }

-  @Test (timeout = 60000)
+  @Test (timeout = 180000)
   public void testNMClientNoCleanupOnStop()
       throws YarnException, IOException {

@@ -241,7 +240,8 @@ private Set<Container> allocateContainers(
       }
       if (!allocResponse.getNMTokens().isEmpty()) {
         for (NMToken token : allocResponse.getNMTokens()) {
-          nmTokens.put(token.getNodeId().toString(), token.getToken());
+          NMTokenCache.setNMToken(token.getNodeId().toString(),
+              token.getToken());
         }
       }
       if(allocatedContainerCount < containersRequestedAny) {
