YARN-7486. Race condition in service AM that can cause NPE. Contributed by Jian He

This commit is contained in:
Billie Rinaldi 2017-11-15 10:20:46 -08:00
parent 462e25a3b2
commit f4d5d20286
13 changed files with 290 additions and 126 deletions

View File

@ -132,7 +132,6 @@ public class ServiceScheduler extends CompositeService {
private AMRMClientAsync<AMRMClient.ContainerRequest> amRMClient;
private NMClientAsync nmClient;
private AsyncDispatcher dispatcher;
AsyncDispatcher compInstanceDispatcher;
private YarnRegistryViewForProviders yarnRegistryOperations;
private ServiceContext context;
private ContainerLaunchService containerLaunchService;
@ -152,7 +151,7 @@ public class ServiceScheduler extends CompositeService {
yarnRegistryOperations =
createYarnRegistryOperations(context, registryClient);
// register metrics
// register metrics,
serviceMetrics = ServiceMetrics
.register(app.getName(), "Metrics for service");
serviceMetrics.tag("type", "Metrics type [component or service]", "service");
@ -167,14 +166,11 @@ public class ServiceScheduler extends CompositeService {
dispatcher = new AsyncDispatcher("Component dispatcher");
dispatcher.register(ComponentEventType.class,
new ComponentEventHandler());
dispatcher.register(ComponentInstanceEventType.class,
new ComponentInstanceEventHandler());
dispatcher.setDrainEventsOnStop();
addIfService(dispatcher);
compInstanceDispatcher =
new AsyncDispatcher("CompInstance dispatcher");
compInstanceDispatcher.register(ComponentInstanceEventType.class,
new ComponentInstanceEventHandler());
addIfService(compInstanceDispatcher);
containerLaunchService = new ContainerLaunchService(context.fs);
addService(containerLaunchService);
@ -277,10 +273,10 @@ public class ServiceScheduler extends CompositeService {
}
private void recoverComponents(RegisterApplicationMasterResponse response) {
List<Container> recoveredContainers = response
List<Container> containersFromPrevAttempt = response
.getContainersFromPreviousAttempts();
LOG.info("Received {} containers from previous attempt.",
recoveredContainers.size());
containersFromPrevAttempt.size());
Map<String, ServiceRecord> existingRecords = new HashMap<>();
List<String> existingComps = null;
try {
@ -302,9 +298,8 @@ public class ServiceScheduler extends CompositeService {
}
}
}
for (Container container : recoveredContainers) {
LOG.info("Handling container {} from previous attempt",
container.getId());
for (Container container : containersFromPrevAttempt) {
LOG.info("Handling {} from previous attempt", container.getId());
ServiceRecord record = existingRecords.get(RegistryPathUtils
.encodeYarnID(container.getId().toString()));
if (record != null) {
@ -487,16 +482,21 @@ public class ServiceScheduler extends CompositeService {
new ComponentEvent(comp.getName(), CONTAINER_ALLOCATED)
.setContainer(container);
dispatcher.getEventHandler().handle(event);
Collection<AMRMClient.ContainerRequest> requests = amRMClient
.getMatchingRequests(container.getAllocationRequestId());
LOG.info("[COMPONENT {}]: {} outstanding container requests.",
comp.getName(), requests.size());
// remove the corresponding request
if (requests.iterator().hasNext()) {
LOG.info("[COMPONENT {}]: removing one container request.", comp
.getName());
AMRMClient.ContainerRequest request = requests.iterator().next();
amRMClient.removeContainerRequest(request);
try {
Collection<AMRMClient.ContainerRequest> requests = amRMClient
.getMatchingRequests(container.getAllocationRequestId());
LOG.info("[COMPONENT {}]: remove {} outstanding container requests " +
"for allocateId " + container.getAllocationRequestId(),
comp.getName(), requests.size());
// remove the corresponding request
if (requests.iterator().hasNext()) {
AMRMClient.ContainerRequest request = requests.iterator().next();
amRMClient.removeContainerRequest(request);
}
} catch(Exception e) {
//TODO Due to YARN-7490, exception may be thrown, catch and ignore for
//now.
LOG.error("Exception when removing the matching requests. ", e);
}
}
}
@ -569,7 +569,7 @@ public class ServiceScheduler extends CompositeService {
}
ComponentEvent event =
new ComponentEvent(instance.getCompName(), CONTAINER_STARTED)
.setInstance(instance);
.setInstance(instance).setContainerId(containerId);
dispatcher.getEventHandler().handle(event);
}
@ -649,10 +649,6 @@ public class ServiceScheduler extends CompositeService {
liveInstances.remove(containerId);
}
public AsyncDispatcher getCompInstanceDispatcher() {
return compInstanceDispatcher;
}
public YarnRegistryViewForProviders getYarnRegistryOperations() {
return yarnRegistryOperations;
}

View File

@ -82,7 +82,8 @@ public class Component implements EventHandler<ComponentEvent> {
private Map<String, ComponentInstance> compInstances =
new ConcurrentHashMap<>();
// component instances to be assigned with a container
private List<ComponentInstance> pendingInstances = new LinkedList<>();
private List<ComponentInstance> pendingInstances =
Collections.synchronizedList(new LinkedList<>());
private ContainerFailureTracker failureTracker;
private Probe probe;
private final ReentrantReadWriteLock.ReadLock readLock;
@ -94,7 +95,7 @@ public class Component implements EventHandler<ComponentEvent> {
private StateMachine<ComponentState, ComponentEventType, ComponentEvent>
stateMachine;
private AsyncDispatcher compInstanceDispatcher;
private AsyncDispatcher dispatcher;
private static final StateMachineFactory<Component, ComponentState, ComponentEventType, ComponentEvent>
stateMachineFactory =
new StateMachineFactory<Component, ComponentState, ComponentEventType, ComponentEvent>(
@ -149,7 +150,7 @@ public class Component implements EventHandler<ComponentEvent> {
this.readLock = lock.readLock();
this.writeLock = lock.writeLock();
this.stateMachine = stateMachineFactory.make(this);
compInstanceDispatcher = scheduler.getCompInstanceDispatcher();
dispatcher = scheduler.getDispatcher();
failureTracker =
new ContainerFailureTracker(context, this);
probe = MonitorUtils.getProbe(componentSpec.getReadinessCheck());
@ -256,30 +257,18 @@ public class Component implements EventHandler<ComponentEvent> {
component.releaseContainer(container);
return;
}
if (instance.hasContainer()) {
LOG.info(
"[COMPONENT {}]: Instance {} already has container, release " +
"surplus container {}",
instance.getCompName(), instance.getCompInstanceId(), container
.getId());
component.releaseContainer(container);
return;
}
component.pendingInstances.remove(instance);
LOG.info("[COMPONENT {}]: Recovered {} for component instance {} on " +
"host {}, num pending component instances reduced to {} ",
component.getName(), container.getId(), instance
.getCompInstanceName(), container.getNodeId(), component
.pendingInstances.size());
instance.setContainer(container);
ProviderUtils.initCompInstanceDir(component.getContext().fs, instance);
component.getScheduler().addLiveCompInstance(container.getId(), instance);
LOG.info("[COMPONENT {}]: Marking {} as started for component " +
"instance {}", component.getName(), event.getContainer().getId(),
instance.getCompInstanceId());
component.compInstanceDispatcher.getEventHandler().handle(
new ComponentInstanceEvent(instance.getContainerId(),
START));
LOG.info("[COMPONENT {}]: Recovered {} for component instance {} on " +
"host {}, num pending component instances reduced to {} ",
component.getName(), container.getId(),
instance.getCompInstanceName(), container.getNodeId(),
component.pendingInstances.size());
component.dispatcher.getEventHandler().handle(
new ComponentInstanceEvent(container.getId(), START));
}
}
@ -288,9 +277,8 @@ public class Component implements EventHandler<ComponentEvent> {
@Override public ComponentState transition(Component component,
ComponentEvent event) {
component.compInstanceDispatcher.getEventHandler().handle(
new ComponentInstanceEvent(event.getInstance().getContainerId(),
START));
component.dispatcher.getEventHandler().handle(
new ComponentInstanceEvent(event.getContainerId(), START));
return checkIfStable(component);
}
}
@ -313,14 +301,7 @@ public class Component implements EventHandler<ComponentEvent> {
@Override
public void transition(Component component, ComponentEvent event) {
component.updateMetrics(event.getStatus());
// add back to pending list
component.pendingInstances.add(event.getInstance());
LOG.info(
"[COMPONENT {}]: {} completed, num pending comp instances increased to {}.",
component.getName(), event.getStatus().getContainerId(),
component.pendingInstances.size());
component.compInstanceDispatcher.getEventHandler().handle(
component.dispatcher.getEventHandler().handle(
new ComponentInstanceEvent(event.getStatus().getContainerId(),
STOP).setStatus(event.getStatus()));
component.componentSpec.setState(
@ -328,8 +309,8 @@ public class Component implements EventHandler<ComponentEvent> {
}
}
public ServiceMetrics getCompMetrics () {
return componentMetrics;
public void reInsertPendingInstance(ComponentInstance instance) {
pendingInstances.add(instance);
}
private void releaseContainer(Container container) {
@ -581,4 +562,9 @@ public class Component implements EventHandler<ComponentEvent> {
public ServiceContext getContext() {
return context;
}
/**
 * Returns the component instances that are waiting to be assigned a
 * container. Only for testing.
 *
 * <p>The component's own list is returned directly, not a copy.
 *
 * @return the pending component instances
 */
public List<ComponentInstance> getPendingInstances() {
return pendingInstances;
}
}

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.service.component;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.event.AbstractEvent;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
@ -30,6 +31,16 @@ public class ComponentEvent extends AbstractEvent<ComponentEventType> {
private Container container;
private ComponentInstance instance;
private ContainerStatus status;
private ContainerId containerId;
/**
 * Returns the container id stored on this event.
 *
 * @return the value last passed to {@link #setContainerId(ContainerId)}
 */
public ContainerId getContainerId() {
return containerId;
}
/**
 * Stores the given container id on this event.
 *
 * @param containerId the container id to carry with the event
 * @return this event, so that setter calls can be chained
 */
public ComponentEvent setContainerId(ContainerId containerId) {
this.containerId = containerId;
return this;
}
public ComponentEvent(String name, ComponentEventType type) {
super(type);

View File

@ -146,7 +146,7 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
compInstance.containerStatusFuture =
compInstance.scheduler.executorService.scheduleAtFixedRate(
new ContainerStatusRetriever(compInstance.scheduler,
compInstance.getContainerId(), compInstance), 0, 1,
event.getContainerId(), compInstance), 0, 1,
TimeUnit.SECONDS);
compInstance.component.incRunningContainers();
long containerStartTime = System.currentTimeMillis();
@ -160,10 +160,10 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
}
org.apache.hadoop.yarn.service.api.records.Container container =
new org.apache.hadoop.yarn.service.api.records.Container();
container.setId(compInstance.getContainerId().toString());
container.setId(event.getContainerId().toString());
container.setLaunchTime(new Date(containerStartTime));
container.setState(ContainerState.RUNNING_BUT_UNREADY);
container.setBareHost(compInstance.container.getNodeId().getHost());
container.setBareHost(compInstance.getNodeId().getHost());
container.setComponentInstanceName(compInstance.getCompInstanceName());
if (compInstance.containerSpec != null) {
// remove the previous container.
@ -219,15 +219,11 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
// re-ask the failed container.
Component comp = compInstance.component;
comp.requestContainers(1);
LOG.info(compInstance.getCompInstanceId()
+ ": Container completed. Requested a new container." + System
.lineSeparator() + " exitStatus={}, diagnostics={}.",
event.getStatus().getExitStatus(),
event.getStatus().getDiagnostics());
String containerDiag =
compInstance.getCompInstanceId() + ": " + event.getStatus()
.getDiagnostics();
compInstance.diagnostics.append(containerDiag + System.lineSeparator());
compInstance.cancelContainerStatusRetriever();
if (compInstance.getState().equals(READY)) {
compInstance.component.decContainersReady();
@ -255,11 +251,13 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
// hdfs dir content will be overwritten when a new container gets started,
// so no need remove.
compInstance.scheduler.executorService
.submit(compInstance::cleanupRegistry);
.submit(() -> compInstance.cleanupRegistry(event.getContainerId()));
if (compInstance.timelineServiceEnabled) {
// record in ATS
compInstance.serviceTimelinePublisher.componentInstanceFinished
(compInstance, event.getStatus().getExitStatus(), containerDiag);
compInstance.serviceTimelinePublisher
.componentInstanceFinished(event.getContainerId(),
event.getStatus().getExitStatus(), containerDiag);
}
compInstance.containerSpec.setState(ContainerState.STOPPED);
}
@ -267,6 +265,14 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
// remove the failed ContainerId -> CompInstance mapping
comp.getScheduler().removeLiveCompInstance(event.getContainerId());
comp.reInsertPendingInstance(compInstance);
LOG.info(compInstance.getCompInstanceId()
+ ": {} completed. Reinsert back to pending list and requested " +
"a new container." + System.lineSeparator() +
" exitStatus={}, diagnostics={}.",
event.getContainerId(), event.getStatus().getExitStatus(),
event.getStatus().getDiagnostics());
if (shouldExit) {
// Sleep for 5 seconds in hope that the state can be recorded in ATS.
// in case there's a client polling the comp state, it can be notified.
@ -277,8 +283,6 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
}
ExitUtil.terminate(-1);
}
compInstance.removeContainer();
}
}
@ -312,15 +316,6 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
}
}
public boolean hasContainer() {
return this.container != null;
}
public void removeContainer() {
this.container = null;
this.compInstanceId.setContainerId(null);
}
public void setContainer(Container container) {
this.container = container;
this.compInstanceId.setContainerId(container.getId());
@ -337,7 +332,7 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
public void updateContainerStatus(ContainerStatus status) {
this.status = status;
org.apache.hadoop.yarn.service.api.records.Container container =
getCompSpec().getContainer(getContainerId().toString());
getCompSpec().getContainer(status.getContainerId().toString());
if (container != null) {
container.setIp(StringUtils.join(",", status.getIPs()));
container.setHostname(status.getHost());
@ -348,10 +343,6 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
updateServiceRecord(yarnRegistryOperations, status);
}
public ContainerId getContainerId() {
return container.getId();
}
public String getCompName() {
return compInstanceId.getCompName();
}
@ -423,12 +414,7 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
public void destroy() {
LOG.info(getCompInstanceId() + ": Flexed down by user, destroying.");
diagnostics.append(getCompInstanceId() + ": Flexed down by user");
if (container != null) {
scheduler.removeLiveCompInstance(container.getId());
component.getScheduler().getAmRMClient()
.releaseAssignedContainer(container.getId());
getCompSpec().removeContainer(containerSpec);
}
// update metrics
if (getState() == STARTED) {
component.decRunningContainers();
@ -437,16 +423,29 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
component.decContainersReady();
component.decRunningContainers();
}
getCompSpec().removeContainer(containerSpec);
if (container == null) {
LOG.info(getCompInstanceId() + " no container is assigned when " +
"destroying");
return;
}
ContainerId containerId = container.getId();
scheduler.removeLiveCompInstance(containerId);
component.getScheduler().getAmRMClient()
.releaseAssignedContainer(containerId);
if (timelineServiceEnabled) {
serviceTimelinePublisher.componentInstanceFinished(this,
serviceTimelinePublisher.componentInstanceFinished(containerId,
KILLED_BY_APPMASTER, diagnostics.toString());
}
scheduler.executorService.submit(this::cleanupRegistryAndCompHdfsDir);
cancelContainerStatusRetriever();
scheduler.executorService.submit(() ->
cleanupRegistryAndCompHdfsDir(containerId));
}
private void cleanupRegistry() {
ContainerId containerId = getContainerId();
private void cleanupRegistry(ContainerId containerId) {
String cid = RegistryPathUtils.encodeYarnID(containerId.toString());
try {
yarnRegistryOperations.deleteComponent(getCompInstanceId(), cid);
@ -456,8 +455,8 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
}
//TODO Maybe have a dedicated cleanup service.
public void cleanupRegistryAndCompHdfsDir() {
cleanupRegistry();
public void cleanupRegistryAndCompHdfsDir(ContainerId containerId) {
cleanupRegistry(containerId);
try {
if (compInstanceDir != null && fs.exists(compInstanceDir)) {
boolean deleted = fs.delete(compInstanceDir, true);
@ -515,6 +514,12 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
}
}
/**
 * Cancels the scheduled container status retriever, interrupting it if it
 * is currently running. Does nothing when no retriever was scheduled or
 * when it has already completed.
 */
private void cancelContainerStatusRetriever() {
  if (containerStatusFuture == null || containerStatusFuture.isDone()) {
    return;
  }
  containerStatusFuture.cancel(true);
}
@Override
public int compareTo(ComponentInstance to) {
long delta = containerStartedTime - to.containerStartedTime;

View File

@ -87,7 +87,7 @@ public class ContainerLaunchService extends AbstractService{
AbstractLauncher launcher = new AbstractLauncher(fs, null);
try {
provider.buildContainerLaunchContext(launcher, service,
instance, fs, getConfig());
instance, fs, getConfig(), container);
instance.getComponent().getScheduler().getNmClient()
.startContainerAsync(container,
launcher.completeContainerLaunch());

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.service.provider;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.conf.YarnServiceConf;
import org.apache.hadoop.yarn.service.api.records.Component;
@ -55,7 +56,7 @@ public abstract class AbstractProviderService implements ProviderService,
public void buildContainerLaunchContext(AbstractLauncher launcher,
Service service, ComponentInstance instance,
SliderFileSystem fileSystem, Configuration yarnConf)
SliderFileSystem fileSystem, Configuration yarnConf, Container container)
throws IOException, SliderException {
Component component = instance.getComponent().getComponentSpec();;
processArtifact(launcher, instance, fileSystem, service);
@ -67,7 +68,7 @@ public abstract class AbstractProviderService implements ProviderService,
Map<String, String> globalTokens =
instance.getComponent().getScheduler().globalTokens;
Map<String, String> tokensForSubstitution = ProviderUtils
.initCompTokensForSubstitute(instance);
.initCompTokensForSubstitute(instance, container);
tokensForSubstitution.putAll(globalTokens);
// Set the environment variables in launcher
launcher.putEnv(ServiceUtils

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.service.provider;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
import org.apache.hadoop.yarn.service.exceptions.SliderException;
@ -34,6 +35,6 @@ public interface ProviderService {
*/
void buildContainerLaunchContext(AbstractLauncher containerLauncher,
Service service, ComponentInstance instance,
SliderFileSystem sliderFileSystem, Configuration yarnConf)
throws IOException, SliderException;
SliderFileSystem sliderFileSystem, Configuration yarnConf, Container
container) throws IOException, SliderException;
}

View File

@ -24,6 +24,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.service.ServiceContext;
@ -393,13 +394,13 @@ public class ProviderUtils implements YarnServiceConstants {
* @return tokens to replace
*/
public static Map<String, String> initCompTokensForSubstitute(
ComponentInstance instance) {
ComponentInstance instance, Container container) {
Map<String, String> tokens = new HashMap<>();
tokens.put(COMPONENT_NAME, instance.getCompSpec().getName());
tokens
.put(COMPONENT_NAME_LC, instance.getCompSpec().getName().toLowerCase());
tokens.put(COMPONENT_INSTANCE_NAME, instance.getCompInstanceName());
tokens.put(CONTAINER_ID, instance.getContainer().getId().toString());
tokens.put(CONTAINER_ID, container.getId().toString());
tokens.put(COMPONENT_ID,
String.valueOf(instance.getCompInstanceId().getId()));
tokens.putAll(instance.getComponent().getDependencyHostIpTokens());

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.service.timelineservice;
import org.apache.hadoop.metrics2.AbstractMetric;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
@ -178,10 +179,10 @@ public class ServiceTimelinePublisher extends CompositeService {
putEntity(entity);
}
public void componentInstanceFinished(ComponentInstance instance,
public void componentInstanceFinished(ContainerId containerId,
int exitCode, String diagnostics) {
TimelineEntity entity = createComponentInstanceEntity(
instance.getContainer().getId().toString());
containerId.toString());
// create info keys
Map<String, Object> entityInfos = new HashMap<String, Object>();

View File

@ -24,14 +24,8 @@ import org.apache.hadoop.registry.client.api.RegistryOperations;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
@ -42,15 +36,15 @@ import org.apache.hadoop.yarn.proto.ClientAMProtocol;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.component.Component;
import org.apache.hadoop.yarn.service.component.ComponentState;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceState;
import org.apache.hadoop.yarn.service.exceptions.BadClusterStateException;
import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders;
import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
import org.apache.hadoop.yarn.util.Records;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.*;
import java.util.concurrent.TimeoutException;
import static org.mockito.Mockito.mock;
@ -63,6 +57,8 @@ public class MockServiceAM extends ServiceMaster {
final List<Container> feedContainers =
Collections.synchronizedList(new LinkedList<>());
final List<ContainerStatus> failedContainers =
Collections.synchronizedList(new LinkedList<>());
public MockServiceAM(Service service) {
super(service.getName());
this.service = service;
@ -102,10 +98,10 @@ public class MockServiceAM extends ServiceMaster {
AllocateResponse.AllocateResponseBuilder builder =
AllocateResponse.newBuilder();
// add new containers if any
synchronized (feedContainers) {
if (feedContainers.isEmpty()) {
System.out.println("Allocating........ no containers");
return builder.build();
} else {
// The AMRMClient will return containers for components that are
// at FLEXING state
@ -121,9 +117,20 @@ public class MockServiceAM extends ServiceMaster {
itor.remove();
}
}
return builder.allocatedContainers(allocatedContainers).build();
builder.allocatedContainers(allocatedContainers);
}
}
// add failed containers if any
synchronized (failedContainers) {
if (!failedContainers.isEmpty()) {
List<ContainerStatus> failed =
new LinkedList<>(failedContainers);
failedContainers.clear();
builder.completedContainersStatuses(failed);
}
}
return builder.build();
}
@Override
@ -184,6 +191,19 @@ public class MockServiceAM extends ServiceMaster {
return container;
}
/**
 * Queues a container status for the container with the given numeric id
 * (under attempt 1 of the service's application) in the
 * {@code failedContainers} list.
 *
 * @param service the service whose id supplies the application id
 * @param id the numeric container id within the application attempt
 * @param compName the component name; not used by this method
 */
public void feedFailedContainerToComp(Service service, int id, String
    compName) {
  ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
      ApplicationId.fromString(service.getId()), 1);
  ContainerStatus failedStatus = Records.newRecord(ContainerStatus.class);
  failedStatus.setContainerId(ContainerId.newContainerId(attemptId, id));
  synchronized (failedContainers) {
    failedContainers.add(failedStatus);
  }
}
public void flexComponent(String compName, long numberOfContainers)
throws IOException {
ClientAMProtocol.ComponentCountProto componentCountProto =
@ -218,4 +238,22 @@ public class MockServiceAM extends ServiceMaster {
}
}, 1000, 20000);
}
/**
 * Looks up a component instance through the scheduler's component map.
 *
 * @param compName name of the owning component
 * @param instanceName name of the instance within that component
 * @return whatever the component returns for {@code instanceName}
 */
public ComponentInstance getCompInstance(String compName, String
    instanceName) {
  Component owner = context.scheduler.getAllComponents().get(compName);
  return owner.getComponentInstance(instanceName);
}
/**
 * Waits until the given instance reports the expected state, using
 * {@code GenericTestUtils.waitFor} with the arguments 1000 and 20000.
 *
 * @param instance the component instance to watch
 * @param state the state to wait for
 * @throws TimeoutException propagated from the underlying wait
 * @throws InterruptedException propagated from the underlying wait
 */
public void waitForCompInstanceState(ComponentInstance instance,
    ComponentInstanceState state)
    throws TimeoutException, InterruptedException {
  GenericTestUtils.waitFor(
      () -> instance.getState().equals(state), 1000, 20000);
}
}

View File

@ -65,6 +65,7 @@ public class ServiceTestUtils {
private MiniYARNCluster yarnCluster = null;
private MiniDFSCluster hdfsCluster = null;
TestingCluster zkCluster;
private FileSystem fs = null;
private Configuration conf = null;
public static final int NUM_NMS = 1;
@ -165,7 +166,6 @@ public class ServiceTestUtils {
conf.setBoolean(NM_VMEM_CHECK_ENABLED, false);
conf.setBoolean(NM_PMEM_CHECK_ENABLED, false);
// setup zk cluster
TestingCluster zkCluster;
zkCluster = new TestingCluster(1);
zkCluster.start();
conf.set(YarnConfiguration.RM_ZK_ADDRESS, zkCluster.getConnectString());
@ -239,6 +239,9 @@ public class ServiceTestUtils {
hdfsCluster = null;
}
}
if (zkCluster != null) {
zkCluster.stop();
}
if (basedir != null) {
FileUtils.deleteDirectory(basedir);
}

View File

@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.service;
import org.apache.commons.io.FileUtils;
import org.apache.curator.test.TestingCluster;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceState;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import static org.apache.hadoop.registry.client.api.RegistryConstants
.KEY_REGISTRY_ZK_QUORUM;
/**
 * Tests for the service application master, driven through
 * {@link MockServiceAM} with the registry pointed at an in-process
 * ZooKeeper test cluster.
 */
public class TestServiceAM extends ServiceTestUtils {

  private File basedir;
  YarnConfiguration conf = new YarnConfiguration();
  TestingCluster zkCluster;

  /**
   * Prepares an empty working directory and starts a one-node ZooKeeper
   * cluster whose connect string is configured as the registry quorum.
   */
  @Before
  public void setup() throws Exception {
    basedir = new File("target", "apps");
    // Remove leftovers from a previous run, then always (re)create the
    // directory so every run starts with an existing, empty basedir.
    if (basedir.exists()) {
      FileUtils.deleteDirectory(basedir);
    }
    basedir.mkdirs();
    zkCluster = new TestingCluster(1);
    zkCluster.start();
    conf.set(KEY_REGISTRY_ZK_QUORUM, zkCluster.getConnectString());
    System.out.println("ZK cluster: " + zkCluster.getConnectString());
  }

  /**
   * Deletes the working directory and stops the ZooKeeper cluster.
   */
  @After
  public void tearDown() throws IOException {
    try {
      if (basedir != null) {
        FileUtils.deleteDirectory(basedir);
      }
    } finally {
      // Stop ZooKeeper even when the directory cleanup fails, so the
      // cluster does not outlive the test.
      if (zkCluster != null) {
        zkCluster.stop();
      }
    }
  }

  // Race condition YARN-7486
  // 1. Allocate 1 container to compa and wait for it to be started.
  // 2. Fail this container, and in the meantime allocate the 2nd container.
  // 3. The 2nd container should not be assigned to the compa-0 instance,
  //    because the compa-0 instance is not stopped yet.
  // 4. Check that compa still has the instance in the pending list.
  @Test
  public void testContainerCompleted() throws TimeoutException,
      InterruptedException {
    ApplicationId applicationId = ApplicationId.newInstance(123456, 1);
    Service exampleApp = new Service();
    exampleApp.setId(applicationId.toString());
    exampleApp.setName("testContainerCompleted");
    exampleApp.addComponent(createComponent("compa", 1, "pwd"));

    MockServiceAM am = new MockServiceAM(exampleApp);
    am.init(conf);
    am.start();
    try {
      ComponentInstance compa0 = am.getCompInstance("compa", "compa-0");
      // allocate a container
      am.feedContainerToComp(exampleApp, 1, "compa");
      am.waitForCompInstanceState(compa0, ComponentInstanceState.STARTED);

      System.out.println("Fail the container 1");
      // fail the container
      am.feedFailedContainerToComp(exampleApp, 1, "compa");

      // allocate the second container immediately, this container will not
      // be assigned to comp instance because the instance is not yet added
      // to the pending list.
      am.feedContainerToComp(exampleApp, 2, "compa");
      am.waitForCompInstanceState(compa0, ComponentInstanceState.INIT);

      // still 1 pending instance
      Assert.assertEquals(1,
          am.getComponent("compa").getPendingInstances().size());
    } finally {
      // Always stop the AM, including when a wait times out or the
      // assertion fails, so its services do not leak into other tests.
      am.stop();
    }
  }
}

View File

@ -20,6 +20,7 @@
package org.apache.hadoop.yarn.service.monitor;
import org.apache.commons.io.FileUtils;
import org.apache.curator.test.TestingCluster;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.service.MockServiceAM;
@ -37,10 +38,14 @@ import java.io.File;
import java.io.IOException;
import java.util.Collections;
import static org.apache.hadoop.registry.client.api.RegistryConstants
.KEY_REGISTRY_ZK_QUORUM;
public class TestServiceMonitor extends ServiceTestUtils {
private File basedir;
YarnConfiguration conf = new YarnConfiguration();
TestingCluster zkCluster;
@Before
public void setup() throws Exception {
@ -51,6 +56,10 @@ public class TestServiceMonitor extends ServiceTestUtils {
basedir.mkdirs();
}
conf.setLong(YarnServiceConf.READINESS_CHECK_INTERVAL, 2);
zkCluster = new TestingCluster(1);
zkCluster.start();
conf.set(KEY_REGISTRY_ZK_QUORUM, zkCluster.getConnectString());
System.out.println("ZK cluster: " + zkCluster.getConnectString());
}
@After
@ -58,6 +67,9 @@ public class TestServiceMonitor extends ServiceTestUtils {
if (basedir != null) {
FileUtils.deleteDirectory(basedir);
}
if (zkCluster != null) {
zkCluster.stop();
}
}
// Create compa with 1 container