HADOOP-10085. CompositeService should allow adding services while being inited. (Steve Loughran via kasha)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1563694 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Karthik Kambatla 2014-02-02 19:51:39 +00:00
parent 94b512bc51
commit 88d8ea9509
3 changed files with 217 additions and 23 deletions

View File

@ -314,6 +314,9 @@ Release 2.4.0 - UNRELEASED
HADOOP-10320. Javadoc in InterfaceStability.java lacks final </ul>. HADOOP-10320. Javadoc in InterfaceStability.java lacks final </ul>.
(René Nyffenegger via cnauroth) (René Nyffenegger via cnauroth)
HADOOP-10085. CompositeService should allow adding services while being
inited. (Steve Loughran via kasha)
Release 2.3.0 - UNRELEASED Release 2.3.0 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -19,7 +19,6 @@
package org.apache.hadoop.service; package org.apache.hadoop.service;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.List; import java.util.List;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -54,13 +53,13 @@ public CompositeService(String name) {
} }
/** /**
* Get an unmodifiable list of services * Get a cloned list of services
* @return a list of child services at the time of invocation - * @return a list of child services at the time of invocation -
* added services will not be picked up. * added services will not be picked up.
*/ */
public List<Service> getServices() { public List<Service> getServices() {
synchronized (serviceList) { synchronized (serviceList) {
return Collections.unmodifiableList(serviceList); return new ArrayList<Service>(serviceList);
} }
} }

View File

