YARN-2921. Fix MockRM/MockAM#waitForState sleep too long. (Tsuyoshi Ozawa via wangda)
This commit is contained in:
parent
93b770f7e7
commit
341a476812
@ -400,6 +400,9 @@ Release 2.8.0 - UNRELEASED
|
|||||||
YARN-3629. NodeID is always printed as "null" in node manager initialization log.
|
YARN-3629. NodeID is always printed as "null" in node manager initialization log.
|
||||||
(nijel via devaraj)
|
(nijel via devaraj)
|
||||||
|
|
||||||
|
YARN-2921. Fix MockRM/MockAM#waitForState sleep too long.
|
||||||
|
(Tsuyoshi Ozawa via wangda)
|
||||||
|
|
||||||
Release 2.7.1 - UNRELEASED
|
Release 2.7.1 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -141,7 +141,8 @@ protected void serviceStop() throws Exception {
|
|||||||
synchronized (waitForDrained) {
|
synchronized (waitForDrained) {
|
||||||
while (!drained && eventHandlingThread.isAlive()) {
|
while (!drained && eventHandlingThread.isAlive()) {
|
||||||
waitForDrained.wait(1000);
|
waitForDrained.wait(1000);
|
||||||
LOG.info("Waiting for AsyncDispatcher to drain.");
|
LOG.info("Waiting for AsyncDispatcher to drain. Thread state is :" +
|
||||||
|
eventHandlingThread.getState());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -43,10 +43,13 @@
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||||
import org.apache.hadoop.yarn.util.Records;
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
|
import org.apache.log4j.Logger;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
|
||||||
public class MockAM {
|
public class MockAM {
|
||||||
|
|
||||||
|
private static final Logger LOG = Logger.getLogger(MockAM.class);
|
||||||
|
|
||||||
private volatile int responseId = 0;
|
private volatile int responseId = 0;
|
||||||
private final ApplicationAttemptId attemptId;
|
private final ApplicationAttemptId attemptId;
|
||||||
private RMContext context;
|
private RMContext context;
|
||||||
@ -73,18 +76,28 @@ public void setAMRMProtocol(ApplicationMasterProtocol amRMProtocol,
|
|||||||
public void waitForState(RMAppAttemptState finalState) throws Exception {
|
public void waitForState(RMAppAttemptState finalState) throws Exception {
|
||||||
RMApp app = context.getRMApps().get(attemptId.getApplicationId());
|
RMApp app = context.getRMApps().get(attemptId.getApplicationId());
|
||||||
RMAppAttempt attempt = app.getRMAppAttempt(attemptId);
|
RMAppAttempt attempt = app.getRMAppAttempt(attemptId);
|
||||||
int timeoutSecs = 0;
|
final int timeoutMsecs = 40000;
|
||||||
|
final int minWaitMsecs = 1000;
|
||||||
|
final int waitMsPerLoop = 500;
|
||||||
|
int loop = 0;
|
||||||
while (!finalState.equals(attempt.getAppAttemptState())
|
while (!finalState.equals(attempt.getAppAttemptState())
|
||||||
&& timeoutSecs++ < 40) {
|
&& waitMsPerLoop * loop < timeoutMsecs) {
|
||||||
System.out
|
LOG.info("AppAttempt : " + attemptId + " State is : " +
|
||||||
.println("AppAttempt : " + attemptId + " State is : "
|
attempt.getAppAttemptState() + " Waiting for state : " +
|
||||||
+ attempt.getAppAttemptState()
|
finalState);
|
||||||
+ " Waiting for state : " + finalState);
|
Thread.yield();
|
||||||
Thread.sleep(1000);
|
Thread.sleep(waitMsPerLoop);
|
||||||
|
loop++;
|
||||||
|
}
|
||||||
|
int waitedMsecs = waitMsPerLoop * loop;
|
||||||
|
if (minWaitMsecs > waitedMsecs) {
|
||||||
|
Thread.sleep(minWaitMsecs - waitedMsecs);
|
||||||
|
}
|
||||||
|
LOG.info("Attempt State is : " + attempt.getAppAttemptState());
|
||||||
|
if (waitedMsecs >= timeoutMsecs) {
|
||||||
|
Assert.fail("Attempt state is not correct (timedout): expected: "
|
||||||
|
+ finalState + " actual: " + attempt.getAppAttemptState());
|
||||||
}
|
}
|
||||||
System.out.println("AppAttempt State is : " + attempt.getAppAttemptState());
|
|
||||||
Assert.assertEquals("AppAttempt state is not correct (timedout)",
|
|
||||||
finalState, attempt.getAppAttemptState());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public RegisterApplicationMasterResponse registerAppAttempt()
|
public RegisterApplicationMasterResponse registerAppAttempt()
|
||||||
|
@ -65,6 +65,7 @@
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
|
||||||
@ -95,6 +96,7 @@
|
|||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public class MockRM extends ResourceManager {
|
public class MockRM extends ResourceManager {
|
||||||
|
|
||||||
|
static final Logger LOG = Logger.getLogger(MockRM.class);
|
||||||
static final String ENABLE_WEBAPP = "mockrm.webapp.enabled";
|
static final String ENABLE_WEBAPP = "mockrm.webapp.enabled";
|
||||||
|
|
||||||
public MockRM() {
|
public MockRM() {
|
||||||
@ -126,15 +128,23 @@ public void waitForState(ApplicationId appId, RMAppState finalState)
|
|||||||
throws Exception {
|
throws Exception {
|
||||||
RMApp app = getRMContext().getRMApps().get(appId);
|
RMApp app = getRMContext().getRMApps().get(appId);
|
||||||
Assert.assertNotNull("app shouldn't be null", app);
|
Assert.assertNotNull("app shouldn't be null", app);
|
||||||
int timeoutSecs = 0;
|
final int timeoutMsecs = 80000;
|
||||||
while (!finalState.equals(app.getState()) && timeoutSecs++ < 40) {
|
final int waitMsPerLoop = 500;
|
||||||
System.out.println("App : " + appId + " State is : " + app.getState()
|
int loop = 0;
|
||||||
+ " Waiting for state : " + finalState);
|
while (!finalState.equals(app.getState()) &&
|
||||||
Thread.sleep(2000);
|
((waitMsPerLoop * loop) < timeoutMsecs)) {
|
||||||
|
LOG.info("App : " + appId + " State is : " + app.getState() +
|
||||||
|
" Waiting for state : " + finalState);
|
||||||
|
Thread.yield();
|
||||||
|
Thread.sleep(waitMsPerLoop);
|
||||||
|
loop++;
|
||||||
|
}
|
||||||
|
int waitedMsecs = waitMsPerLoop * loop;
|
||||||
|
LOG.info("App State is : " + app.getState());
|
||||||
|
if (waitedMsecs >= timeoutMsecs) {
|
||||||
|
Assert.fail("App state is not correct (timedout): expected: " +
|
||||||
|
finalState + " actual: " + app.getState());
|
||||||
}
|
}
|
||||||
System.out.println("App State is : " + app.getState());
|
|
||||||
Assert.assertEquals("App state is not correct (timedout)", finalState,
|
|
||||||
app.getState());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void waitForState(ApplicationAttemptId attemptId,
|
public void waitForState(ApplicationAttemptId attemptId,
|
||||||
@ -143,16 +153,27 @@ public void waitForState(ApplicationAttemptId attemptId,
|
|||||||
RMApp app = getRMContext().getRMApps().get(attemptId.getApplicationId());
|
RMApp app = getRMContext().getRMApps().get(attemptId.getApplicationId());
|
||||||
Assert.assertNotNull("app shouldn't be null", app);
|
Assert.assertNotNull("app shouldn't be null", app);
|
||||||
RMAppAttempt attempt = app.getRMAppAttempt(attemptId);
|
RMAppAttempt attempt = app.getRMAppAttempt(attemptId);
|
||||||
int timeoutSecs = 0;
|
final int timeoutMsecs = 40000;
|
||||||
while (!finalState.equals(attempt.getAppAttemptState()) && timeoutSecs++ < 40) {
|
final int minWaitMsecs = 1000;
|
||||||
System.out.println("AppAttempt : " + attemptId
|
final int waitMsPerLoop = 10;
|
||||||
+ " State is : " + attempt.getAppAttemptState()
|
int loop = 0;
|
||||||
+ " Waiting for state : " + finalState);
|
while (!finalState.equals(attempt.getAppAttemptState())
|
||||||
Thread.sleep(1000);
|
&& waitMsPerLoop * loop < timeoutMsecs) {
|
||||||
|
LOG.info("AppAttempt : " + attemptId + " State is : " +
|
||||||
|
attempt.getAppAttemptState() + " Waiting for state : " + finalState);
|
||||||
|
Thread.yield();
|
||||||
|
Thread.sleep(waitMsPerLoop);
|
||||||
|
loop++;
|
||||||
|
}
|
||||||
|
int waitedMsecs = waitMsPerLoop * loop;
|
||||||
|
if (minWaitMsecs > waitedMsecs) {
|
||||||
|
Thread.sleep(minWaitMsecs - waitedMsecs);
|
||||||
|
}
|
||||||
|
LOG.info("Attempt State is : " + attempt.getAppAttemptState());
|
||||||
|
if (waitedMsecs >= timeoutMsecs) {
|
||||||
|
Assert.fail("Attempt state is not correct (timedout): expected: "
|
||||||
|
+ finalState + " actual: " + attempt.getAppAttemptState());
|
||||||
}
|
}
|
||||||
System.out.println("Attempt State is : " + attempt.getAppAttemptState());
|
|
||||||
Assert.assertEquals("Attempt state is not correct (timedout)", finalState,
|
|
||||||
attempt.getAppAttemptState());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void waitForContainerAllocated(MockNM nm, ContainerId containerId)
|
public void waitForContainerAllocated(MockNM nm, ContainerId containerId)
|
||||||
|
@ -20,6 +20,7 @@
|
|||||||
|
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
|
||||||
|
@ -25,6 +25,7 @@
|
|||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
import com.google.common.base.Supplier;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileOutputStream;
|
import java.io.FileOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
@ -53,6 +54,7 @@
|
|||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.security.token.delegation.DelegationKey;
|
import org.apache.hadoop.security.token.delegation.DelegationKey;
|
||||||
import org.apache.hadoop.service.Service.STATE;
|
import org.apache.hadoop.service.Service.STATE;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
|
||||||
@ -586,10 +588,19 @@ public void testRMRestartWaitForPreviousAMToFinish() throws Exception {
|
|||||||
.getAppAttemptState());
|
.getAppAttemptState());
|
||||||
Assert.assertEquals(RMAppAttemptState.LAUNCHED,rmApp.getAppAttempts()
|
Assert.assertEquals(RMAppAttemptState.LAUNCHED,rmApp.getAppAttempts()
|
||||||
.get(latestAppAttemptId).getAppAttemptState());
|
.get(latestAppAttemptId).getAppAttemptState());
|
||||||
|
|
||||||
rm3.waitForState(latestAppAttemptId, RMAppAttemptState.FAILED);
|
rm3.waitForState(latestAppAttemptId, RMAppAttemptState.FAILED);
|
||||||
rm3.waitForState(rmApp.getApplicationId(), RMAppState.ACCEPTED);
|
rm3.waitForState(rmApp.getApplicationId(), RMAppState.ACCEPTED);
|
||||||
Assert.assertEquals(4, rmApp.getAppAttempts().size());
|
final int maxRetry = 10;
|
||||||
|
final RMApp rmAppForCheck = rmApp;
|
||||||
|
GenericTestUtils.waitFor(
|
||||||
|
new Supplier<Boolean>() {
|
||||||
|
@Override
|
||||||
|
public Boolean get() {
|
||||||
|
return new Boolean(rmAppForCheck.getAppAttempts().size() == 4);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
100, maxRetry);
|
||||||
Assert.assertEquals(RMAppAttemptState.FAILED,
|
Assert.assertEquals(RMAppAttemptState.FAILED,
|
||||||
rmApp.getAppAttempts().get(latestAppAttemptId).getAppAttemptState());
|
rmApp.getAppAttempts().get(latestAppAttemptId).getAppAttemptState());
|
||||||
|
|
||||||
|
@ -592,7 +592,7 @@ public void testRMRestartOrFailoverNotCountedForAMFailures()
|
|||||||
rm2.stop();
|
rm2.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test (timeout = 50000)
|
@Test (timeout = 120000)
|
||||||
public void testRMAppAttemptFailuresValidityInterval() throws Exception {
|
public void testRMAppAttemptFailuresValidityInterval() throws Exception {
|
||||||
YarnConfiguration conf = new YarnConfiguration();
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
||||||
@ -612,10 +612,10 @@ public void testRMAppAttemptFailuresValidityInterval() throws Exception {
|
|||||||
new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
|
new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
|
||||||
nm1.registerNode();
|
nm1.registerNode();
|
||||||
|
|
||||||
// set window size to a larger number : 20s
|
// set window size to a larger number : 60s
|
||||||
// we will verify the app should be failed if
|
// we will verify the app should be failed if
|
||||||
// two continuous attempts failed in 20s.
|
// two continuous attempts failed in 60s.
|
||||||
RMApp app = rm1.submitApp(200, 20000);
|
RMApp app = rm1.submitApp(200, 60000);
|
||||||
|
|
||||||
MockAM am = MockRM.launchAM(app, rm1, nm1);
|
MockAM am = MockRM.launchAM(app, rm1, nm1);
|
||||||
// Fail current attempt normally
|
// Fail current attempt normally
|
||||||
@ -636,8 +636,8 @@ public void testRMAppAttemptFailuresValidityInterval() throws Exception {
|
|||||||
rm1.waitForState(app.getApplicationId(), RMAppState.FAILED);
|
rm1.waitForState(app.getApplicationId(), RMAppState.FAILED);
|
||||||
|
|
||||||
ControlledClock clock = new ControlledClock(new SystemClock());
|
ControlledClock clock = new ControlledClock(new SystemClock());
|
||||||
// set window size to 6s
|
// set window size to 10s
|
||||||
RMAppImpl app1 = (RMAppImpl)rm1.submitApp(200, 6000);;
|
RMAppImpl app1 = (RMAppImpl)rm1.submitApp(200, 10000);;
|
||||||
app1.setSystemClock(clock);
|
app1.setSystemClock(clock);
|
||||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||||
|
|
||||||
@ -655,8 +655,8 @@ public void testRMAppAttemptFailuresValidityInterval() throws Exception {
|
|||||||
MockAM am2 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
MockAM am2 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||||
am2.waitForState(RMAppAttemptState.RUNNING);
|
am2.waitForState(RMAppAttemptState.RUNNING);
|
||||||
|
|
||||||
// wait for 6 seconds
|
// wait for 10 seconds
|
||||||
clock.setTime(System.currentTimeMillis() + 6*1000);
|
clock.setTime(System.currentTimeMillis() + 10*1000);
|
||||||
// Fail attempt2 normally
|
// Fail attempt2 normally
|
||||||
nm1.nodeHeartbeat(am2.getApplicationAttemptId(),
|
nm1.nodeHeartbeat(am2.getApplicationAttemptId(),
|
||||||
1, ContainerState.COMPLETE);
|
1, ContainerState.COMPLETE);
|
||||||
@ -693,8 +693,8 @@ public void testRMAppAttemptFailuresValidityInterval() throws Exception {
|
|||||||
MockAM am4 =
|
MockAM am4 =
|
||||||
rm2.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 4, nm1);
|
rm2.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 4, nm1);
|
||||||
|
|
||||||
// wait for 6 seconds
|
// wait for 10 seconds
|
||||||
clock.setTime(System.currentTimeMillis() + 6*1000);
|
clock.setTime(System.currentTimeMillis() + 10*1000);
|
||||||
// Fail attempt4 normally
|
// Fail attempt4 normally
|
||||||
nm1
|
nm1
|
||||||
.nodeHeartbeat(am4.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
.nodeHeartbeat(am4.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
||||||
|
Loading…
Reference in New Issue
Block a user