YARN-7565. Yarn service pre-maturely releases the container after AM restart. Contributed by Chandni Singh
This commit is contained in:
parent
06f0eb2dce
commit
3ebe6a7819
@ -65,6 +65,7 @@
|
||||
import org.apache.hadoop.yarn.service.component.Component;
|
||||
import org.apache.hadoop.yarn.service.component.ComponentEvent;
|
||||
import org.apache.hadoop.yarn.service.component.ComponentEventType;
|
||||
import org.apache.hadoop.yarn.service.conf.YarnServiceConf;
|
||||
import org.apache.hadoop.yarn.service.conf.YarnServiceConstants;
|
||||
import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService;
|
||||
import org.apache.hadoop.yarn.service.provider.ProviderUtils;
|
||||
@ -137,6 +138,9 @@ public class ServiceScheduler extends CompositeService {
|
||||
private YarnRegistryViewForProviders yarnRegistryOperations;
|
||||
private ServiceContext context;
|
||||
private ContainerLaunchService containerLaunchService;
|
||||
private final Map<ContainerId, ComponentInstance> unRecoveredInstances =
|
||||
new ConcurrentHashMap<>();
|
||||
private long containerRecoveryTimeout;
|
||||
|
||||
public ServiceScheduler(ServiceContext context) {
|
||||
super(context.service.getName());
|
||||
@ -212,6 +216,9 @@ public void buildInstance(ServiceContext context, Configuration configuration)
|
||||
createConfigFileCache(context.fs.getFileSystem());
|
||||
|
||||
createAllComponents();
|
||||
containerRecoveryTimeout = getConfig().getInt(
|
||||
YarnServiceConf.CONTAINER_RECOVERY_TIMEOUT_MS,
|
||||
YarnServiceConf.DEFAULT_CONTAINER_RECOVERY_TIMEOUT_MS);
|
||||
}
|
||||
|
||||
protected YarnRegistryViewForProviders createYarnRegistryOperations(
|
||||
@ -320,7 +327,7 @@ private void recoverComponents(RegisterApplicationMasterResponse response) {
|
||||
}
|
||||
for (Container container : containersFromPrevAttempt) {
|
||||
LOG.info("Handling {} from previous attempt", container.getId());
|
||||
ServiceRecord record = existingRecords.get(RegistryPathUtils
|
||||
ServiceRecord record = existingRecords.remove(RegistryPathUtils
|
||||
.encodeYarnID(container.getId().toString()));
|
||||
if (record != null) {
|
||||
Component comp = componentsById.get(container.getAllocationRequestId());
|
||||
@ -337,6 +344,40 @@ private void recoverComponents(RegisterApplicationMasterResponse response) {
|
||||
amRMClient.releaseAssignedContainer(container.getId());
|
||||
}
|
||||
}
|
||||
|
||||
existingRecords.forEach((encodedContainerId, record) -> {
|
||||
String componentName = record.get(YarnRegistryAttributes.YARN_COMPONENT);
|
||||
if (componentName != null) {
|
||||
Component component = componentsByName.get(componentName);
|
||||
ComponentInstance compInstance = component.getComponentInstance(
|
||||
record.description);
|
||||
ContainerId containerId = ContainerId.fromString(record.get(
|
||||
YarnRegistryAttributes.YARN_ID));
|
||||
unRecoveredInstances.put(containerId, compInstance);
|
||||
component.removePendingInstance(compInstance);
|
||||
}
|
||||
});
|
||||
|
||||
if (unRecoveredInstances.size() > 0) {
|
||||
executorService.schedule(() -> {
|
||||
synchronized (unRecoveredInstances) {
|
||||
// after containerRecoveryTimeout, all the containers that haven't been
|
||||
// recovered by the RM will be released. The corresponding Component
|
||||
// Instances are added to the pending queues of their respective
|
||||
// component.
|
||||
unRecoveredInstances.forEach((containerId, instance) -> {
|
||||
LOG.info("{}, wait on container {} expired",
|
||||
instance.getCompInstanceId(), containerId);
|
||||
instance.cleanupRegistryAndCompHdfsDir(containerId);
|
||||
Component component = componentsByName.get(instance.getCompName());
|
||||
component.requestContainers(1);
|
||||
component.reInsertPendingInstance(instance);
|
||||
amRMClient.releaseAssignedContainer(containerId);
|
||||
});
|
||||
unRecoveredInstances.clear();
|
||||
}
|
||||
}, containerRecoveryTimeout, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
}
|
||||
|
||||
private void initGlobalTokensForSubstitute(ServiceContext context) {
|
||||
@ -521,6 +562,35 @@ public void onContainersAllocated(List<Container> containers) {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void onContainersReceivedFromPreviousAttempts(
|
||||
List<Container> containers) {
|
||||
if (containers == null || containers.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
for (Container container : containers) {
|
||||
ComponentInstance compInstance;
|
||||
synchronized (unRecoveredInstances) {
|
||||
compInstance = unRecoveredInstances.remove(container.getId());
|
||||
}
|
||||
if (compInstance != null) {
|
||||
Component component = componentsById.get(
|
||||
container.getAllocationRequestId());
|
||||
ComponentEvent event = new ComponentEvent(component.getName(),
|
||||
CONTAINER_RECOVERED)
|
||||
.setInstance(compInstance)
|
||||
.setContainerId(container.getId())
|
||||
.setContainer(container);
|
||||
component.handle(event);
|
||||
} else {
|
||||
LOG.info("Not waiting to recover container {}, releasing",
|
||||
container.getId());
|
||||
amRMClient.releaseAssignedContainer(container.getId());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onContainersCompleted(List<ContainerStatus> statuses) {
|
||||
for (ContainerStatus status : statuses) {
|
||||
|
@ -107,6 +107,10 @@ FLEX, new FlexComponentTransition())
|
||||
.addTransition(INIT, INIT, CONTAINER_RECOVERED,
|
||||
new ContainerRecoveredTransition())
|
||||
|
||||
// container recovered in AM heartbeat
|
||||
.addTransition(FLEXING, FLEXING, CONTAINER_RECOVERED,
|
||||
new ContainerRecoveredTransition())
|
||||
|
||||
// container allocated by RM
|
||||
.addTransition(FLEXING, FLEXING, CONTAINER_ALLOCATED,
|
||||
new ContainerAllocatedTransition())
|
||||
@ -309,6 +313,10 @@ public void transition(Component component, ComponentEvent event) {
|
||||
}
|
||||
}
|
||||
|
||||
public void removePendingInstance(ComponentInstance instance) {
|
||||
pendingInstances.remove(instance);
|
||||
}
|
||||
|
||||
public void reInsertPendingInstance(ComponentInstance instance) {
|
||||
pendingInstances.add(instance);
|
||||
}
|
||||
|
@ -39,15 +39,15 @@
|
||||
import org.apache.hadoop.yarn.service.ServiceScheduler;
|
||||
import org.apache.hadoop.yarn.service.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.service.component.Component;
|
||||
import org.apache.hadoop.yarn.service.monitor.probe.ProbeStatus;
|
||||
import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders;
|
||||
import org.apache.hadoop.yarn.service.timelineservice.ServiceTimelinePublisher;
|
||||
import org.apache.hadoop.yarn.service.utils.ServiceUtils;
|
||||
import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
|
||||
import org.apache.hadoop.yarn.state.SingleArcTransition;
|
||||
import org.apache.hadoop.yarn.state.StateMachine;
|
||||
import org.apache.hadoop.yarn.state.StateMachineFactory;
|
||||
import org.apache.hadoop.yarn.util.BoundedAppender;
|
||||
import org.apache.hadoop.yarn.service.utils.ServiceUtils;
|
||||
import org.apache.hadoop.yarn.service.timelineservice.ServiceTimelinePublisher;
|
||||
import org.apache.hadoop.yarn.service.monitor.probe.ProbeStatus;
|
||||
import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@ -62,7 +62,6 @@
|
||||
|
||||
import static org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes.*;
|
||||
import static org.apache.hadoop.yarn.api.records.ContainerExitStatus.KILLED_BY_APPMASTER;
|
||||
import static org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE;
|
||||
import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.*;
|
||||
import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceState.*;
|
||||
|
||||
@ -398,6 +397,7 @@ private void updateServiceRecord(
|
||||
record.set(YARN_PERSISTENCE, PersistencePolicies.CONTAINER);
|
||||
record.set(YARN_IP, status.getIPs().get(0));
|
||||
record.set(YARN_HOSTNAME, status.getHost());
|
||||
record.set(YARN_COMPONENT, component.getName());
|
||||
try {
|
||||
yarnRegistry
|
||||
.putComponent(RegistryPathUtils.encodeYarnID(containerId), record);
|
||||
|
@ -22,6 +22,8 @@
|
||||
|
||||
public class YarnServiceConf {
|
||||
|
||||
private static final String YARN_SERVICE_PREFIX = "yarn.service.";
|
||||
|
||||
// Retry settings for the ServiceClient to talk to Service AppMaster
|
||||
public static final String CLIENT_AM_RETRY_MAX_WAIT_MS = "yarn.service.client-am.retry.max-wait-ms";
|
||||
public static final String CLIENT_AM_RETRY_MAX_INTERVAL_MS = "yarn.service.client-am.retry-interval-ms";
|
||||
@ -83,6 +85,14 @@ public class YarnServiceConf {
|
||||
*/
|
||||
public static final String JVM_OPTS = "yarn.service.am.java.opts";
|
||||
|
||||
/**
|
||||
* How long (in milliseconds) the AM waits for the RM to recover a container
* of a previous attempt before the container is considered dead and released.
|
||||
*/
|
||||
public static final String CONTAINER_RECOVERY_TIMEOUT_MS =
|
||||
YARN_SERVICE_PREFIX + "container-recovery.timeout.ms";
|
||||
|
||||
public static final int DEFAULT_CONTAINER_RECOVERY_TIMEOUT_MS = 120000;
|
||||
|
||||
/**
|
||||
* Get long value for the property. First get from the userConf, if not
|
||||
* present, get from systemConf.
|
||||
|
@ -19,8 +19,13 @@
|
||||
package org.apache.hadoop.yarn.service;
|
||||
|
||||
import com.google.common.base.Supplier;
|
||||
import com.google.common.collect.Lists;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.registry.client.api.RegistryOperations;
|
||||
import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
|
||||
import org.apache.hadoop.registry.client.types.ServiceRecord;
|
||||
import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies;
|
||||
import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
||||
@ -42,15 +47,24 @@
|
||||
import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders;
|
||||
import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import static org.mockito.Matchers.anyObject;
|
||||
import static org.mockito.Matchers.anyString;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class MockServiceAM extends ServiceMaster {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(MockServiceAM.class);
|
||||
|
||||
Service service;
|
||||
// The list of containers fed by tests to be returned on
|
||||
// AMRMClientCallBackHandler#onContainersAllocated
|
||||
@ -59,6 +73,16 @@ public class MockServiceAM extends ServiceMaster {
|
||||
|
||||
final List<ContainerStatus> failedContainers =
|
||||
Collections.synchronizedList(new LinkedList<>());
|
||||
|
||||
private final List<Container> recoveredContainers =
|
||||
Collections.synchronizedList(new LinkedList<>());
|
||||
|
||||
private final Map<String, ServiceRecord> registryComponents =
|
||||
new ConcurrentHashMap<>();
|
||||
|
||||
private Map<ContainerId, ContainerStatus> containerStatuses =
|
||||
new ConcurrentHashMap<>();
|
||||
|
||||
public MockServiceAM(Service service) {
|
||||
super(service.getName());
|
||||
this.service = service;
|
||||
@ -75,7 +99,7 @@ protected ContainerId getAMContainerId()
|
||||
@Override
|
||||
protected Path getAppDir() {
|
||||
Path path = new Path(new Path("target", "apps"), service.getName());
|
||||
System.out.println("Service path: " + path);
|
||||
LOG.info("Service path: {}", path);
|
||||
return path;
|
||||
}
|
||||
|
||||
@ -84,10 +108,24 @@ protected ServiceScheduler createServiceScheduler(ServiceContext context)
|
||||
throws IOException, YarnException {
|
||||
return new ServiceScheduler(context) {
|
||||
|
||||
@SuppressWarnings("SuspiciousMethodCalls")
|
||||
@Override
|
||||
protected YarnRegistryViewForProviders createYarnRegistryOperations(
|
||||
ServiceContext context, RegistryOperations registryClient) {
|
||||
return mock(YarnRegistryViewForProviders.class);
|
||||
YarnRegistryViewForProviders yarnRegistryView = mock(
|
||||
YarnRegistryViewForProviders.class);
|
||||
if (!registryComponents.isEmpty()) {
|
||||
try {
|
||||
when(yarnRegistryView.listComponents())
|
||||
.thenReturn(new LinkedList<>(registryComponents.keySet()));
|
||||
when(yarnRegistryView.getComponent(anyString())).thenAnswer(
|
||||
invocation ->
|
||||
registryComponents.get(invocation.getArguments()[0]));
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
return yarnRegistryView;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -101,7 +139,7 @@ protected AMRMClientAsync<AMRMClient.ContainerRequest> createAMRMClient() {
|
||||
// add new containers if any
|
||||
synchronized (feedContainers) {
|
||||
if (feedContainers.isEmpty()) {
|
||||
System.out.println("Allocating........ no containers");
|
||||
LOG.info("Allocating........ no containers");
|
||||
} else {
|
||||
// The AMRMClient will return containers for components that are
|
||||
// at FLEXING state
|
||||
@ -112,7 +150,7 @@ protected AMRMClientAsync<AMRMClient.ContainerRequest> createAMRMClient() {
|
||||
org.apache.hadoop.yarn.service.component.Component component =
|
||||
componentsById.get(c.getAllocationRequestId());
|
||||
if (component.getState() == ComponentState.FLEXING) {
|
||||
System.out.println("Allocated container " + c.getId());
|
||||
LOG.info("Allocated container {} ", c.getId());
|
||||
allocatedContainers.add(c);
|
||||
itor.remove();
|
||||
}
|
||||
@ -121,6 +159,17 @@ protected AMRMClientAsync<AMRMClient.ContainerRequest> createAMRMClient() {
|
||||
}
|
||||
}
|
||||
|
||||
// add recovered containers if any
|
||||
synchronized (recoveredContainers) {
|
||||
if (!recoveredContainers.isEmpty()) {
|
||||
List<Container> containersFromPrevAttempt = new LinkedList<>();
|
||||
containersFromPrevAttempt.addAll(recoveredContainers);
|
||||
recoveredContainers.clear();
|
||||
builder.containersFromPreviousAttempt(
|
||||
containersFromPrevAttempt);
|
||||
}
|
||||
}
|
||||
|
||||
// add failed containers if any
|
||||
synchronized (failedContainers) {
|
||||
if (!failedContainers.isEmpty()) {
|
||||
@ -146,15 +195,23 @@ public RegisterApplicationMasterResponse registerApplicationMaster(
|
||||
}
|
||||
};
|
||||
|
||||
return AMRMClientAsync
|
||||
.createAMRMClientAsync(client1, 1000,
|
||||
return AMRMClientAsync.createAMRMClientAsync(client1, 1000,
|
||||
this.new AMRMClientCallback());
|
||||
}
|
||||
|
||||
@SuppressWarnings("SuspiciousMethodCalls")
|
||||
@Override
|
||||
public NMClientAsync createNMClient() {
|
||||
NMClientAsync nmClientAsync = super.createNMClient();
|
||||
nmClientAsync.setClient(mock(NMClient.class));
|
||||
NMClient nmClient = mock(NMClient.class);
|
||||
try {
|
||||
when(nmClient.getContainerStatus(anyObject(), anyObject()))
|
||||
.thenAnswer(invocation ->
|
||||
containerStatuses.get(invocation.getArguments()[0]));
|
||||
} catch (YarnException | IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
nmClientAsync.setClient(nmClient);
|
||||
return nmClientAsync;
|
||||
}
|
||||
};
|
||||
@ -165,6 +222,33 @@ public NMClientAsync createNMClient() {
|
||||
context.service = service;
|
||||
}
|
||||
|
||||
public void feedRegistryComponent(ContainerId containerId, String compName,
|
||||
String compInstName) {
|
||||
ServiceRecord record = new ServiceRecord();
|
||||
record.set(YarnRegistryAttributes.YARN_ID, containerId.toString());
|
||||
record.description = compInstName;
|
||||
record.set(YarnRegistryAttributes.YARN_PERSISTENCE,
|
||||
PersistencePolicies.CONTAINER);
|
||||
record.set(YarnRegistryAttributes.YARN_IP, "localhost");
|
||||
record.set(YarnRegistryAttributes.YARN_HOSTNAME, "localhost");
|
||||
record.set(YarnRegistryAttributes.YARN_COMPONENT, compName);
|
||||
registryComponents.put(RegistryPathUtils.encodeYarnID(
|
||||
containerId.toString()), record);
|
||||
}
|
||||
|
||||
/**
|
||||
* Simulates a recovered container that is sent to the AM in the heartbeat
|
||||
* response.
|
||||
*
|
||||
* @param containerId The ID for the container
|
||||
* @param compName The component to which the recovered container is fed.
|
||||
*/
|
||||
public void feedRecoveredContainer(ContainerId containerId, String compName) {
|
||||
Container container = createContainer(containerId, compName);
|
||||
recoveredContainers.add(container);
|
||||
addContainerStatus(container, ContainerState.RUNNING);
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param service The service for the component
|
||||
@ -174,20 +258,12 @@ public NMClientAsync createNMClient() {
|
||||
*/
|
||||
public Container feedContainerToComp(Service service, int id,
|
||||
String compName) {
|
||||
ApplicationId applicationId = ApplicationId.fromString(service.getId());
|
||||
ContainerId containerId = ContainerId
|
||||
.newContainerId(ApplicationAttemptId.newInstance(applicationId, 1), id);
|
||||
NodeId nodeId = NodeId.newInstance("localhost", 1234);
|
||||
Container container = Container
|
||||
.newInstance(containerId, nodeId, "localhost",
|
||||
Resource.newInstance(100, 1), Priority.newInstance(0), null);
|
||||
|
||||
long allocateId =
|
||||
context.scheduler.getAllComponents().get(compName).getAllocateId();
|
||||
container.setAllocationRequestId(allocateId);
|
||||
ContainerId containerId = createContainerId(id);
|
||||
Container container = createContainer(containerId, compName);
|
||||
synchronized (feedContainers) {
|
||||
feedContainers.add(container);
|
||||
}
|
||||
addContainerStatus(container, ContainerState.RUNNING);
|
||||
return container;
|
||||
}
|
||||
|
||||
@ -196,13 +272,30 @@ public void feedFailedContainerToComp(Service service, int id, String
|
||||
ApplicationId applicationId = ApplicationId.fromString(service.getId());
|
||||
ContainerId containerId = ContainerId
|
||||
.newContainerId(ApplicationAttemptId.newInstance(applicationId, 1), id);
|
||||
ContainerStatus containerStatus = Records.newRecord(ContainerStatus.class);
|
||||
containerStatus.setContainerId(containerId);
|
||||
ContainerStatus status = Records.newRecord(ContainerStatus.class);
|
||||
status.setContainerId(containerId);
|
||||
synchronized (failedContainers) {
|
||||
failedContainers.add(containerStatus);
|
||||
failedContainers.add(status);
|
||||
}
|
||||
}
|
||||
|
||||
public ContainerId createContainerId(int id) {
|
||||
ApplicationId applicationId = ApplicationId.fromString(service.getId());
|
||||
return ContainerId.newContainerId(
|
||||
ApplicationAttemptId.newInstance(applicationId, 1), id);
|
||||
}
|
||||
|
||||
private Container createContainer(ContainerId containerId, String compName) {
|
||||
NodeId nodeId = NodeId.newInstance("localhost", 1234);
|
||||
Container container = Container.newInstance(
|
||||
containerId, nodeId, "localhost",
|
||||
Resource.newInstance(100, 1),
|
||||
Priority.newInstance(0), null);
|
||||
long allocateId =
|
||||
context.scheduler.getAllComponents().get(compName).getAllocateId();
|
||||
container.setAllocationRequestId(allocateId);
|
||||
return container;
|
||||
}
|
||||
|
||||
public void flexComponent(String compName, long numberOfContainers)
|
||||
throws IOException {
|
||||
@ -256,4 +349,13 @@ public Boolean get() {
|
||||
}
|
||||
}, 1000, 20000);
|
||||
}
|
||||
|
||||
private void addContainerStatus(Container container, ContainerState state) {
|
||||
ContainerStatus status = ContainerStatus.newInstance(container.getId(),
|
||||
state, "", 0);
|
||||
status.setHost(container.getNodeId().getHost());
|
||||
status.setIPs(Lists.newArrayList(container.getNodeId().getHost()));
|
||||
containerStatuses.put(container.getId(), status);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -20,15 +20,21 @@
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.curator.test.TestingCluster;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.service.api.records.Service;
|
||||
import org.apache.hadoop.yarn.service.component.ComponentState;
|
||||
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
|
||||
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceState;
|
||||
import org.apache.hadoop.yarn.service.conf.YarnServiceConf;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
@ -39,6 +45,9 @@
|
||||
|
||||
public class TestServiceAM extends ServiceTestUtils{
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(TestServiceAM.class);
|
||||
|
||||
private File basedir;
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
TestingCluster zkCluster;
|
||||
@ -54,7 +63,7 @@ public void setup() throws Exception {
|
||||
zkCluster = new TestingCluster(1);
|
||||
zkCluster.start();
|
||||
conf.set(KEY_REGISTRY_ZK_QUORUM, zkCluster.getConnectString());
|
||||
System.out.println("ZK cluster: " + zkCluster.getConnectString());
|
||||
LOG.info("ZK cluster: {}", zkCluster.getConnectString());
|
||||
}
|
||||
|
||||
@After
|
||||
@ -91,7 +100,7 @@ public void testContainerCompleted() throws TimeoutException,
|
||||
am.feedContainerToComp(exampleApp, 1, "compa");
|
||||
am.waitForCompInstanceState(compa0, ComponentInstanceState.STARTED);
|
||||
|
||||
System.out.println("Fail the container 1");
|
||||
LOG.info("Fail the container 1");
|
||||
// fail the container
|
||||
am.feedFailedContainerToComp(exampleApp, 1, "compa");
|
||||
|
||||
@ -106,4 +115,89 @@ public void testContainerCompleted() throws TimeoutException,
|
||||
am.getComponent("compa").getPendingInstances().size());
|
||||
am.stop();
|
||||
}
|
||||
|
||||
// Test to verify that the containers of previous attempt are not prematurely
|
||||
// released. These containers are sent by the RM to the AM in the
|
||||
// heartbeat response.
|
||||
@Test(timeout = 200000)
|
||||
public void testContainersFromPreviousAttemptsWithRMRestart()
|
||||
throws Exception {
|
||||
ApplicationId applicationId = ApplicationId.newInstance(
|
||||
System.currentTimeMillis(), 1);
|
||||
Service exampleApp = new Service();
|
||||
exampleApp.setId(applicationId.toString());
|
||||
exampleApp.setName("testContainersRecovers");
|
||||
String comp1Name = "comp1";
|
||||
String comp1InstName = "comp1-0";
|
||||
|
||||
org.apache.hadoop.yarn.service.api.records.Component compA =
|
||||
createComponent(comp1Name, 1, "sleep");
|
||||
exampleApp.addComponent(compA);
|
||||
|
||||
MockServiceAM am = new MockServiceAM(exampleApp);
|
||||
ContainerId containerId = am.createContainerId(1);
|
||||
am.feedRegistryComponent(containerId, comp1Name, comp1InstName);
|
||||
am.init(conf);
|
||||
am.start();
|
||||
|
||||
ComponentInstance comp10 = am.getCompInstance(comp1Name, comp1InstName);
|
||||
am.feedRecoveredContainer(containerId, comp1Name);
|
||||
am.waitForCompInstanceState(comp10, ComponentInstanceState.STARTED);
|
||||
|
||||
// 0 pending instance
|
||||
Assert.assertEquals(0,
|
||||
am.getComponent(comp1Name).getPendingInstances().size());
|
||||
|
||||
GenericTestUtils.waitFor(() -> am.getCompInstance(comp1Name, comp1InstName)
|
||||
.getContainerStatus() != null, 2000, 200000);
|
||||
|
||||
Assert.assertEquals("container state",
|
||||
org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
|
||||
am.getCompInstance(comp1Name, comp1InstName).getContainerStatus()
|
||||
.getState());
|
||||
am.stop();
|
||||
}
|
||||
|
||||
// Test to verify that the containers of previous attempt are released and the
|
||||
// component instance is added to the pending queue when the recovery wait
|
||||
// time interval elapses.
|
||||
@Test(timeout = 200000)
|
||||
public void testContainersReleasedWhenExpired()
|
||||
throws Exception {
|
||||
ApplicationId applicationId = ApplicationId.newInstance(
|
||||
System.currentTimeMillis(), 1);
|
||||
Service exampleApp = new Service();
|
||||
exampleApp.setId(applicationId.toString());
|
||||
exampleApp.setName("testContainersRecovers");
|
||||
String comp1Name = "comp1";
|
||||
String comp1InstName = "comp1-0";
|
||||
|
||||
org.apache.hadoop.yarn.service.api.records.Component compA =
|
||||
createComponent(comp1Name, 1, "sleep");
|
||||
exampleApp.addComponent(compA);
|
||||
|
||||
MockServiceAM am = new MockServiceAM(exampleApp);
|
||||
ContainerId containerId = am.createContainerId(1);
|
||||
am.feedRegistryComponent(containerId, comp1Name, comp1InstName);
|
||||
conf.setLong(YarnServiceConf.CONTAINER_RECOVERY_TIMEOUT_MS, 10);
|
||||
am.init(conf);
|
||||
am.start();
|
||||
Thread.sleep(100);
|
||||
GenericTestUtils.waitFor(() -> am.getComponent(comp1Name).getState().equals(
|
||||
ComponentState.FLEXING), 100, 2000);
|
||||
|
||||
// 1 pending instance
|
||||
Assert.assertEquals(1,
|
||||
am.getComponent(comp1Name).getPendingInstances().size());
|
||||
|
||||
am.feedContainerToComp(exampleApp, 2, comp1Name);
|
||||
|
||||
GenericTestUtils.waitFor(() -> am.getCompInstance(comp1Name, comp1InstName)
|
||||
.getContainerStatus() != null, 2000, 200000);
|
||||
Assert.assertEquals("container state",
|
||||
org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
|
||||
am.getCompInstance(comp1Name, comp1InstName).getContainerStatus()
|
||||
.getState());
|
||||
am.stop();
|
||||
}
|
||||
}
|
||||
|
@ -486,6 +486,14 @@ public abstract static class AbstractCallbackHandler
|
||||
* stop() is the recommended action.
|
||||
*/
|
||||
public abstract void onError(Throwable e);
|
||||
|
||||
/**
|
||||
* Called when the ResourceManager responds to a heartbeat with containers
|
||||
* from previous attempts.
|
||||
*/
|
||||
public void onContainersReceivedFromPreviousAttempts(
|
||||
List<Container> containers) {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -531,5 +539,7 @@ public interface CallbackHandler {
|
||||
* @param e
|
||||
*/
|
||||
void onError(Throwable e);
|
||||
|
||||
void onContainersReceivedFromPreviousAttempts(List<Container> containers);
|
||||
}
|
||||
}
|
||||
|
@ -358,6 +358,11 @@ public void run() {
|
||||
if (!allocated.isEmpty()) {
|
||||
handler.onContainersAllocated(allocated);
|
||||
}
|
||||
|
||||
if (!response.getContainersFromPreviousAttempts().isEmpty()) {
|
||||
handler.onContainersReceivedFromPreviousAttempts(
|
||||
response.getContainersFromPreviousAttempts());
|
||||
}
|
||||
progress = handler.getProgress();
|
||||
} catch (Throwable ex) {
|
||||
handler.onError(ex);
|
||||
|
@ -38,4 +38,5 @@ private YarnRegistryAttributes() {
|
||||
public static final String YARN_PATH = "yarn:path";
|
||||
public static final String YARN_HOSTNAME = "yarn:hostname";
|
||||
public static final String YARN_IP = "yarn:ip";
|
||||
public static final String YARN_COMPONENT = "yarn:component";
|
||||
}
|
||||
|
@ -125,6 +125,7 @@ Above config make the service AM to be retried at max 10 times.
|
||||
|yarn.service.log.exclude-pattern| The regex expression for excluding log files whose file name matches it when aggregating the logs after the application completes. If the log file name matches both include and exclude pattern, this file will be excluded.
|
||||
|yarn.service.rolling-log.include-pattern| The regex expression for including log files whose file name matches it when aggregating the logs while app is running.
|
||||
|yarn.service.rolling-log.exclude-pattern| The regex expression for excluding log files whose file name matches it when aggregating the logs while app is running. If the log file name matches both include and exclude pattern, this file will be excluded.
|
||||
|yarn.service.container-recovery.timeout.ms| The timeout in milliseconds after which the service AM releases all containers of the previous attempt that have not yet been recovered by the RM. By default, it is set to 120000, i.e. 2 minutes.
|
||||
|
||||
## Constant variables for custom service
|
||||
The service framework provides some constant variables for user to configure their services. These variables are either dynamically generated by the system or are static ones such as service name defined by the user.
|
||||
|
Loading…
Reference in New Issue
Block a user