MAPREDUCE-3262. Fixed Container's state-machine in NodeManager to handle a couple of events in failure states correctly. Contributed by Hitesh Shah and Siddharth Seth.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1195416 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2011-10-31 11:29:23 +00:00
parent a75c4cf4e4
commit 21b1e1da49
3 changed files with 156 additions and 4 deletions

View File

@ -1877,6 +1877,10 @@ Release 0.23.0 - Unreleased
MAPREDUCE-2696. Fixed NodeManager to cleanup logs in a thread when logs'
aggregation is not enabled. (Siddharth Seth via vinodkv)
MAPREDUCE-3262. Fixed Container's state-machine in NodeManager to handle
a couple of events in failure states correctly. (Hitesh Shah and Siddharth
Seth via vinodkv)
Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES

View File

@ -158,6 +158,19 @@ ContainerEventType.RESOURCE_LOCALIZED, new LocalizedTransition())
ContainerState.LOCALIZATION_FAILED,
ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
UPDATE_DIAGNOSTICS_TRANSITION)
// container not launched so kill is a no-op
.addTransition(ContainerState.LOCALIZATION_FAILED,
ContainerState.LOCALIZATION_FAILED,
ContainerEventType.KILL_CONTAINER)
// container cleanup triggers a release of all resources
// regardless of whether they were localized or not
// LocalizedResource handles release event in all states
.addTransition(ContainerState.LOCALIZATION_FAILED,
ContainerState.LOCALIZATION_FAILED,
ContainerEventType.RESOURCE_LOCALIZED)
.addTransition(ContainerState.LOCALIZATION_FAILED,
ContainerState.LOCALIZATION_FAILED,
ContainerEventType.RESOURCE_FAILED)
// From LOCALIZED State
.addTransition(ContainerState.LOCALIZED, ContainerState.RUNNING,
@ -222,6 +235,9 @@ ContainerEventType.KILL_CONTAINER, new KillTransition())
ContainerState.KILLING,
ContainerEventType.RESOURCE_LOCALIZED,
new LocalizedResourceDuringKillTransition())
.addTransition(ContainerState.KILLING,
ContainerState.KILLING,
ContainerEventType.RESOURCE_FAILED)
.addTransition(ContainerState.KILLING, ContainerState.KILLING,
ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
UPDATE_DIAGNOSTICS_TRANSITION)
@ -242,8 +258,7 @@ ContainerEventType.KILL_CONTAINER, new KillTransition())
// in the container launcher
.addTransition(ContainerState.KILLING,
ContainerState.KILLING,
ContainerEventType.CONTAINER_LAUNCHED,
new ContainerTransition())
ContainerEventType.CONTAINER_LAUNCHED)
// From CONTAINER_CLEANEDUP_AFTER_KILL State.
.addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,

View File

