MAPREDUCE-7262. MRApp helpers block for long intervals (500ms)

Signed-off-by: Jonathan Eagles <jeagles@gmail.com>
This commit is contained in:
Ahmed Hussein 2020-01-27 15:50:13 -06:00 committed by Jonathan Eagles
parent 7f40e6688a
commit 3f01c48106

View File

@ -113,6 +113,8 @@
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public class MRApp extends MRAppMaster { public class MRApp extends MRAppMaster {
private static final Logger LOG = LoggerFactory.getLogger(MRApp.class); private static final Logger LOG = LoggerFactory.getLogger(MRApp.class);
private static final int WAIT_FOR_STATE_CNT = 200;
private static final int WAIT_FOR_STATE_INTERVAL= 50;
/** /**
* The available resource of each container allocated. * The available resource of each container allocated.
@ -322,13 +324,11 @@ public void waitForInternalState(JobImpl job,
JobStateInternal finalState) throws Exception { JobStateInternal finalState) throws Exception {
int timeoutSecs = 0; int timeoutSecs = 0;
JobStateInternal iState = job.getInternalState(); JobStateInternal iState = job.getInternalState();
while (!finalState.equals(iState) && timeoutSecs++ < 20) { while (!finalState.equals(iState) && timeoutSecs++ < WAIT_FOR_STATE_CNT) {
System.out.println("Job Internal State is : " + iState Thread.sleep(WAIT_FOR_STATE_INTERVAL);
+ " Waiting for Internal state : " + finalState);
Thread.sleep(500);
iState = job.getInternalState(); iState = job.getInternalState();
} }
System.out.println("Task Internal State is : " + iState); LOG.info("Job {} Internal State is : {}", job.getID(), iState);
Assert.assertEquals("Task Internal state is not correct (timedout)", Assert.assertEquals("Task Internal state is not correct (timedout)",
finalState, iState); finalState, iState);
} }
@ -336,17 +336,12 @@ public void waitForInternalState(JobImpl job,
public void waitForInternalState(TaskImpl task, public void waitForInternalState(TaskImpl task,
TaskStateInternal finalState) throws Exception { TaskStateInternal finalState) throws Exception {
int timeoutSecs = 0; int timeoutSecs = 0;
TaskReport report = task.getReport();
TaskStateInternal iState = task.getInternalState(); TaskStateInternal iState = task.getInternalState();
while (!finalState.equals(iState) && timeoutSecs++ < 20) { while (!finalState.equals(iState) && timeoutSecs++ < WAIT_FOR_STATE_CNT) {
System.out.println("Task Internal State is : " + iState Thread.sleep(WAIT_FOR_STATE_INTERVAL);
+ " Waiting for Internal state : " + finalState + " progress : "
+ report.getProgress());
Thread.sleep(500);
report = task.getReport();
iState = task.getInternalState(); iState = task.getInternalState();
} }
System.out.println("Task Internal State is : " + iState); LOG.info("Task {} Internal State is : {}", task.getID(), iState);
Assert.assertEquals("Task Internal state is not correct (timedout)", Assert.assertEquals("Task Internal state is not correct (timedout)",
finalState, iState); finalState, iState);
} }
@ -354,17 +349,12 @@ public void waitForInternalState(TaskImpl task,
public void waitForInternalState(TaskAttemptImpl attempt, public void waitForInternalState(TaskAttemptImpl attempt,
TaskAttemptStateInternal finalState) throws Exception { TaskAttemptStateInternal finalState) throws Exception {
int timeoutSecs = 0; int timeoutSecs = 0;
TaskAttemptReport report = attempt.getReport();
TaskAttemptStateInternal iState = attempt.getInternalState(); TaskAttemptStateInternal iState = attempt.getInternalState();
while (!finalState.equals(iState) && timeoutSecs++ < 20) { while (!finalState.equals(iState) && timeoutSecs++ < WAIT_FOR_STATE_CNT) {
System.out.println("TaskAttempt Internal State is : " + iState Thread.sleep(WAIT_FOR_STATE_INTERVAL);
+ " Waiting for Internal state : " + finalState + " progress : "
+ report.getProgress());
Thread.sleep(500);
report = attempt.getReport();
iState = attempt.getInternalState(); iState = attempt.getInternalState();
} }
System.out.println("TaskAttempt Internal State is : " + iState); LOG.info("TaskAttempt {} Internal State is : {}", attempt.getID(), iState);
Assert.assertEquals("TaskAttempt Internal state is not correct (timedout)", Assert.assertEquals("TaskAttempt Internal state is not correct (timedout)",
finalState, iState); finalState, iState);
} }
@ -374,17 +364,12 @@ public void waitForState(TaskAttempt attempt,
int timeoutSecs = 0; int timeoutSecs = 0;
TaskAttemptReport report = attempt.getReport(); TaskAttemptReport report = attempt.getReport();
while (!finalState.equals(report.getTaskAttemptState()) && while (!finalState.equals(report.getTaskAttemptState()) &&
timeoutSecs++ < 20) { timeoutSecs++ < WAIT_FOR_STATE_CNT) {
System.out.println( Thread.sleep(WAIT_FOR_STATE_INTERVAL);
"TaskAttempt " + attempt.getID().toString() + " State is : "
+ report.getTaskAttemptState()
+ " Waiting for state : " + finalState
+ " progress : " + report.getProgress());
report = attempt.getReport(); report = attempt.getReport();
Thread.sleep(500);
} }
System.out.println("TaskAttempt State is : " LOG.info("TaskAttempt {} State is : {}", attempt.getID(),
+ report.getTaskAttemptState()); report.getTaskAttemptState());
Assert.assertEquals("TaskAttempt state is not correct (timedout)", Assert.assertEquals("TaskAttempt state is not correct (timedout)",
finalState, finalState,
report.getTaskAttemptState()); report.getTaskAttemptState());
@ -418,14 +403,11 @@ public void waitForState(Task task, TaskState finalState) throws Exception {
int timeoutSecs = 0; int timeoutSecs = 0;
TaskReport report = task.getReport(); TaskReport report = task.getReport();
while (!finalState.equals(report.getTaskState()) && while (!finalState.equals(report.getTaskState()) &&
timeoutSecs++ < 20) { timeoutSecs++ < WAIT_FOR_STATE_CNT) {
System.out.println("Task State for " + task.getID() + " is : " Thread.sleep(WAIT_FOR_STATE_INTERVAL);
+ report.getTaskState() + " Waiting for state : " + finalState
+ " progress : " + report.getProgress());
report = task.getReport(); report = task.getReport();
Thread.sleep(500);
} }
System.out.println("Task State is : " + report.getTaskState()); LOG.info("Task {} State is : {}", task.getID(), report.getTaskState());
Assert.assertEquals("Task state is not correct (timedout)", finalState, Assert.assertEquals("Task state is not correct (timedout)", finalState,
report.getTaskState()); report.getTaskState());
} }
@ -434,15 +416,11 @@ public void waitForState(Job job, JobState finalState) throws Exception {
int timeoutSecs = 0; int timeoutSecs = 0;
JobReport report = job.getReport(); JobReport report = job.getReport();
while (!finalState.equals(report.getJobState()) && while (!finalState.equals(report.getJobState()) &&
timeoutSecs++ < 20) { timeoutSecs++ < WAIT_FOR_STATE_CNT) {
System.out.println("Job State is : " + report.getJobState() +
" Waiting for state : " + finalState +
" map progress : " + report.getMapProgress() +
" reduce progress : " + report.getReduceProgress());
report = job.getReport(); report = job.getReport();
Thread.sleep(500); Thread.sleep(WAIT_FOR_STATE_INTERVAL);
} }
System.out.println("Job State is : " + report.getJobState()); LOG.info("Job {} State is : {}", job.getID(), report.getJobState());
Assert.assertEquals("Job state is not correct (timedout)", finalState, Assert.assertEquals("Job state is not correct (timedout)", finalState,
job.getState()); job.getState());
} }
@ -453,12 +431,11 @@ public void waitForState(Service.STATE finalState) throws Exception {
waitForServiceToStop(20 * 1000)); waitForServiceToStop(20 * 1000));
} else { } else {
int timeoutSecs = 0; int timeoutSecs = 0;
while (!finalState.equals(getServiceState()) && timeoutSecs++ < 20) { while (!finalState.equals(getServiceState())
System.out.println("MRApp State is : " + getServiceState() && timeoutSecs++ < WAIT_FOR_STATE_CNT) {
+ " Waiting for state : " + finalState); Thread.sleep(WAIT_FOR_STATE_INTERVAL);
Thread.sleep(500);
} }
System.out.println("MRApp State is : " + getServiceState()); LOG.info("MRApp State is : {}", getServiceState());
Assert.assertEquals("MRApp state is not correct (timedout)", finalState, Assert.assertEquals("MRApp state is not correct (timedout)", finalState,
getServiceState()); getServiceState());
} }
@ -467,16 +444,18 @@ public void waitForState(Service.STATE finalState) throws Exception {
public void verifyCompleted() { public void verifyCompleted() {
for (Job job : getContext().getAllJobs().values()) { for (Job job : getContext().getAllJobs().values()) {
JobReport jobReport = job.getReport(); JobReport jobReport = job.getReport();
System.out.println("Job start time :" + jobReport.getStartTime()); LOG.info("Job start time :{}", jobReport.getStartTime());
System.out.println("Job finish time :" + jobReport.getFinishTime()); LOG.info("Job finish time :", jobReport.getFinishTime());
Assert.assertTrue("Job start time is not less than finish time", Assert.assertTrue("Job start time is not less than finish time",
jobReport.getStartTime() <= jobReport.getFinishTime()); jobReport.getStartTime() <= jobReport.getFinishTime());
Assert.assertTrue("Job finish time is in future", Assert.assertTrue("Job finish time is in future",
jobReport.getFinishTime() <= System.currentTimeMillis()); jobReport.getFinishTime() <= System.currentTimeMillis());
for (Task task : job.getTasks().values()) { for (Task task : job.getTasks().values()) {
TaskReport taskReport = task.getReport(); TaskReport taskReport = task.getReport();
System.out.println("Task start time : " + taskReport.getStartTime()); LOG.info("Task {} start time : {}", task.getID(),
System.out.println("Task finish time : " + taskReport.getFinishTime()); taskReport.getStartTime());
LOG.info("Task {} finish time : {}", task.getID(),
taskReport.getFinishTime());
Assert.assertTrue("Task start time is not less than finish time", Assert.assertTrue("Task start time is not less than finish time",
taskReport.getStartTime() <= taskReport.getFinishTime()); taskReport.getStartTime() <= taskReport.getFinishTime());
for (TaskAttempt attempt : task.getAttempts().values()) { for (TaskAttempt attempt : task.getAttempts().values()) {