YARN-9248. RMContainerImpl:Invalid event: ACQUIRED at KILLED. Contributed by lujie.
This commit is contained in:
parent
6c96f5e4b6
commit
8c30114b00
@ -27,6 +27,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|||||||
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
@ -156,7 +157,8 @@ public class RMContainerImpl implements RMContainer {
|
|||||||
// Transitions from KILLED state
|
// Transitions from KILLED state
|
||||||
.addTransition(RMContainerState.KILLED, RMContainerState.KILLED,
|
.addTransition(RMContainerState.KILLED, RMContainerState.KILLED,
|
||||||
EnumSet.of(RMContainerEventType.EXPIRE, RMContainerEventType.RELEASED,
|
EnumSet.of(RMContainerEventType.EXPIRE, RMContainerEventType.RELEASED,
|
||||||
RMContainerEventType.KILL, RMContainerEventType.FINISHED))
|
RMContainerEventType.KILL, RMContainerEventType.ACQUIRED,
|
||||||
|
RMContainerEventType.FINISHED))
|
||||||
|
|
||||||
// create the topology tables
|
// create the topology tables
|
||||||
.installTopology();
|
.installTopology();
|
||||||
@ -475,8 +477,7 @@ public class RMContainerImpl implements RMContainer {
|
|||||||
stateMachine.doTransition(event.getType(), event);
|
stateMachine.doTransition(event.getType(), event);
|
||||||
} catch (InvalidStateTransitionException e) {
|
} catch (InvalidStateTransitionException e) {
|
||||||
LOG.error("Can't handle this event at current state", e);
|
LOG.error("Can't handle this event at current state", e);
|
||||||
LOG.error("Invalid event " + event.getType() +
|
onInvalidStateTransition(event.getType(), oldState);
|
||||||
" on container " + this.getContainerId());
|
|
||||||
}
|
}
|
||||||
if (oldState != getState()) {
|
if (oldState != getState()) {
|
||||||
LOG.info(event.getContainerId() + " Container Transitioned from "
|
LOG.info(event.getContainerId() + " Container Transitioned from "
|
||||||
@ -915,4 +916,17 @@ public class RMContainerImpl implements RMContainer {
|
|||||||
rmContext.getRMApplicationHistoryWriter().containerStarted(this);
|
rmContext.getRMApplicationHistoryWriter().containerStarted(this);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* catch the InvalidStateTransition.
|
||||||
|
* @param state
|
||||||
|
* @param rmContainerEventType
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
|
protected void onInvalidStateTransition(
|
||||||
|
RMContainerEventType rmContainerEventType,
|
||||||
|
RMContainerState state){
|
||||||
|
LOG.error("Invalid event " + rmContainerEventType +
|
||||||
|
" on container " + this.getContainerId());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -562,4 +562,66 @@ public class TestRMContainerImpl {
|
|||||||
AllocationTags.createSingleAppAllocationTags(appId, null),
|
AllocationTags.createSingleAppAllocationTags(appId, null),
|
||||||
Long::max));
|
Long::max));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 30000)
|
||||||
|
public void testContainerAcquiredAtKilled() {
|
||||||
|
DrainDispatcher drainDispatcher = new DrainDispatcher();
|
||||||
|
EventHandler<RMAppAttemptEvent> appAttemptEventHandler = mock(
|
||||||
|
EventHandler.class);
|
||||||
|
EventHandler generic = mock(EventHandler.class);
|
||||||
|
drainDispatcher.register(RMAppAttemptEventType.class,
|
||||||
|
appAttemptEventHandler);
|
||||||
|
drainDispatcher.register(RMNodeEventType.class, generic);
|
||||||
|
drainDispatcher.init(new YarnConfiguration());
|
||||||
|
drainDispatcher.start();
|
||||||
|
NodeId nodeId = BuilderUtils.newNodeId("host", 3425);
|
||||||
|
ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
|
||||||
|
ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
|
||||||
|
appId, 1);
|
||||||
|
ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1);
|
||||||
|
ContainerAllocationExpirer expirer = mock(ContainerAllocationExpirer.class);
|
||||||
|
|
||||||
|
Resource resource = BuilderUtils.newResource(512, 1);
|
||||||
|
Priority priority = BuilderUtils.newPriority(5);
|
||||||
|
|
||||||
|
Container container = BuilderUtils.newContainer(containerId, nodeId,
|
||||||
|
"host:3465", resource, priority, null);
|
||||||
|
|
||||||
|
ConcurrentMap<ApplicationId, RMApp> appMap = new ConcurrentHashMap<>();
|
||||||
|
RMApp rmApp = mock(RMApp.class);
|
||||||
|
appMap.putIfAbsent(appId, rmApp);
|
||||||
|
|
||||||
|
RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
|
||||||
|
SystemMetricsPublisher publisher = mock(SystemMetricsPublisher.class);
|
||||||
|
RMContext rmContext = mock(RMContext.class);
|
||||||
|
when(rmContext.getDispatcher()).thenReturn(drainDispatcher);
|
||||||
|
when(rmContext.getContainerAllocationExpirer()).thenReturn(expirer);
|
||||||
|
when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer);
|
||||||
|
when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher);
|
||||||
|
AllocationTagsManager ptm = mock(AllocationTagsManager.class);
|
||||||
|
when(rmContext.getAllocationTagsManager()).thenReturn(ptm);
|
||||||
|
|
||||||
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
|
conf.setBoolean(
|
||||||
|
YarnConfiguration.APPLICATION_HISTORY_SAVE_NON_AM_CONTAINER_META_INFO,
|
||||||
|
true);
|
||||||
|
when(rmContext.getYarnConfiguration()).thenReturn(conf);
|
||||||
|
when(rmContext.getRMApps()).thenReturn(appMap);
|
||||||
|
|
||||||
|
RMContainer rmContainer = new RMContainerImpl(container,
|
||||||
|
SchedulerRequestKey.extractFrom(container), appAttemptId,
|
||||||
|
nodeId, "user", rmContext) {
|
||||||
|
@Override
|
||||||
|
protected void onInvalidStateTransition(
|
||||||
|
RMContainerEventType rmContainerEventType, RMContainerState state) {
|
||||||
|
Assert.fail("RMContainerImpl: can't handle " + rmContainerEventType
|
||||||
|
+ " at state " + state);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
rmContainer.handle(new RMContainerEvent(containerId,
|
||||||
|
RMContainerEventType.KILL));
|
||||||
|
|
||||||
|
rmContainer.handle(new RMContainerEvent(containerId,
|
||||||
|
RMContainerEventType.ACQUIRED));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user