YARN-5999. AMRMClientAsync will stop if any exceptions thrown on allocate call. Contributed by Jian He
This commit is contained in:
parent
f5e0bd30fd
commit
64a2d5be91
@ -61,7 +61,7 @@ public class AMRMClientAsyncImpl<T extends ContainerRequest>
|
|||||||
private final HeartbeatThread heartbeatThread;
|
private final HeartbeatThread heartbeatThread;
|
||||||
private final CallbackHandlerThread handlerThread;
|
private final CallbackHandlerThread handlerThread;
|
||||||
|
|
||||||
private final BlockingQueue<AllocateResponse> responseQueue;
|
private final BlockingQueue<Object> responseQueue;
|
||||||
|
|
||||||
private final Object unregisterHeartbeatLock = new Object();
|
private final Object unregisterHeartbeatLock = new Object();
|
||||||
|
|
||||||
@ -70,8 +70,6 @@ public class AMRMClientAsyncImpl<T extends ContainerRequest>
|
|||||||
|
|
||||||
private volatile String collectorAddr;
|
private volatile String collectorAddr;
|
||||||
|
|
||||||
private volatile Throwable savedException;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
* @param intervalMs heartbeat interval in milliseconds between AM and RM
|
* @param intervalMs heartbeat interval in milliseconds between AM and RM
|
||||||
@ -90,7 +88,6 @@ public AMRMClientAsyncImpl(AMRMClient<T> client, int intervalMs,
|
|||||||
handlerThread = new CallbackHandlerThread();
|
handlerThread = new CallbackHandlerThread();
|
||||||
responseQueue = new LinkedBlockingQueue<>();
|
responseQueue = new LinkedBlockingQueue<>();
|
||||||
keepRunning = true;
|
keepRunning = true;
|
||||||
savedException = null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -111,9 +108,8 @@ public AMRMClientAsyncImpl(AMRMClient<T> client, int intervalMs,
|
|||||||
super(client, intervalMs, callbackHandler);
|
super(client, intervalMs, callbackHandler);
|
||||||
heartbeatThread = new HeartbeatThread();
|
heartbeatThread = new HeartbeatThread();
|
||||||
handlerThread = new CallbackHandlerThread();
|
handlerThread = new CallbackHandlerThread();
|
||||||
responseQueue = new LinkedBlockingQueue<AllocateResponse>();
|
responseQueue = new LinkedBlockingQueue<Object>();
|
||||||
keepRunning = true;
|
keepRunning = true;
|
||||||
savedException = null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -265,7 +261,7 @@ public HeartbeatThread() {
|
|||||||
|
|
||||||
public void run() {
|
public void run() {
|
||||||
while (true) {
|
while (true) {
|
||||||
AllocateResponse response = null;
|
Object response = null;
|
||||||
// synchronization ensures we don't send heartbeats after unregistering
|
// synchronization ensures we don't send heartbeats after unregistering
|
||||||
synchronized (unregisterHeartbeatLock) {
|
synchronized (unregisterHeartbeatLock) {
|
||||||
if (!keepRunning) {
|
if (!keepRunning) {
|
||||||
@ -280,10 +276,7 @@ public void run() {
|
|||||||
return;
|
return;
|
||||||
} catch (Throwable ex) {
|
} catch (Throwable ex) {
|
||||||
LOG.error("Exception on heartbeat", ex);
|
LOG.error("Exception on heartbeat", ex);
|
||||||
savedException = ex;
|
response = ex;
|
||||||
// interrupt handler thread in case it waiting on the queue
|
|
||||||
handlerThread.interrupt();
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
if (response != null) {
|
if (response != null) {
|
||||||
while (true) {
|
while (true) {
|
||||||
@ -316,19 +309,20 @@ public void run() {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
AllocateResponse response;
|
Object object;
|
||||||
if(savedException != null) {
|
|
||||||
LOG.error("Stopping callback due to: ", savedException);
|
|
||||||
handler.onError(savedException);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
try {
|
try {
|
||||||
response = responseQueue.take();
|
object = responseQueue.take();
|
||||||
} catch (InterruptedException ex) {
|
} catch (InterruptedException ex) {
|
||||||
LOG.info("Interrupted while waiting for queue", ex);
|
LOG.info("Interrupted while waiting for queue", ex);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
if (object instanceof Throwable) {
|
||||||
|
progress = handler.getProgress();
|
||||||
|
handler.onError((Throwable) object);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
AllocateResponse response = (AllocateResponse) object;
|
||||||
String collectorAddress = response.getCollectorAddr();
|
String collectorAddress = response.getCollectorAddr();
|
||||||
TimelineClient timelineClient = client.getRegisteredTimelineClient();
|
TimelineClient timelineClient = client.getRegisteredTimelineClient();
|
||||||
if (timelineClient != null && collectorAddress != null
|
if (timelineClient != null && collectorAddress != null
|
||||||
|
@ -213,7 +213,7 @@ private void runHeartBeatThrowOutException(Exception ex) throws Exception{
|
|||||||
|
|
||||||
asyncClient.stop();
|
asyncClient.stop();
|
||||||
// stopping should have joined all threads and completed all callbacks
|
// stopping should have joined all threads and completed all callbacks
|
||||||
Assert.assertTrue(callbackHandler.callbackCount == 0);
|
Assert.assertTrue(callbackHandler.callbackCount > 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test (timeout = 10000)
|
@Test (timeout = 10000)
|
||||||
|
Loading…
Reference in New Issue
Block a user