MAPREDUCE-2749. Ensure NM registers with RM after starting all its services correctly. Contributed by Thomas Graves.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1169621 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arun Murthy 2011-09-12 07:11:43 +00:00
parent 817ead65b9
commit 6b2f2efe4d
11 changed files with 205 additions and 63 deletions

View File

@ -11,6 +11,7 @@ Trunk (unreleased changes)
process (atm) process (atm)
BUG FIXES BUG FIXES
MAPREDUCE-2784. [Gridmix] Bug fixes in ExecutionSummarizer and MAPREDUCE-2784. [Gridmix] Bug fixes in ExecutionSummarizer and
ResourceUsageMatcher. (amarrk) ResourceUsageMatcher. (amarrk)
@ -1275,6 +1276,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-2691. Finishing up the cleanup of distributed cache file resources MAPREDUCE-2691. Finishing up the cleanup of distributed cache file resources
and related tests. (Siddharth Seth via vinodkv) and related tests. (Siddharth Seth via vinodkv)
MAPREDUCE-2749. Ensure NM registers with RM after starting all its services
correctly. (Thomas Graves via acmurthy)
Release 0.22.0 - Unreleased Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -29,6 +29,7 @@
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.YarnException;
@ -44,21 +45,24 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.webapp.WebServer; import org.apache.hadoop.yarn.server.nodemanager.webapp.WebServer;
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
import org.apache.hadoop.yarn.service.CompositeService; import org.apache.hadoop.yarn.service.CompositeService;
import org.apache.hadoop.yarn.service.Service; import org.apache.hadoop.yarn.service.Service;
public class NodeManager extends CompositeService { public class NodeManager extends CompositeService {
private static final Log LOG = LogFactory.getLog(NodeManager.class); private static final Log LOG = LogFactory.getLog(NodeManager.class);
protected final NodeManagerMetrics metrics = NodeManagerMetrics.create(); protected final NodeManagerMetrics metrics = NodeManagerMetrics.create();
protected ContainerTokenSecretManager containerTokenSecretManager;
public NodeManager() { public NodeManager() {
super(NodeManager.class.getName()); super(NodeManager.class.getName());
} }
protected NodeStatusUpdater createNodeStatusUpdater(Context context, protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
ContainerTokenSecretManager containerTokenSecretManager) {
return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker, return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker,
metrics); metrics, containerTokenSecretManager);
} }
protected NodeResourceMonitor createNodeResourceMonitor() { protected NodeResourceMonitor createNodeResourceMonitor() {
@ -67,9 +71,10 @@ protected NodeResourceMonitor createNodeResourceMonitor() {
protected ContainerManagerImpl createContainerManager(Context context, protected ContainerManagerImpl createContainerManager(Context context,
ContainerExecutor exec, DeletionService del, ContainerExecutor exec, DeletionService del,
NodeStatusUpdater nodeStatusUpdater) { NodeStatusUpdater nodeStatusUpdater, ContainerTokenSecretManager
containerTokenSecretManager) {
return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater, return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater,
metrics); metrics, containerTokenSecretManager);
} }
protected WebServer createWebServer(Context nmContext, protected WebServer createWebServer(Context nmContext,
@ -87,6 +92,13 @@ public void init(Configuration conf) {
Context context = new NMContext(); Context context = new NMContext();
// Create the secretManager if need be.
if (UserGroupInformation.isSecurityEnabled()) {
LOG.info("Security is enabled on NodeManager. "
+ "Creating ContainerTokenSecretManager");
this.containerTokenSecretManager = new ContainerTokenSecretManager();
}
ContainerExecutor exec = ReflectionUtils.newInstance( ContainerExecutor exec = ReflectionUtils.newInstance(
conf.getClass(YarnConfiguration.NM_CONTAINER_EXECUTOR, conf.getClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
DefaultContainerExecutor.class, ContainerExecutor.class), conf); DefaultContainerExecutor.class, ContainerExecutor.class), conf);
@ -102,18 +114,16 @@ public void init(Configuration conf) {
addService(healthChecker); addService(healthChecker);
} }
// StatusUpdater should be added first so that it can start first. Once it
// contacts RM, does registration and gets tokens, then only
// ContainerManager can start.
NodeStatusUpdater nodeStatusUpdater = NodeStatusUpdater nodeStatusUpdater =
createNodeStatusUpdater(context, dispatcher, healthChecker); createNodeStatusUpdater(context, dispatcher, healthChecker,
addService(nodeStatusUpdater); this.containerTokenSecretManager);
NodeResourceMonitor nodeResourceMonitor = createNodeResourceMonitor(); NodeResourceMonitor nodeResourceMonitor = createNodeResourceMonitor();
addService(nodeResourceMonitor); addService(nodeResourceMonitor);
ContainerManagerImpl containerManager = ContainerManagerImpl containerManager =
createContainerManager(context, exec, del, nodeStatusUpdater); createContainerManager(context, exec, del, nodeStatusUpdater,
this.containerTokenSecretManager);
addService(containerManager); addService(containerManager);
Service webServer = Service webServer =
@ -132,6 +142,10 @@ public void run() {
DefaultMetricsSystem.initialize("NodeManager"); DefaultMetricsSystem.initialize("NodeManager");
// StatusUpdater should be added last so that it gets started last;
// this makes sure everything is up before registering with RM.
addService(nodeStatusUpdater);
super.init(conf); super.init(conf);
// TODO add local dirs to del // TODO add local dirs to del
} }

View File

@ -55,6 +55,7 @@
import org.apache.hadoop.yarn.server.api.records.RegistrationResponse; import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
import org.apache.hadoop.yarn.service.AbstractService; import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
@ -68,6 +69,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
private final Context context; private final Context context;
private final Dispatcher dispatcher; private final Dispatcher dispatcher;
private ContainerTokenSecretManager containerTokenSecretManager;
private long heartBeatInterval; private long heartBeatInterval;
private ResourceTracker resourceTracker; private ResourceTracker resourceTracker;
private String rmAddress; private String rmAddress;
@ -85,12 +87,14 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
private final NodeManagerMetrics metrics; private final NodeManagerMetrics metrics;
public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics,
ContainerTokenSecretManager containerTokenSecretManager) {
super(NodeStatusUpdaterImpl.class.getName()); super(NodeStatusUpdaterImpl.class.getName());
this.healthChecker = healthChecker; this.healthChecker = healthChecker;
this.context = context; this.context = context;
this.dispatcher = dispatcher; this.dispatcher = dispatcher;
this.metrics = metrics; this.metrics = metrics;
this.containerTokenSecretManager = containerTokenSecretManager;
} }
@Override @Override
@ -173,8 +177,18 @@ private void registerWithRM() throws YarnRemoteException {
this.secretKeyBytes = regResponse.getSecretKey().array(); this.secretKeyBytes = regResponse.getSecretKey().array();
} }
// do this now so that it's set before we start heartbeating to RM
if (UserGroupInformation.isSecurityEnabled()) {
LOG.info("Security enabled - updating secret keys now");
// It is expected that status updater is started by this point and
// RM gives the shared secret in registration during StatusUpdater#start().
this.containerTokenSecretManager.setSecretKey(
this.getContainerManagerBindAddress(),
this.getRMNMSharedSecret());
}
LOG.info("Registered with ResourceManager as " + this.containerManagerBindAddress LOG.info("Registered with ResourceManager as " + this.containerManagerBindAddress
+ " with total resource of " + this.totalResource); + " with total resource of " + this.totalResource);
} }
@Override @Override