@ -33,6 +33,7 @@
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
@ -225,6 +226,89 @@ public void testCleanupOnKillRequest() throws Exception {
}
}
@Test
public void testKillOnLocalizationFailed() throws Exception {
WrappedContainer wc = null;
try {
wc = new WrappedContainer(15, 314159265358979L, 4344, "yak");
wc.initContainer();
wc.failLocalizeResources(wc.getLocalResourceCount());
assertEquals(ContainerState.LOCALIZATION_FAILED, wc.c.getContainerState());
wc.killContainer();
assertEquals(ContainerState.LOCALIZATION_FAILED, wc.c.getContainerState());
verifyCleanupCall(wc);
} finally {
if (wc != null) {
wc.finished();
}
}
}
@Test
public void testResourceLocalizedOnLocalizationFailed() throws Exception {
WrappedContainer wc = null;
try {
wc = new WrappedContainer(16, 314159265358979L, 4344, "yak");
wc.initContainer();
int failCount = wc.getLocalResourceCount()/2;
if (failCount == 0) {
failCount = 1;
}
wc.failLocalizeResources(failCount);
assertEquals(ContainerState.LOCALIZATION_FAILED, wc.c.getContainerState());
wc.localizeResourcesFromInvalidState(failCount);
assertEquals(ContainerState.LOCALIZATION_FAILED, wc.c.getContainerState());
verifyCleanupCall(wc);
} finally {
if (wc != null) {
wc.finished();
}
}
}
@Test
public void testResourceFailedOnLocalizationFailed() throws Exception {
WrappedContainer wc = null;
try {
wc = new WrappedContainer(16, 314159265358979L, 4344, "yak");
wc.initContainer();
Iterator<String> lRsrcKeys = wc.localResources.keySet().iterator();
String key1 = lRsrcKeys.next();
String key2 = lRsrcKeys.next();
wc.failLocalizeSpecificResource(key1);
assertEquals(ContainerState.LOCALIZATION_FAILED, wc.c.getContainerState());
wc.failLocalizeSpecificResource(key2);
assertEquals(ContainerState.LOCALIZATION_FAILED, wc.c.getContainerState());
verifyCleanupCall(wc);
} finally {
if (wc != null) {
wc.finished();
}
}
}
@Test
public void testResourceFailedOnKilling() throws Exception {
WrappedContainer wc = null;
try {
wc = new WrappedContainer(16, 314159265358979L, 4344, "yak");
wc.initContainer();
Iterator<String> lRsrcKeys = wc.localResources.keySet().iterator();
String key1 = lRsrcKeys.next();
wc.killContainer();
assertEquals(ContainerState.KILLING, wc.c.getContainerState());
wc.failLocalizeSpecificResource(key1);
assertEquals(ContainerState.KILLING, wc.c.getContainerState());
verifyCleanupCall(wc);
} finally {
if (wc != null) {
wc.finished();
}
}
}
/**
* Verify serviceData correctly sent.
*/
@ -491,11 +575,20 @@ public void initContainer() {
drainDispatcherEvents();
}
public Map<Path, String> localizeResources() throws URISyntaxException {
// Localize resources
// Skip some resources so as to consider them failed
public Map<Path, String> doLocalizeResources(boolean checkLocalizingState,
int skipRsrcCount) throws URISyntaxException {
Path cache = new Path("file:///cache");
Map<Path, String> localPaths = new HashMap<Path, String>();
int counter = 0;
for (Entry<String, LocalResource> rsrc : localResources.entrySet()) {
assertEquals(ContainerState.LOCALIZING, c.getContainerState());
if (counter++ < skipRsrcCount) {
continue;
}
if (checkLocalizingState) {
assertEquals(ContainerState.LOCALIZING, c.getContainerState());
}
LocalResourceRequest req = new LocalResourceRequest(rsrc.getValue());
Path p = new Path(cache, rsrc.getKey());
localPaths.put(p, rsrc.getKey());
@ -506,6 +599,42 @@ public Map<Path, String> localizeResources() throws URISyntaxException {
drainDispatcherEvents();
return localPaths;
}
public Map<Path, String> localizeResources() throws URISyntaxException {
return doLocalizeResources(true, 0);
}
public void localizeResourcesFromInvalidState(int skipRsrcCount)
throws URISyntaxException {
doLocalizeResources(false, skipRsrcCount);
}
public void failLocalizeSpecificResource(String rsrcKey)
throws URISyntaxException {
LocalResource rsrc = localResources.get(rsrcKey);
LocalResourceRequest req = new LocalResourceRequest(rsrc);
Exception e = new Exception("Fake localization error");
c.handle(new ContainerResourceFailedEvent(c.getContainerID(), req, e));
drainDispatcherEvents();
}
// fail to localize some resources
public void failLocalizeResources(int failRsrcCount)
throws URISyntaxException {
int counter = 0;
for (Entry<String, LocalResource> rsrc : localResources.entrySet()) {
if (counter >= failRsrcCount) {
break;
}
++counter;
LocalResourceRequest req = new LocalResourceRequest(rsrc.getValue());
Exception e = new Exception("Fake localization error");
c.handle(new ContainerResourceFailedEvent(c.getContainerID(),
req, e));
}
drainDispatcherEvents();
}
public void launchContainer() {
c.handle(new ContainerEvent(cId, ContainerEventType.CONTAINER_LAUNCHED));
@ -535,5 +664,9 @@ public void containerKilledOnRequest() {
.getExitCode()));
drainDispatcherEvents();
}
public int getLocalResourceCount() {
return localResources.size();
}
}
}