diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java index bc6cadd1ed..3dd53d3554 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java @@ -61,7 +61,7 @@ public class AMRMClientAsyncImpl private final HeartbeatThread heartbeatThread; private final CallbackHandlerThread handlerThread; - private final BlockingQueue responseQueue; + private final BlockingQueue responseQueue; private final Object unregisterHeartbeatLock = new Object(); @@ -70,8 +70,6 @@ public class AMRMClientAsyncImpl private volatile String collectorAddr; - private volatile Throwable savedException; - /** * * @param intervalMs heartbeat interval in milliseconds between AM and RM @@ -90,7 +88,6 @@ public AMRMClientAsyncImpl(AMRMClient client, int intervalMs, handlerThread = new CallbackHandlerThread(); responseQueue = new LinkedBlockingQueue<>(); keepRunning = true; - savedException = null; } /** @@ -111,9 +108,8 @@ public AMRMClientAsyncImpl(AMRMClient client, int intervalMs, super(client, intervalMs, callbackHandler); heartbeatThread = new HeartbeatThread(); handlerThread = new CallbackHandlerThread(); - responseQueue = new LinkedBlockingQueue(); + responseQueue = new LinkedBlockingQueue(); keepRunning = true; - savedException = null; } @Override @@ -265,7 +261,7 @@ public HeartbeatThread() { public void run() { while (true) { - AllocateResponse response = null; + Object response = null; // synchronization ensures we don't send heartbeats after unregistering synchronized (unregisterHeartbeatLock) { if (!keepRunning) { @@ -280,10 +276,7 @@ public void run() { return; } catch (Throwable ex) { LOG.error("Exception on heartbeat", ex); - savedException = ex; - // interrupt handler thread in case it waiting on the queue - handlerThread.interrupt(); - return; + response = ex; } if (response != null) { while (true) { @@ -316,19 +309,20 @@ public void run() { return; } try { - AllocateResponse response; - if(savedException != null) { - LOG.error("Stopping callback due to: ", savedException); - handler.onError(savedException); - return; - } + Object object; try { - response = responseQueue.take(); + object = responseQueue.take(); } catch (InterruptedException ex) { LOG.info("Interrupted while waiting for queue", ex); continue; } + if (object instanceof Throwable) { + progress = handler.getProgress(); + handler.onError((Throwable) object); + continue; + } + AllocateResponse response = (AllocateResponse) object; String collectorAddress = response.getCollectorAddr(); TimelineClient timelineClient = client.getRegisteredTimelineClient(); if (timelineClient != null && collectorAddress != null diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java index dac82e4025..ba38340975 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java @@ -213,7 +213,7 @@ private void runHeartBeatThrowOutException(Exception ex) throws Exception{ asyncClient.stop(); // stopping should have joined all threads and completed all callbacks - Assert.assertTrue(callbackHandler.callbackCount == 0); + Assert.assertTrue(callbackHandler.callbackCount > 0); } @Test (timeout = 10000)