@ -16,26 +16,20 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.yarn.util; package org.apache.hadoop.service;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.Service.STATE;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.service.BreakableService;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.service.ServiceStateException;
import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.junit.Before;
import org.junit.Test;
public class TestCompositeService { public class TestCompositeService {
private static final int NUM_OF_SERVICES = 5; private static final int NUM_OF_SERVICES = 5;
@ -156,7 +150,7 @@ public void testServiceStartup() {
try { try {
serviceManager.start(); serviceManager.start();
fail("Exception should have been thrown due to startup failure of last service"); fail("Exception should have been thrown due to startup failure of last service");
} catch (YarnRuntimeException e) { } catch (ServiceTestRuntimeException e) {
for (int i = 0; i < NUM_OF_SERVICES - 1; i++) { for (int i = 0; i < NUM_OF_SERVICES - 1; i++) {
if (i >= FAILED_SERVICE_SEQ_NUMBER && STOP_ONLY_STARTED_SERVICES) { if (i >= FAILED_SERVICE_SEQ_NUMBER && STOP_ONLY_STARTED_SERVICES) {
// Failed service state should be INITED // Failed service state should be INITED
@ -197,7 +191,7 @@ public void testServiceStop() {
// Stop the composite service // Stop the composite service
try { try {
serviceManager.stop(); serviceManager.stop();
} catch (YarnRuntimeException e) { } catch (ServiceTestRuntimeException e) {
} }
assertInState(STATE.STOPPED, services); assertInState(STATE.STOPPED, services);
} }
@ -338,6 +332,40 @@ public void serviceInit(Configuration conf) {
1, testService.getServices().size()); 1, testService.getServices().size());
} }
@Test(timeout = 1000)
public void testAddInitedSiblingInInit() throws Throwable {
CompositeService parent = new CompositeService("parent");
BreakableService sibling = new BreakableService();
sibling.init(new Configuration());
parent.addService(new AddSiblingService(parent,
sibling,
STATE.INITED));
parent.init(new Configuration());
parent.start();
parent.stop();
assertEquals("Incorrect number of services",
2, parent.getServices().size());
}
@Test(timeout = 1000)
public void testAddUninitedSiblingInInit() throws Throwable {
CompositeService parent = new CompositeService("parent");
BreakableService sibling = new BreakableService();
parent.addService(new AddSiblingService(parent,
sibling,
STATE.INITED));
parent.init(new Configuration());
try {
parent.start();
fail("Expected an exception, got " + parent);
} catch (ServiceStateException e) {
//expected
}
parent.stop();
assertEquals("Incorrect number of services",
2, parent.getServices().size());
}
@Test @Test
public void testRemoveService() { public void testRemoveService() {
CompositeService testService = new CompositeService("TestService") { CompositeService testService = new CompositeService("TestService") {
@ -365,6 +393,105 @@ public void serviceInit(Configuration conf) {
2, testService.getServices().size()); 2, testService.getServices().size());
} }
@Test(timeout = 1000)
public void testAddStartedChildBeforeInit() throws Throwable {
CompositeService parent = new CompositeService("parent");
BreakableService child = new BreakableService();
child.init(new Configuration());
child.start();
AddSiblingService.addChildToService(parent, child);
try {
parent.init(new Configuration());
fail("Expected an exception, got " + parent);
} catch (ServiceStateException e) {
//expected
}
parent.stop();
}
@Test(timeout = 1000)
public void testAddStoppedChildBeforeInit() throws Throwable {
CompositeService parent = new CompositeService("parent");
BreakableService child = new BreakableService();
child.init(new Configuration());
child.start();
child.stop();
AddSiblingService.addChildToService(parent, child);
try {
parent.init(new Configuration());
fail("Expected an exception, got " + parent);
} catch (ServiceStateException e) {
//expected
}
parent.stop();
}
@Test(timeout = 1000)
public void testAddStartedSiblingInStart() throws Throwable {
CompositeService parent = new CompositeService("parent");
BreakableService sibling = new BreakableService();
sibling.init(new Configuration());
sibling.start();
parent.addService(new AddSiblingService(parent,
sibling,
STATE.STARTED));
parent.init(new Configuration());
parent.start();
parent.stop();
assertEquals("Incorrect number of services",
2, parent.getServices().size());
}
@Test(timeout = 1000)
public void testAddUninitedSiblingInStart() throws Throwable {
CompositeService parent = new CompositeService("parent");
BreakableService sibling = new BreakableService();
parent.addService(new AddSiblingService(parent,
sibling,
STATE.STARTED));
parent.init(new Configuration());
assertInState(STATE.NOTINITED, sibling);
parent.start();
parent.stop();
assertEquals("Incorrect number of services",
2, parent.getServices().size());
}
@Test(timeout = 1000)
public void testAddStartedSiblingInInit() throws Throwable {
CompositeService parent = new CompositeService("parent");
BreakableService sibling = new BreakableService();
sibling.init(new Configuration());
sibling.start();
parent.addService(new AddSiblingService(parent,
sibling,
STATE.INITED));
parent.init(new Configuration());
assertInState(STATE.STARTED, sibling);
parent.start();
assertInState(STATE.STARTED, sibling);
parent.stop();
assertEquals("Incorrect number of services",
2, parent.getServices().size());
assertInState(STATE.STOPPED, sibling);
}
@Test(timeout = 1000)
public void testAddStartedSiblingInStop() throws Throwable {
CompositeService parent = new CompositeService("parent");
BreakableService sibling = new BreakableService();
sibling.init(new Configuration());
sibling.start();
parent.addService(new AddSiblingService(parent,
sibling,
STATE.STOPPED));
parent.init(new Configuration());
parent.start();
parent.stop();
assertEquals("Incorrect number of services",
2, parent.getServices().size());
}
public static class CompositeServiceAddingAChild extends CompositeService{ public static class CompositeServiceAddingAChild extends CompositeService{
Service child; Service child;
@ -380,6 +507,17 @@ protected void serviceInit(Configuration conf) throws Exception {
} }
} }
public static class ServiceTestRuntimeException extends RuntimeException {
public ServiceTestRuntimeException(String message) {
super(message);
}
}
/**
* This is a composite service that keeps a count of the number of lifecycle
* events called, and can be set to throw a {@link ServiceTestRuntimeException }
* during service start or stop
*/
public static class CompositeServiceImpl extends CompositeService { public static class CompositeServiceImpl extends CompositeService {
public static boolean isPolicyToStopOnlyStartedServices() { public static boolean isPolicyToStopOnlyStartedServices() {
@ -408,7 +546,7 @@ protected void serviceInit(Configuration conf) throws Exception {
@Override @Override
protected void serviceStart() throws Exception { protected void serviceStart() throws Exception {
if (throwExceptionOnStart) { if (throwExceptionOnStart) {
throw new YarnRuntimeException("Fake service start exception"); throw new ServiceTestRuntimeException("Fake service start exception");
} }
counter++; counter++;
callSequenceNumber = counter; callSequenceNumber = counter;
@ -420,7 +558,7 @@ protected void serviceStop() throws Exception {
counter++; counter++;
callSequenceNumber = counter; callSequenceNumber = counter;
if (throwExceptionOnStop) { if (throwExceptionOnStop) {
throw new YarnRuntimeException("Fake service stop exception"); throw new ServiceTestRuntimeException("Fake service stop exception");
} }
super.serviceStop(); super.serviceStop();
} }
@ -457,6 +595,9 @@ public String toString() {
} }
/**
* Composite service that makes the addService method public to all
*/
public static class ServiceManager extends CompositeService { public static class ServiceManager extends CompositeService {
public void addTestService(CompositeService service) { public void addTestService(CompositeService service) {
@ -468,4 +609,55 @@ public ServiceManager(String name) {
} }
} }
public static class AddSiblingService extends CompositeService {
private final CompositeService parent;
private final Service serviceToAdd;
private STATE triggerState;
public AddSiblingService(CompositeService parent,
Service serviceToAdd,
STATE triggerState) {
super("ParentStateManipulatorService");
this.parent = parent;
this.serviceToAdd = serviceToAdd;
this.triggerState = triggerState;
}
/**
* Add the serviceToAdd to the parent if this service
* is in the state requested
*/
private void maybeAddSibling() {
if (getServiceState() == triggerState) {
parent.addService(serviceToAdd);
}
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
maybeAddSibling();
super.serviceInit(conf);
}
@Override
protected void serviceStart() throws Exception {
maybeAddSibling();
super.serviceStart();
}
@Override
protected void serviceStop() throws Exception {
maybeAddSibling();
super.serviceStop();
}
/**
* Expose addService method
* @param parent parent service
* @param child child to add
*/
public static void addChildToService(CompositeService parent, Service child) {
parent.addService(child);
}
}
} }