YARN-994. HeartBeat thread in AMRMClientAsync does not handle runtime exception correctly (Xuan Gong via bikas)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1510070 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bikas Saha 2013-08-03 22:31:50 +00:00
parent ac933234ac
commit 41e2518e0c
4 changed files with 27 additions and 16 deletions

View File

@ -60,6 +60,9 @@ Release 2.1.1-beta - UNRELEASED
YARN-906. Fixed a bug in NodeManager where cancelling ContainerLaunch at YARN-906. Fixed a bug in NodeManager where cancelling ContainerLaunch at
KILLING state causes that the container to hang. (Zhijie Shen via vinodkv) KILLING state causes that the container to hang. (Zhijie Shen via vinodkv)
YARN-994. HeartBeat thread in AMRMClientAsync does not handle runtime
exception correctly (Xuan Gong via bikas)
Release 2.1.0-beta - 2013-08-06 Release 2.1.0-beta - 2013-08-06
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -65,7 +65,7 @@ public class AMRMClientAsyncImpl<T extends ContainerRequest>
private volatile boolean keepRunning; private volatile boolean keepRunning;
private volatile float progress; private volatile float progress;
private volatile Exception savedException; private volatile Throwable savedException;
public AMRMClientAsyncImpl(int intervalMs, CallbackHandler callbackHandler) { public AMRMClientAsyncImpl(int intervalMs, CallbackHandler callbackHandler) {
this(new AMRMClientImpl<T>(), intervalMs, callbackHandler); this(new AMRMClientImpl<T>(), intervalMs, callbackHandler);
@ -222,18 +222,12 @@ public void run() {
try { try {
response = client.allocate(progress); response = client.allocate(progress);
} catch (YarnException ex) { } catch (Throwable ex) {
LOG.error("Yarn exception on heartbeat", ex); LOG.error("Exception on heartbeat", ex);
savedException = ex; savedException = ex;
// interrupt handler thread in case it waiting on the queue // interrupt handler thread in case it waiting on the queue
handlerThread.interrupt(); handlerThread.interrupt();
return; return;
} catch (IOException e) {
LOG.error("IO exception on heartbeat", e);
savedException = e;
// interrupt handler thread in case it waiting on the queue
handlerThread.interrupt();
return;
} }
} }
if (response != null) { if (response != null) {

View File

@ -277,6 +277,8 @@ protected void populateNMTokens(AllocateResponse allocateResponse) {
public void unregisterApplicationMaster(FinalApplicationStatus appStatus, public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
String appMessage, String appTrackingUrl) throws YarnException, String appMessage, String appTrackingUrl) throws YarnException,
IOException { IOException {
Preconditions.checkArgument(appStatus != null,
"AppStatus should not be null.");
FinishApplicationMasterRequest request = FinishApplicationMasterRequest request =
FinishApplicationMasterRequest.newInstance(appStatus, appMessage, FinishApplicationMasterRequest.newInstance(appStatus, appMessage,
appTrackingUrl); appTrackingUrl);

View File

@ -159,14 +159,26 @@ public Resource answer(InvocationOnMock invocation)
@Test(timeout=10000) @Test(timeout=10000)
public void testAMRMClientAsyncException() throws Exception { public void testAMRMClientAsyncException() throws Exception {
String exStr = "TestException";
YarnException mockException = mock(YarnException.class);
when(mockException.getMessage()).thenReturn(exStr);
runHeartBeatThrowOutException(mockException);
}
@Test(timeout=10000)
public void testAMRMClientAsyncRunTimeException() throws Exception {
String exStr = "TestRunTimeException";
RuntimeException mockRunTimeException = mock(RuntimeException.class);
when(mockRunTimeException.getMessage()).thenReturn(exStr);
runHeartBeatThrowOutException(mockRunTimeException);
}
private void runHeartBeatThrowOutException(Exception ex) throws Exception{
Configuration conf = new Configuration(); Configuration conf = new Configuration();
TestCallbackHandler callbackHandler = new TestCallbackHandler(); TestCallbackHandler callbackHandler = new TestCallbackHandler();
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class); AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
String exStr = "TestException"; when(client.allocate(anyFloat())).thenThrow(ex);
YarnException mockException = mock(YarnException.class);
when(mockException.getMessage()).thenReturn(exStr);
when(client.allocate(anyFloat())).thenThrow(mockException);
AMRMClientAsync<ContainerRequest> asyncClient = AMRMClientAsync<ContainerRequest> asyncClient =
AMRMClientAsync.createAMRMClientAsync(client, 20, callbackHandler); AMRMClientAsync.createAMRMClientAsync(client, 20, callbackHandler);
@ -183,14 +195,14 @@ public void testAMRMClientAsyncException() throws Exception {
} }
} }
} }
Assert.assertTrue(callbackHandler.savedException.getMessage().contains(
Assert.assertTrue(callbackHandler.savedException.getMessage().contains(exStr)); ex.getMessage()));
asyncClient.stop(); asyncClient.stop();
// stopping should have joined all threads and completed all callbacks // stopping should have joined all threads and completed all callbacks
Assert.assertTrue(callbackHandler.callbackCount == 0); Assert.assertTrue(callbackHandler.callbackCount == 0);
} }
@Test//(timeout=10000) @Test//(timeout=10000)
public void testAMRMClientAsyncReboot() throws Exception { public void testAMRMClientAsyncReboot() throws Exception {
Configuration conf = new Configuration(); Configuration conf = new Configuration();