MAPREDUCE-2668. Fixed AuxServices to send a signal on application-finish to all the services. Contributed by Thomas Graves.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1181803 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
7d5434c6b3
commit
1c358b0796
@ -1573,6 +1573,9 @@ Release 0.23.0 - Unreleased
|
|||||||
MAPREDUCE-3020. Fixed TaskAttemptImpl to log the correct node-address for
|
MAPREDUCE-3020. Fixed TaskAttemptImpl to log the correct node-address for
|
||||||
a finished Reduce task. (Chackaravarthy via vinodkv)
|
a finished Reduce task. (Chackaravarthy via vinodkv)
|
||||||
|
|
||||||
|
MAPREDUCE-2668. Fixed AuxServices to send a signal on application-finish
|
||||||
|
to all the services. (Thomas Graves via vinodkv)
|
||||||
|
|
||||||
Release 0.22.0 - Unreleased
|
Release 0.22.0 - Unreleased
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -223,6 +223,7 @@ public void initApp(String user, ApplicationId appId, ByteBuffer secret) {
|
|||||||
public void stopApp(ApplicationId appId) {
|
public void stopApp(ApplicationId appId) {
|
||||||
JobID jobId = new JobID(Long.toString(appId.getClusterTimestamp()), appId.getId());
|
JobID jobId = new JobID(Long.toString(appId.getClusterTimestamp()), appId.getId());
|
||||||
secretManager.removeTokenForJob(jobId.toString());
|
secretManager.removeTokenForJob(jobId.toString());
|
||||||
|
userRsrc.remove(jobId.toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -42,8 +42,8 @@ public class AuxServices extends AbstractService
|
|||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(AuxServices.class);
|
private static final Log LOG = LogFactory.getLog(AuxServices.class);
|
||||||
|
|
||||||
public final Map<String,AuxiliaryService> serviceMap;
|
protected final Map<String,AuxiliaryService> serviceMap;
|
||||||
public final Map<String,ByteBuffer> serviceMeta;
|
protected final Map<String,ByteBuffer> serviceMeta;
|
||||||
|
|
||||||
public AuxServices() {
|
public AuxServices() {
|
||||||
super(AuxServices.class.getName());
|
super(AuxServices.class.getName());
|
||||||
@ -157,20 +157,24 @@ public void stateChanged(Service service) {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void handle(AuxServicesEvent event) {
|
public void handle(AuxServicesEvent event) {
|
||||||
LOG.info("Got event " + event.getType() + " for service "
|
LOG.info("Got event " + event.getType() + " for appId "
|
||||||
+ event.getServiceID());
|
+ event.getApplicationID());
|
||||||
AuxiliaryService service = serviceMap.get(event.getServiceID());
|
|
||||||
if (null == service) {
|
|
||||||
// TODO kill all containers waiting on Application
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
switch (event.getType()) {
|
switch (event.getType()) {
|
||||||
case APPLICATION_INIT:
|
case APPLICATION_INIT:
|
||||||
|
LOG.info("Got APPLICATION_INIT for service " + event.getServiceID());
|
||||||
|
AuxiliaryService service = serviceMap.get(event.getServiceID());
|
||||||
|
if (null == service) {
|
||||||
|
LOG.info("service is null");
|
||||||
|
// TODO kill all containers waiting on Application
|
||||||
|
return;
|
||||||
|
}
|
||||||
service.initApp(event.getUser(), event.getApplicationID(),
|
service.initApp(event.getUser(), event.getApplicationID(),
|
||||||
event.getServiceData());
|
event.getServiceData());
|
||||||
break;
|
break;
|
||||||
case APPLICATION_STOP:
|
case APPLICATION_STOP:
|
||||||
service.stopApp(event.getApplicationID());
|
for (AuxiliaryService serv : serviceMap.values()) {
|
||||||
|
serv.stopApp(event.getApplicationID());
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
throw new RuntimeException("Unknown type: " + event.getType());
|
throw new RuntimeException("Unknown type: " + event.getType());
|
||||||
|
@ -28,6 +28,8 @@
|
|||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerInitEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerInitEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
|
||||||
@ -247,6 +249,10 @@ void handleAppFinishWithContainersCleanedup() {
|
|||||||
new ApplicationLocalizationEvent(
|
new ApplicationLocalizationEvent(
|
||||||
LocalizationEventType.DESTROY_APPLICATION_RESOURCES, this));
|
LocalizationEventType.DESTROY_APPLICATION_RESOURCES, this));
|
||||||
|
|
||||||
|
// tell any auxiliary services that the app is done
|
||||||
|
this.dispatcher.getEventHandler().handle(
|
||||||
|
new AuxServicesEvent(AuxServicesEventType.APPLICATION_STOP, appId));
|
||||||
|
|
||||||
// TODO: Trigger the LogsManager
|
// TODO: Trigger the LogsManager
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -22,8 +22,12 @@
|
|||||||
import static org.junit.Assert.*;
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
@ -39,6 +43,7 @@
|
|||||||
import static org.apache.hadoop.yarn.service.Service.STATE.*;
|
import static org.apache.hadoop.yarn.service.Service.STATE.*;
|
||||||
|
|
||||||
public class TestAuxServices {
|
public class TestAuxServices {
|
||||||
|
private static final Log LOG = LogFactory.getLog(TestAuxServices.class);
|
||||||
|
|
||||||
static class LightService extends AbstractService
|
static class LightService extends AbstractService
|
||||||
implements AuxServices.AuxiliaryService {
|
implements AuxServices.AuxiliaryService {
|
||||||
@ -47,6 +52,7 @@ static class LightService extends AbstractService
|
|||||||
private int remaining_init;
|
private int remaining_init;
|
||||||
private int remaining_stop;
|
private int remaining_stop;
|
||||||
private ByteBuffer meta = null;
|
private ByteBuffer meta = null;
|
||||||
|
private ArrayList<Integer> stoppedApps;
|
||||||
|
|
||||||
LightService(String name, char idef, int expected_appId) {
|
LightService(String name, char idef, int expected_appId) {
|
||||||
this(name, idef, expected_appId, null);
|
this(name, idef, expected_appId, null);
|
||||||
@ -56,7 +62,13 @@ static class LightService extends AbstractService
|
|||||||
this.idef = idef;
|
this.idef = idef;
|
||||||
this.expected_appId = expected_appId;
|
this.expected_appId = expected_appId;
|
||||||
this.meta = meta;
|
this.meta = meta;
|
||||||
|
this.stoppedApps = new ArrayList<Integer>();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public ArrayList<Integer> getAppIdsStopped() {
|
||||||
|
return (ArrayList)this.stoppedApps.clone();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void init(Configuration conf) {
|
public void init(Configuration conf) {
|
||||||
remaining_init = conf.getInt(idef + ".expected.init", 0);
|
remaining_init = conf.getInt(idef + ".expected.init", 0);
|
||||||
@ -77,7 +89,7 @@ public void initApp(String user, ApplicationId appId, ByteBuffer data) {
|
|||||||
}
|
}
|
||||||
@Override
|
@Override
|
||||||
public void stopApp(ApplicationId appId) {
|
public void stopApp(ApplicationId appId) {
|
||||||
assertEquals(expected_appId, appId.getId());
|
stoppedApps.add(appId.getId());
|
||||||
}
|
}
|
||||||
@Override
|
@Override
|
||||||
public ByteBuffer getMeta() {
|
public ByteBuffer getMeta() {
|
||||||
@ -86,11 +98,15 @@ public ByteBuffer getMeta() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
static class ServiceA extends LightService {
|
static class ServiceA extends LightService {
|
||||||
public ServiceA() { super("A", 'A', 65, ByteBuffer.wrap("A".getBytes())); }
|
public ServiceA() {
|
||||||
|
super("A", 'A', 65, ByteBuffer.wrap("A".getBytes()));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static class ServiceB extends LightService {
|
static class ServiceB extends LightService {
|
||||||
public ServiceB() { super("B", 'B', 66, ByteBuffer.wrap("B".getBytes())); }
|
public ServiceB() {
|
||||||
|
super("B", 'B', 66, ByteBuffer.wrap("B".getBytes()));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -119,6 +135,14 @@ public void testAuxEventDispatch() {
|
|||||||
appId.setId(66);
|
appId.setId(66);
|
||||||
event = new AuxServicesEvent(
|
event = new AuxServicesEvent(
|
||||||
AuxServicesEventType.APPLICATION_STOP, "user0", appId, "Bsrv", null);
|
AuxServicesEventType.APPLICATION_STOP, "user0", appId, "Bsrv", null);
|
||||||
|
// verify all services got the stop event
|
||||||
|
aux.handle(event);
|
||||||
|
Collection<AuxServices.AuxiliaryService> servs = aux.getServices();
|
||||||
|
for (AuxServices.AuxiliaryService serv: servs) {
|
||||||
|
ArrayList<Integer> appIds = ((LightService)serv).getAppIdsStopped();
|
||||||
|
assertEquals("app not properly stopped", 1, appIds.size());
|
||||||
|
assertTrue("wrong app stopped", appIds.contains((Integer)66));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -166,6 +166,10 @@ public void testAppFinishedOnRunningContainers() {
|
|||||||
refEq(new ApplicationLocalizationEvent(
|
refEq(new ApplicationLocalizationEvent(
|
||||||
LocalizationEventType.DESTROY_APPLICATION_RESOURCES, wa.app)));
|
LocalizationEventType.DESTROY_APPLICATION_RESOURCES, wa.app)));
|
||||||
|
|
||||||
|
verify(wa.auxBus).handle(
|
||||||
|
refEq(new AuxServicesEvent(
|
||||||
|
AuxServicesEventType.APPLICATION_STOP, wa.appId)));
|
||||||
|
|
||||||
wa.appResourcesCleanedup();
|
wa.appResourcesCleanedup();
|
||||||
assertEquals(ApplicationState.FINISHED, wa.app.getApplicationState());
|
assertEquals(ApplicationState.FINISHED, wa.app.getApplicationState());
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user