View File

@ -116,7 +116,8 @@ public class ContainerManagerImpl extends CompositeService implements
public ContainerManagerImpl(Context context, ContainerExecutor exec, public ContainerManagerImpl(Context context, ContainerExecutor exec,
DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater, DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
NodeManagerMetrics metrics) { NodeManagerMetrics metrics, ContainerTokenSecretManager
containerTokenSecretManager) {
super(ContainerManagerImpl.class.getName()); super(ContainerManagerImpl.class.getName());
this.context = context; this.context = context;
dispatcher = new AsyncDispatcher(); dispatcher = new AsyncDispatcher();
@ -131,12 +132,7 @@ public ContainerManagerImpl(Context context, ContainerExecutor exec,
addService(containersLauncher); addService(containersLauncher);
this.nodeStatusUpdater = nodeStatusUpdater; this.nodeStatusUpdater = nodeStatusUpdater;
// Create the secretManager if need be. this.containerTokenSecretManager = containerTokenSecretManager;
if (UserGroupInformation.isSecurityEnabled()) {
LOG.info("Security is enabled on NodeManager. "
+ "Creating ContainerTokenSecretManager");
this.containerTokenSecretManager = new ContainerTokenSecretManager();
}
// Start configurable services // Start configurable services
auxiluaryServices = new AuxServices(); auxiluaryServices = new AuxServices();
@ -196,14 +192,6 @@ public void start() {
// Enqueue user dirs in deletion context // Enqueue user dirs in deletion context
YarnRPC rpc = YarnRPC.create(getConfig()); YarnRPC rpc = YarnRPC.create(getConfig());
if (UserGroupInformation.isSecurityEnabled()) {
// This is fine as status updater is started before ContainerManager and
// RM gives the shared secret in registration during StatusUpdter#start()
// itself.
this.containerTokenSecretManager.setSecretKey(
this.nodeStatusUpdater.getContainerManagerBindAddress(),
this.nodeStatusUpdater.getRMNMSharedSecret());
}
Configuration cmConf = new Configuration(getConfig()); Configuration cmConf = new Configuration(getConfig());
cmConf.setClass(YarnConfiguration.YARN_SECURITY_INFO, cmConf.setClass(YarnConfiguration.YARN_SECURITY_INFO,
ContainerManagerSecurityInfo.class, SecurityInfo.class); ContainerManagerSecurityInfo.class, SecurityInfo.class);

View File

@ -28,6 +28,7 @@
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
@ -59,8 +60,8 @@ public class DummyContainerManager extends ContainerManagerImpl {
public DummyContainerManager(Context context, ContainerExecutor exec, public DummyContainerManager(Context context, ContainerExecutor exec,
DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater, DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
NodeManagerMetrics metrics) { NodeManagerMetrics metrics, ContainerTokenSecretManager containerTokenSecretManager) {
super(context, exec, deletionContext, nodeStatusUpdater, metrics); super(context, exec, deletionContext, nodeStatusUpdater, metrics, containerTokenSecretManager);
} }
@Override @Override

View File

@ -40,6 +40,7 @@
import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.api.ResourceTracker; import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest; import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
@ -84,8 +85,9 @@ public void testSuccessfulContainerLaunch() throws InterruptedException,
Dispatcher dispatcher = new AsyncDispatcher(); Dispatcher dispatcher = new AsyncDispatcher();
NodeHealthCheckerService healthChecker = null; NodeHealthCheckerService healthChecker = null;
NodeManagerMetrics metrics = NodeManagerMetrics.create(); NodeManagerMetrics metrics = NodeManagerMetrics.create();
ContainerTokenSecretManager containerTokenSecretManager = new ContainerTokenSecretManager();
NodeStatusUpdater nodeStatusUpdater = NodeStatusUpdater nodeStatusUpdater =
new NodeStatusUpdaterImpl(context, dispatcher, healthChecker, metrics) { new NodeStatusUpdaterImpl(context, dispatcher, healthChecker, metrics, containerTokenSecretManager) {
@Override @Override
protected ResourceTracker getRMClient() { protected ResourceTracker getRMClient() {
return new LocalRMInterface(); return new LocalRMInterface();
@ -98,7 +100,7 @@ protected void startStatusUpdater() {
}; };
DummyContainerManager containerManager = DummyContainerManager containerManager =
new DummyContainerManager(context, exec, del, nodeStatusUpdater, metrics); new DummyContainerManager(context, exec, del, nodeStatusUpdater, metrics, containerTokenSecretManager);
containerManager.init(conf); containerManager.init(conf);
containerManager.start(); containerManager.start();

View File

@ -32,6 +32,8 @@
import org.apache.hadoop.NodeHealthCheckerService; import org.apache.hadoop.NodeHealthCheckerService;
import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
@ -52,9 +54,12 @@
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.api.records.RegistrationResponse; import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
import org.apache.hadoop.yarn.service.Service;
import org.apache.hadoop.yarn.service.Service.STATE; import org.apache.hadoop.yarn.service.Service.STATE;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
@ -63,24 +68,38 @@
public class TestNodeStatusUpdater { public class TestNodeStatusUpdater {
// temp fix until metrics system can auto-detect itself running in unit test:
static {
DefaultMetricsSystem.setMiniClusterMode(true);
}
static final Log LOG = LogFactory.getLog(TestNodeStatusUpdater.class); static final Log LOG = LogFactory.getLog(TestNodeStatusUpdater.class);
static final Path basedir = static final Path basedir =
new Path("target", TestNodeStatusUpdater.class.getName()); new Path("target", TestNodeStatusUpdater.class.getName());
private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); private static final RecordFactory recordFactory = RecordFactoryProvider
.getRecordFactory(null);
int heartBeatID = 0; int heartBeatID = 0;
volatile Error nmStartError = null; volatile Error nmStartError = null;
private final List<NodeId> registeredNodes = new ArrayList<NodeId>();
/**
 * Resets state after each test: empties the list of node IDs recorded by
 * the test resource tracker's registerNodeManager and shuts down the
 * default metrics system.
 */
@After
public void tearDown() {
this.registeredNodes.clear();
DefaultMetricsSystem.shutdown();
}
private class MyResourceTracker implements ResourceTracker { private class MyResourceTracker implements ResourceTracker {
private Context context; private final Context context;
public MyResourceTracker(Context context) { public MyResourceTracker(Context context) {
this.context = context; this.context = context;
} }
@Override @Override
public RegisterNodeManagerResponse registerNodeManager(RegisterNodeManagerRequest request) throws YarnRemoteException { public RegisterNodeManagerResponse registerNodeManager(
RegisterNodeManagerRequest request) throws YarnRemoteException {
NodeId nodeId = request.getNodeId(); NodeId nodeId = request.getNodeId();
Resource resource = request.getResource(); Resource resource = request.getResource();
LOG.info("Registering " + nodeId.toString()); LOG.info("Registering " + nodeId.toString());
@ -91,17 +110,24 @@ public RegisterNodeManagerResponse registerNodeManager(RegisterNodeManagerReques
Assert.fail(e.getMessage()); Assert.fail(e.getMessage());
} }
Assert.assertEquals(5 * 1024, resource.getMemory()); Assert.assertEquals(5 * 1024, resource.getMemory());
RegistrationResponse regResponse = recordFactory.newRecordInstance(RegistrationResponse.class); registeredNodes.add(nodeId);
RegistrationResponse regResponse = recordFactory
.newRecordInstance(RegistrationResponse.class);
RegisterNodeManagerResponse response = recordFactory.newRecordInstance(RegisterNodeManagerResponse.class); RegisterNodeManagerResponse response = recordFactory
.newRecordInstance(RegisterNodeManagerResponse.class);
response.setRegistrationResponse(regResponse); response.setRegistrationResponse(regResponse);
return response; return response;
} }
ApplicationId applicationID = recordFactory.newRecordInstance(ApplicationId.class); ApplicationId applicationID = recordFactory
ApplicationAttemptId appAttemptID = recordFactory.newRecordInstance(ApplicationAttemptId.class); .newRecordInstance(ApplicationId.class);
ContainerId firstContainerID = recordFactory.newRecordInstance(ContainerId.class); ApplicationAttemptId appAttemptID = recordFactory
ContainerId secondContainerID = recordFactory.newRecordInstance(ContainerId.class); .newRecordInstance(ApplicationAttemptId.class);
ContainerId firstContainerID = recordFactory
.newRecordInstance(ContainerId.class);
ContainerId secondContainerID = recordFactory
.newRecordInstance(ContainerId.class);
private Map<ApplicationId, List<ContainerStatus>> getAppToContainerStatusMap( private Map<ApplicationId, List<ContainerStatus>> getAppToContainerStatusMap(
List<ContainerStatus> containers) { List<ContainerStatus> containers) {
@ -118,8 +144,10 @@ private Map<ApplicationId, List<ContainerStatus>> getAppToContainerStatusMap(
} }
return map; return map;
} }
@Override @Override
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) throws YarnRemoteException { public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
throws YarnRemoteException {
NodeStatus nodeStatus = request.getNodeStatus(); NodeStatus nodeStatus = request.getNodeStatus();
LOG.info("Got heartbeat number " + heartBeatID); LOG.info("Got heartbeat number " + heartBeatID);
nodeStatus.setResponseId(heartBeatID++); nodeStatus.setResponseId(heartBeatID++);
@ -134,7 +162,8 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) throws
firstContainerID.setAppId(applicationID); firstContainerID.setAppId(applicationID);
firstContainerID.setAppAttemptId(appAttemptID); firstContainerID.setAppAttemptId(appAttemptID);
firstContainerID.setId(heartBeatID); firstContainerID.setId(heartBeatID);
ContainerLaunchContext launchContext = recordFactory.newRecordInstance(ContainerLaunchContext.class); ContainerLaunchContext launchContext = recordFactory
.newRecordInstance(ContainerLaunchContext.class);
launchContext.setContainerId(firstContainerID); launchContext.setContainerId(firstContainerID);
launchContext.setResource(recordFactory.newRecordInstance(Resource.class)); launchContext.setResource(recordFactory.newRecordInstance(Resource.class));
launchContext.getResource().setMemory(2); launchContext.getResource().setMemory(2);
@ -158,7 +187,8 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) throws
secondContainerID.setAppId(applicationID); secondContainerID.setAppId(applicationID);
secondContainerID.setAppAttemptId(appAttemptID); secondContainerID.setAppAttemptId(appAttemptID);
secondContainerID.setId(heartBeatID); secondContainerID.setId(heartBeatID);
ContainerLaunchContext launchContext = recordFactory.newRecordInstance(ContainerLaunchContext.class); ContainerLaunchContext launchContext = recordFactory
.newRecordInstance(ContainerLaunchContext.class);
launchContext.setContainerId(secondContainerID); launchContext.setContainerId(secondContainerID);
launchContext.setResource(recordFactory.newRecordInstance(Resource.class)); launchContext.setResource(recordFactory.newRecordInstance(Resource.class));
launchContext.getResource().setMemory(3); launchContext.getResource().setMemory(3);
@ -176,10 +206,12 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) throws
this.context.getContainers(); this.context.getContainers();
Assert.assertEquals(2, activeContainers.size()); Assert.assertEquals(2, activeContainers.size());
} }
HeartbeatResponse response = recordFactory.newRecordInstance(HeartbeatResponse.class); HeartbeatResponse response = recordFactory
.newRecordInstance(HeartbeatResponse.class);
response.setResponseId(heartBeatID); response.setResponseId(heartBeatID);
NodeHeartbeatResponse nhResponse = recordFactory.newRecordInstance(NodeHeartbeatResponse.class); NodeHeartbeatResponse nhResponse = recordFactory
.newRecordInstance(NodeHeartbeatResponse.class);
nhResponse.setHeartbeatResponse(response); nhResponse.setHeartbeatResponse(response);
return nhResponse; return nhResponse;
} }
@ -189,8 +221,10 @@ private class MyNodeStatusUpdater extends NodeStatusUpdaterImpl {
private Context context; private Context context;
public MyNodeStatusUpdater(Context context, Dispatcher dispatcher, public MyNodeStatusUpdater(Context context, Dispatcher dispatcher,
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics,
super(context, dispatcher, healthChecker, metrics); ContainerTokenSecretManager containerTokenSecretManager) {
super(context, dispatcher, healthChecker, metrics,
containerTokenSecretManager);
this.context = context; this.context = context;
} }
@ -216,21 +250,23 @@ public void testNMRegistration() throws InterruptedException {
final NodeManager nm = new NodeManager() { final NodeManager nm = new NodeManager() {
@Override @Override
protected NodeStatusUpdater createNodeStatusUpdater(Context context, protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
ContainerTokenSecretManager containerTokenSecretManager) {
return new MyNodeStatusUpdater(context, dispatcher, healthChecker, return new MyNodeStatusUpdater(context, dispatcher, healthChecker,
metrics); metrics, containerTokenSecretManager);
} }
}; };
YarnConfiguration conf = new YarnConfiguration(); YarnConfiguration conf = createNMConfig();
conf.setInt(YarnConfiguration.NM_VMEM_GB, 5); // 5GB
conf.set(YarnConfiguration.NM_ADDRESS, "127.0.0.1:12345");
conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, "127.0.0.1:12346");
conf.set(YarnConfiguration.NM_LOG_DIRS, new Path(basedir, "logs").toUri().getPath());
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, new Path(basedir, "remotelogs")
.toUri().getPath());
conf.set(YarnConfiguration.NM_LOCAL_DIRS, new Path(basedir, "nm0").toUri().getPath());
nm.init(conf); nm.init(conf);
// verify that the last service is the nodeStatusUpdater (i.e. the service
// that performs registration with RM)
Object[] services = nm.getServices().toArray();
Object lastService = services[services.length-1];
Assert.assertTrue("last service is NOT the node status updater",
lastService instanceof NodeStatusUpdater);
new Thread() { new Thread() {
public void run() { public void run() {
try { try {
@ -260,7 +296,75 @@ public void run() {
while (heartBeatID <= 3) { while (heartBeatID <= 3) {
Thread.sleep(500); Thread.sleep(500);
} }
Assert.assertEquals("Number of registered NMs is wrong!!", 1,
this.registeredNodes.size());
nm.stop(); nm.stop();
} }
/**
 * Verifies that if the NM fails to start one of its services, the RM stays
 * oblivious to the NM's presence. The ContainerManager's start() is
 * overridden to throw, simulating a failure to start its RPC server.
 * Without this behaviour the NM would report to the RM even though not all
 * of its servers started properly; the RM would think the NM is alive and
 * would retire it only after the NM_EXPIRY interval. See MAPREDUCE-2749.
 */
@Test
public void testNoRegistrationWhenNMServicesFail() {
final NodeManager nm = new NodeManager() {
// Substitute the test's NodeStatusUpdater subclass.
@Override
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
ContainerTokenSecretManager containerTokenSecretManager) {
return new MyNodeStatusUpdater(context, dispatcher, healthChecker,
metrics, containerTokenSecretManager);
}
// Substitute a ContainerManager whose start() always throws.
@Override
protected ContainerManagerImpl createContainerManager(Context context,
ContainerExecutor exec, DeletionService del,
NodeStatusUpdater nodeStatusUpdater,
ContainerTokenSecretManager containerTokenSecretManager) {
return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater,
metrics, containerTokenSecretManager) {
@Override
public void start() {
// Simulating failure of starting RPC server
throw new YarnException("Starting of RPC Server failed");
}
};
}
};
YarnConfiguration conf = createNMConfig();
nm.init(conf);
try {
nm.start();
Assert.fail("NM should have failed to start. Didn't get exception!!");
} catch (Exception e) {
// The simulated failure message is checked on the cause of the caught
// exception, not on the caught exception itself.
Assert.assertEquals("Starting of RPC Server failed", e.getCause()
.getMessage());
}
// After the failed start the NM is expected to still report INITED.
Assert.assertEquals("NM state is wrong!", Service.STATE.INITED, nm
.getServiceState());
// registeredNodes is appended to in registerNodeManager, so an empty list
// means no registration was recorded.
Assert.assertEquals("Number of registered nodes is wrong!", 0,
this.registeredNodes.size());
}
/**
 * Builds the NodeManager configuration used by the tests in this class:
 * NM_VMEM_GB set to 5, fixed loopback addresses for the NM and its
 * localizer, and log, remote-app-log and local directories placed under
 * {@code basedir}.
 *
 * @return a new YarnConfiguration populated with the values above
 */
private YarnConfiguration createNMConfig() {
YarnConfiguration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.NM_VMEM_GB, 5); // 5GB
conf.set(YarnConfiguration.NM_ADDRESS, "127.0.0.1:12345");
conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, "127.0.0.1:12346");
conf.set(YarnConfiguration.NM_LOG_DIRS, new Path(basedir, "logs").toUri()
.getPath());
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, new Path(basedir,
"remotelogs").toUri().getPath());
conf.set(YarnConfiguration.NM_LOCAL_DIRS, new Path(basedir, "nm0")
.toUri().getPath());
return conf;
}
} }

