MAPREDUCE-5769. Unregistration to RM should not be called if AM is crashed before registering with RM. Contributed by Rohith
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1577647 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
2a6e8610b1
commit
d9f723ed74
@ -237,6 +237,9 @@ Release 2.4.0 - UNRELEASED
|
||||
MAPREDUCE-5688. TestStagingCleanup fails intermittently with JDK7 (Mit
|
||||
Desai via jeagles)
|
||||
|
||||
MAPREDUCE-5769. Unregistration to RM should not be called if AM is crashed
|
||||
before registering with RM (Rohith via jlowe)
|
||||
|
||||
Release 2.3.1 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -87,6 +87,7 @@ public abstract class RMCommunicator extends AbstractService
|
||||
// Has a signal (SIGTERM etc) been issued?
|
||||
protected volatile boolean isSignalled = false;
|
||||
private volatile boolean shouldUnregister = true;
|
||||
private boolean isApplicationMasterRegistered = false;
|
||||
|
||||
public RMCommunicator(ClientService clientService, AppContext context) {
|
||||
super("RMCommunicator");
|
||||
@ -153,6 +154,7 @@ protected void register() {
|
||||
}
|
||||
RegisterApplicationMasterResponse response =
|
||||
scheduler.registerApplicationMaster(request);
|
||||
isApplicationMasterRegistered = true;
|
||||
maxContainerCapability = response.getMaximumResourceCapability();
|
||||
this.context.getClusterInfo().setMaxContainerCapability(
|
||||
maxContainerCapability);
|
||||
@ -249,7 +251,7 @@ protected void serviceStop() throws Exception {
|
||||
LOG.warn("InterruptedException while stopping", ie);
|
||||
}
|
||||
}
|
||||
- if(shouldUnregister) {
|
||||
+ if (isApplicationMasterRegistered && shouldUnregister) {
|
||||
unregister();
|
||||
}
|
||||
super.serviceStop();
|
||||
@ -328,4 +330,9 @@ public void setSignalled(boolean isSignalled) {
|
||||
LOG.info("RMCommunicator notified that iSignalled is: "
|
||||
+ isSignalled);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
protected boolean isApplicationMasterRegistered() {
|
||||
return isApplicationMasterRegistered;
|
||||
}
|
||||
}
|
||||
|
@ -1386,7 +1386,7 @@ private static class MyContainerAllocator extends RMContainerAllocator {
|
||||
static final List<JobUpdatedNodesEvent> jobUpdatedNodeEvents
|
||||
= new ArrayList<JobUpdatedNodesEvent>();
|
||||
private MyResourceManager rm;
|
||||
|
||||
private boolean isUnregistered = false;
|
||||
private static AppContext createAppContext(
|
||||
ApplicationAttemptId appAttemptId, Job job) {
|
||||
AppContext context = mock(AppContext.class);
|
||||
@ -1478,6 +1478,7 @@ protected void register() {
|
||||
|
||||
@Override
|
||||
protected void unregister() {
|
||||
isUnregistered=true;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -1527,6 +1528,15 @@ static List<JobUpdatedNodesEvent> getJobUpdatedNodeEvents() {
|
||||
protected void startAllocatorThread() {
|
||||
// override to NOT start thread
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean isApplicationMasterRegistered() {
|
||||
return super.isApplicationMasterRegistered();
|
||||
}
|
||||
|
||||
public boolean isUnregistered() {
|
||||
return isUnregistered;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -1776,6 +1786,51 @@ public void testCompletedContainerEvent() {
|
||||
Assert.assertEquals(TaskAttemptEventType.TA_KILL, abortedEvent.getType());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUnregistrationOnlyIfRegistered() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
final MyResourceManager rm = new MyResourceManager(conf);
|
||||
rm.start();
|
||||
DrainDispatcher rmDispatcher =
|
||||
(DrainDispatcher) rm.getRMContext().getDispatcher();
|
||||
|
||||
// Submit the application
|
||||
RMApp rmApp = rm.submitApp(1024);
|
||||
rmDispatcher.await();
|
||||
|
||||
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 11264);
|
||||
amNodeManager.nodeHeartbeat(true);
|
||||
rmDispatcher.await();
|
||||
|
||||
final ApplicationAttemptId appAttemptId =
|
||||
rmApp.getCurrentAppAttempt().getAppAttemptId();
|
||||
rm.sendAMLaunched(appAttemptId);
|
||||
rmDispatcher.await();
|
||||
|
||||
MRApp mrApp =
|
||||
new MRApp(appAttemptId, ContainerId.newInstance(appAttemptId, 0), 10,
|
||||
0, false, this.getClass().getName(), true, 1) {
|
||||
@Override
|
||||
protected Dispatcher createDispatcher() {
|
||||
return new DrainDispatcher();
|
||||
}
|
||||
|
||||
protected ContainerAllocator createContainerAllocator(
|
||||
ClientService clientService, AppContext context) {
|
||||
return new MyContainerAllocator(rm, appAttemptId, context);
|
||||
};
|
||||
};
|
||||
|
||||
mrApp.submit(conf);
|
||||
DrainDispatcher amDispatcher = (DrainDispatcher) mrApp.getDispatcher();
|
||||
MyContainerAllocator allocator =
|
||||
(MyContainerAllocator) mrApp.getContainerAllocator();
|
||||
amDispatcher.await();
|
||||
Assert.assertTrue(allocator.isApplicationMasterRegistered());
|
||||
mrApp.stop();
|
||||
Assert.assertTrue(allocator.isUnregistered());
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
TestRMContainerAllocator t = new TestRMContainerAllocator();
|
||||
t.testSimple();
|
||||
|
Loading…
Reference in New Issue
Block a user