YARN-8110. AMRMProxy recover should catch for all throwable to avoid premature exit. (Botong Huang via Subru).
This commit is contained in:
parent
024d7c0870
commit
00905efab2
@ -261,7 +261,7 @@ public void recover() throws IOException {
|
|||||||
// Create the intercepter pipeline for the AM
|
// Create the intercepter pipeline for the AM
|
||||||
initializePipeline(attemptId, user, amrmToken, localToken,
|
initializePipeline(attemptId, user, amrmToken, localToken,
|
||||||
entry.getValue(), true, amCred);
|
entry.getValue(), true, amCred);
|
||||||
} catch (IOException e) {
|
} catch (Throwable e) {
|
||||||
LOG.error("Exception when recovering " + attemptId
|
LOG.error("Exception when recovering " + attemptId
|
||||||
+ ", removing it from NMStateStore and move on", e);
|
+ ", removing it from NMStateStore and move on", e);
|
||||||
this.nmContext.getNMStateStore().removeAMRMProxyAppContext(attemptId);
|
this.nmContext.getNMStateStore().removeAMRMProxyAppContext(attemptId);
|
||||||
|
@ -112,6 +112,11 @@ protected MockAMRMProxyService getAMRMProxyService() {
|
|||||||
return this.amrmProxyService;
|
return this.amrmProxyService;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected Context getNMContext() {
|
||||||
|
Assert.assertNotNull(this.nmContext);
|
||||||
|
return this.nmContext;
|
||||||
|
}
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws IOException {
|
public void setUp() throws IOException {
|
||||||
this.conf = createConfiguration();
|
this.conf = createConfiguration();
|
||||||
|
@ -44,6 +44,7 @@
|
|||||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.MockResourceManagerFacade;
|
import org.apache.hadoop.yarn.server.MockResourceManagerFacade;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyService.RequestInterceptorChainWrapper;
|
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyService.RequestInterceptorChainWrapper;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredAMRMProxyState;
|
||||||
import org.apache.hadoop.yarn.util.Records;
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
@ -633,6 +634,35 @@ public void testRecovery() throws YarnException, Exception {
|
|||||||
mockRM = null;
|
mockRM = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test AMRMProxy restart with application recovery failure.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testAppRecoveryFailure() throws YarnException, Exception {
|
||||||
|
Configuration conf = createConfiguration();
|
||||||
|
// Use the MockRequestInterceptorAcrossRestart instead for the chain
|
||||||
|
conf.set(YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE,
|
||||||
|
BadRequestInterceptorAcrossRestart.class.getName());
|
||||||
|
|
||||||
|
mockRM = new MockResourceManagerFacade(new YarnConfiguration(conf), 0);
|
||||||
|
|
||||||
|
createAndStartAMRMProxyService(conf);
|
||||||
|
|
||||||
|
// Create an app entry in NMSS
|
||||||
|
registerApplicationMaster(1);
|
||||||
|
|
||||||
|
RecoveredAMRMProxyState state =
|
||||||
|
getNMContext().getNMStateStore().loadAMRMProxyState();
|
||||||
|
Assert.assertEquals(1, state.getAppContexts().size());
|
||||||
|
|
||||||
|
// AMRMProxy restarts and recover
|
||||||
|
createAndStartAMRMProxyService(conf);
|
||||||
|
|
||||||
|
state = getNMContext().getNMStateStore().loadAMRMProxyState();
|
||||||
|
// The app that failed to recover should have been removed from NMSS
|
||||||
|
Assert.assertEquals(0, state.getAppContexts().size());
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A mock interceptor implementation that uses the same mockRM instance across
|
* A mock interceptor implementation that uses the same mockRM instance across
|
||||||
* restart.
|
* restart.
|
||||||
@ -672,4 +702,16 @@ public AllocateResponse allocate(AllocateRequest request)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A mock intercepter implementation that throws when recovering.
|
||||||
|
*/
|
||||||
|
public static class BadRequestInterceptorAcrossRestart
|
||||||
|
extends MockRequestInterceptorAcrossRestart {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void recover(Map<String, byte[]> recoveredDataMap) {
|
||||||
|
throw new RuntimeException("Kaboom");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user