View File

@ -41,6 +41,7 @@
import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.api.ResourceTracker; import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.Context;
@ -67,6 +68,7 @@ public abstract class BaseContainerManagerTest {
protected static File localLogDir; protected static File localLogDir;
protected static File remoteLogDir; protected static File remoteLogDir;
protected static File tmpDir; protected static File tmpDir;
protected ContainerTokenSecretManager containerTokenSecretManager = new ContainerTokenSecretManager();
protected final NodeManagerMetrics metrics = NodeManagerMetrics.create(); protected final NodeManagerMetrics metrics = NodeManagerMetrics.create();
@ -94,7 +96,7 @@ public BaseContainerManagerTest() throws UnsupportedFileSystemException {
protected String user = "nobody"; protected String user = "nobody";
protected NodeStatusUpdater nodeStatusUpdater = new NodeStatusUpdaterImpl( protected NodeStatusUpdater nodeStatusUpdater = new NodeStatusUpdaterImpl(
context, new AsyncDispatcher(), null, metrics) { context, new AsyncDispatcher(), null, metrics, this.containerTokenSecretManager) {
@Override @Override
protected ResourceTracker getRMClient() { protected ResourceTracker getRMClient() {
return new LocalRMInterface(); return new LocalRMInterface();
@ -147,7 +149,7 @@ public void delete(String user, Path subDir, Path[] baseDirs) {
exec = createContainerExecutor(); exec = createContainerExecutor();
containerManager = containerManager =
new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater, new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater,
metrics); metrics, this.containerTokenSecretManager);
containerManager.init(conf); containerManager.init(conf);
} }

View File

@ -53,6 +53,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.Test; import org.junit.Test;
@ -281,8 +282,10 @@ public void testLocalFilesCleanup() throws InterruptedException,
// Real del service // Real del service
delSrvc = new DeletionService(exec); delSrvc = new DeletionService(exec);
delSrvc.init(conf); delSrvc.init(conf);
ContainerTokenSecretManager containerTokenSecretManager = new
ContainerTokenSecretManager();
containerManager = new ContainerManagerImpl(context, exec, delSrvc, containerManager = new ContainerManagerImpl(context, exec, delSrvc,
nodeStatusUpdater, metrics); nodeStatusUpdater, metrics, containerTokenSecretManager);
containerManager.init(conf); containerManager.init(conf);
containerManager.start(); containerManager.start();

View File

@ -47,6 +47,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService; import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory; import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
import org.apache.hadoop.yarn.service.AbstractService; import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.service.CompositeService; import org.apache.hadoop.yarn.service.CompositeService;
@ -177,9 +178,10 @@ protected void doSecureLogin() throws IOException {
@Override @Override
protected NodeStatusUpdater createNodeStatusUpdater(Context context, protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
ContainerTokenSecretManager containerTokenSecretManager) {
return new NodeStatusUpdaterImpl(context, dispatcher, return new NodeStatusUpdaterImpl(context, dispatcher,
healthChecker, metrics) { healthChecker, metrics, containerTokenSecretManager) {
@Override @Override
protected ResourceTracker getRMClient() { protected ResourceTracker getRMClient() {
final ResourceTrackerService rt = resourceManager final ResourceTrackerService rt = resourceManager

View File

@ -82,6 +82,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.AfterClass;
import org.junit.Test; import org.junit.Test;
public class TestContainerTokenSecretManager { public class TestContainerTokenSecretManager {
@ -94,6 +95,7 @@ public class TestContainerTokenSecretManager {
private static final File localDir = new File("target", private static final File localDir = new File("target",
TestContainerTokenSecretManager.class.getName() + "-localDir") TestContainerTokenSecretManager.class.getName() + "-localDir")
.getAbsoluteFile(); .getAbsoluteFile();
private static MiniYARNCluster yarnCluster;
@BeforeClass @BeforeClass
public static void setup() throws AccessControlException, public static void setup() throws AccessControlException,
@ -103,6 +105,12 @@ public static void setup() throws AccessControlException,
localDir.mkdir(); localDir.mkdir();
} }
/**
 * Stops the shared mini YARN cluster after the tests in this class have run.
 *
 * <p>The {@code yarnCluster} field has no initialiser and is only assigned
 * inside {@code test()}, so it is still {@code null} if a failure occurs
 * before that assignment. The null check keeps this teardown from throwing
 * a NullPointerException that would obscure the original failure.
 */
@AfterClass
public static void teardown() {
  if (yarnCluster != null) {
    yarnCluster.stop();
  }
}
@Test @Test
public void test() throws IOException, InterruptedException { public void test() throws IOException, InterruptedException {
@ -116,7 +124,7 @@ public void test() throws IOException, InterruptedException {
// Set AM expiry interval to be very long. // Set AM expiry interval to be very long.
conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 100000L); conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 100000L);
UserGroupInformation.setConfiguration(conf); UserGroupInformation.setConfiguration(conf);
MiniYARNCluster yarnCluster = yarnCluster =
new MiniYARNCluster(TestContainerTokenSecretManager.class.getName()); new MiniYARNCluster(TestContainerTokenSecretManager.class.getName());
yarnCluster.init(conf); yarnCluster.init(conf);
yarnCluster.start(); yarnCluster.start();