YARN-3877. YarnClientImpl.submitApplication swallows exceptions. Contributed by Varun Saxena
This commit is contained in:
parent
3e37e243ee
commit
e4e72db5f9
@ -306,9 +306,10 @@ public class YarnClientImpl extends YarnClient {
|
||||
try {
|
||||
Thread.sleep(submitPollIntervalMillis);
|
||||
} catch (InterruptedException ie) {
|
||||
LOG.error("Interrupted while waiting for application "
|
||||
+ applicationId
|
||||
+ " to be successfully submitted.");
|
||||
String msg = "Interrupted while waiting for application "
|
||||
+ applicationId + " to be successfully submitted.";
|
||||
LOG.error(msg);
|
||||
throw new YarnException(msg, ie);
|
||||
}
|
||||
} catch (ApplicationNotFoundException ex) {
|
||||
// FailOver or RM restart happens before RMStateStore saves
|
||||
@ -446,8 +447,10 @@ public class YarnClientImpl extends YarnClient {
|
||||
Thread.sleep(asyncApiPollIntervalMillis);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
LOG.error("Interrupted while waiting for application " + applicationId
|
||||
+ " to be killed.");
|
||||
String msg = "Interrupted while waiting for application "
|
||||
+ applicationId + " to be killed.";
|
||||
LOG.error(msg);
|
||||
throw new YarnException(msg, e);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -27,6 +27,7 @@ import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.Thread.State;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.ArrayList;
|
||||
@ -201,6 +202,57 @@ public class TestYarnClient {
|
||||
client.stop();
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
@Test (timeout = 20000)
|
||||
public void testSubmitApplicationInterrupted() throws IOException {
|
||||
Configuration conf = new Configuration();
|
||||
int pollIntervalMs = 1000;
|
||||
conf.setLong(YarnConfiguration.YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS,
|
||||
pollIntervalMs);
|
||||
try (final YarnClient client = new MockYarnClient()) {
|
||||
client.init(conf);
|
||||
client.start();
|
||||
// Submit the application and then interrupt it while its waiting
|
||||
// for submission to be successful.
|
||||
final class SubmitThread extends Thread {
|
||||
private boolean isInterrupted = false;
|
||||
@Override
|
||||
public void run() {
|
||||
ApplicationSubmissionContext context =
|
||||
mock(ApplicationSubmissionContext.class);
|
||||
ApplicationId applicationId = ApplicationId.newInstance(
|
||||
System.currentTimeMillis(), 1);
|
||||
when(context.getApplicationId()).thenReturn(applicationId);
|
||||
((MockYarnClient) client).setYarnApplicationState(
|
||||
YarnApplicationState.NEW);
|
||||
try {
|
||||
client.submitApplication(context);
|
||||
} catch (YarnException | IOException e) {
|
||||
if (e instanceof YarnException && e.getCause() != null &&
|
||||
e.getCause() instanceof InterruptedException) {
|
||||
isInterrupted = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
SubmitThread appSubmitThread = new SubmitThread();
|
||||
appSubmitThread.start();
|
||||
try {
|
||||
// Wait for thread to start and begin to sleep
|
||||
// (enter TIMED_WAITING state).
|
||||
while (appSubmitThread.getState() != State.TIMED_WAITING) {
|
||||
Thread.sleep(pollIntervalMs / 2);
|
||||
}
|
||||
// Interrupt the thread.
|
||||
appSubmitThread.interrupt();
|
||||
appSubmitThread.join();
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
Assert.assertTrue("Expected an InterruptedException wrapped inside a " +
|
||||
"YarnException", appSubmitThread.isInterrupted);
|
||||
}
|
||||
}
|
||||
|
||||
@Test (timeout = 30000)
|
||||
public void testSubmitIncorrectQueueToCapacityScheduler() throws IOException {
|
||||
MiniYARNCluster cluster = new MiniYARNCluster("testMRAMTokens", 1, 1, 1);
|
||||
|
Loading…
x
Reference in New Issue
Block a user