YARN-6744. Recover component information on YARN native services AM restart. Contributed by Billie Rinaldi

This commit is contained in:
Jian He 2017-10-11 21:05:06 -07:00
parent c723021579
commit b8a7ef1b64
7 changed files with 212 additions and 60 deletions

View File

@ -157,6 +157,11 @@
<artifactId>hadoop-yarn-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>

View File

@ -29,7 +29,7 @@
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.registry.client.api.RegistryOperations;
import org.apache.hadoop.registry.client.api.RegistryOperationsFactory;
import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
import org.apache.hadoop.registry.client.binding.RegistryUtils;
import org.apache.hadoop.registry.client.types.ServiceRecord;
import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies;
@ -237,11 +237,6 @@ public void serviceStop() throws Exception {
serviceTimelinePublisher
.serviceAttemptUnregistered(context, diagnostics.toString());
}
// Cleanup each component instance. no need to release containers as
// they will be automatically released by RM
for (ComponentInstance instance : liveInstances.values()) {
instance.cleanupRegistryAndCompHdfsDir();
}
String msg = diagnostics.toString()
+ "Navigate to the failed component for more details.";
amRMClient
@ -266,11 +261,67 @@ public void serviceStart() throws Exception {
}
registerServiceInstance(context.attemptId, app);
//TODO handle containers recover
// recover components based on containers sent from RM
recoverComponents(response);
for (Component component : componentsById.values()) {
// Trigger initial evaluation of components
if (component.areDependenciesReady()) {
LOG.info("Triggering initial evaluation of component {}",
component.getName());
ComponentEvent event = new ComponentEvent(component.getName(), FLEX)
.setDesired(component.getComponentSpec().getNumberOfContainers());
component.handle(event);
}
}
}
private void recover() {
private void recoverComponents(RegisterApplicationMasterResponse response) {
List<Container> recoveredContainers = response
.getContainersFromPreviousAttempts();
LOG.info("Received {} containers from previous attempt.",
recoveredContainers.size());
Map<String, ServiceRecord> existingRecords = new HashMap<>();
List<String> existingComps = null;
try {
existingComps = yarnRegistryOperations.listComponents();
LOG.info("Found {} containers from ZK registry: {}", existingComps.size(),
existingComps);
} catch (Exception e) {
LOG.info("Could not read component paths: {}", e.getMessage());
}
if (existingComps != null) {
for (String existingComp : existingComps) {
try {
ServiceRecord record =
yarnRegistryOperations.getComponent(existingComp);
existingRecords.put(existingComp, record);
} catch (Exception e) {
LOG.warn("Could not resolve record for component {}: {}",
existingComp, e);
}
}
}
for (Container container : recoveredContainers) {
LOG.info("Handling container {} from previous attempt",
container.getId());
ServiceRecord record = existingRecords.get(RegistryPathUtils
.encodeYarnID(container.getId().toString()));
if (record != null) {
Component comp = componentsById.get(container.getAllocationRequestId());
ComponentEvent event =
new ComponentEvent(comp.getName(), CONTAINER_RECOVERED)
.setContainer(container)
.setInstance(comp.getComponentInstance(record.description));
comp.handle(event);
// do not remove requests in this case because we do not know if they
// have already been removed
} else {
LOG.info("Record not found in registry for container {} from previous" +
" attempt, releasing", container.getId());
amRMClient.releaseAssignedContainer(container.getId());
}
}
}
private void initGlobalTokensForSubstitute(ServiceContext context) {
@ -353,7 +404,7 @@ private void registerServiceInstance(ApplicationAttemptId attemptId,
executorService.submit(new Runnable() {
@Override public void run() {
try {
yarnRegistryOperations.registerSelf(serviceRecord, true);
yarnRegistryOperations.registerSelf(serviceRecord, false);
LOG.info("Registered service under {}; absolute path {}",
yarnRegistryOperations.getSelfRegistrationPath(),
yarnRegistryOperations.getAbsoluteSelfRegistrationPath());
@ -398,13 +449,6 @@ private void createAllComponents() {
componentsById.put(allocateId, component);
componentsByName.put(component.getName(), component);
allocateId++;
// Trigger the component without dependencies
if (component.areDependenciesReady()) {
ComponentEvent event = new ComponentEvent(compSpec.getName(), FLEX)
.setDesired(compSpec.getNumberOfContainers());
component.handle(event);
}
}
}
@ -458,17 +502,17 @@ public void onContainersAllocated(List<Container> containers) {
new ComponentEvent(comp.getName(), CONTAINER_ALLOCATED)
.setContainer(container);
dispatcher.getEventHandler().handle(event);
LOG.info("[COMPONENT {}]: {} outstanding container requests.",
comp.getName(),
amRMClient.getMatchingRequests(container.getAllocationRequestId()).size());
// remove the corresponding request
Collection<AMRMClient.ContainerRequest> collection = amRMClient
Collection<AMRMClient.ContainerRequest> requests = amRMClient
.getMatchingRequests(container.getAllocationRequestId());
if (collection.iterator().hasNext()) {
AMRMClient.ContainerRequest request = collection.iterator().next();
LOG.info("[COMPONENT {}]: {} outstanding container requests.",
comp.getName(), requests.size());
// remove the corresponding request
if (requests.iterator().hasNext()) {
LOG.info("[COMPONENT {}]: removing one container request.", comp
.getName());
AMRMClient.ContainerRequest request = requests.iterator().next();
amRMClient.removeContainerRequest(request);
}
}
}
@ -478,7 +522,7 @@ public void onContainersCompleted(List<ContainerStatus> statuses) {
ContainerId containerId = status.getContainerId();
ComponentInstance instance = liveInstances.get(status.getContainerId());
if (instance == null) {
LOG.error(
LOG.warn(
"Container {} Completed. No component instance exists. exitStatus={}. diagnostics={} ",
containerId, status.getExitStatus(), status.getDiagnostics());
return;

View File

@ -33,6 +33,7 @@
import org.apache.hadoop.yarn.service.ServiceScheduler;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
import org.apache.hadoop.yarn.service.ServiceMetrics;
import org.apache.hadoop.yarn.service.provider.ProviderUtils;
import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition;
@ -78,7 +79,7 @@ public class Component implements EventHandler<ComponentEvent> {
private ServiceContext context;
private AMRMClientAsync<ContainerRequest> amrmClient;
private AtomicLong instanceIdCounter = new AtomicLong();
private Map<ComponentInstanceId, ComponentInstance> compInstances =
private Map<String, ComponentInstance> compInstances =
new ConcurrentHashMap<>();
// component instances to be assigned with a container
private List<ComponentInstance> pendingInstances = new LinkedList<>();
@ -101,6 +102,9 @@ public class Component implements EventHandler<ComponentEvent> {
// INIT will only got to FLEXING
.addTransition(INIT, EnumSet.of(STABLE, FLEXING),
FLEX, new FlexComponentTransition())
// container recovered on AM restart
.addTransition(INIT, INIT, CONTAINER_RECOVERED,
new ContainerRecoveredTransition())
// container allocated by RM
.addTransition(FLEXING, FLEXING, CONTAINER_ALLOCATED,
@ -165,7 +169,7 @@ private void createOneCompInstance() {
new ComponentInstanceId(instanceIdCounter.getAndIncrement(),
componentSpec.getName());
ComponentInstance instance = new ComponentInstance(this, id);
compInstances.put(id, instance);
compInstances.put(instance.getCompInstanceName(), instance);
pendingInstances.add(instance);
}
@ -186,8 +190,8 @@ public ComponentState transition(Component component,
// This happens on init
LOG.info("[INIT COMPONENT " + component.getName() + "]: " + event
.getDesired() + " instances.");
component.requestContainers(event.getDesired());
return FLEXING;
component.requestContainers(component.pendingInstances.size());
return checkIfStable(component);
}
long before = component.getComponentSpec().getNumberOfContainers();
long delta = event.getDesired() - before;
@ -205,14 +209,14 @@ public ComponentState transition(Component component,
LOG.info("[FLEX DOWN COMPONENT " + component.getName()
+ "]: scaling down from " + before + " to " + event.getDesired());
List<ComponentInstance> list =
new ArrayList<>(component.compInstances.values());
new ArrayList<>(component.getAllComponentInstances());
// sort in Most recent -> oldest order, destroy most recent ones.
Collections.sort(list, Collections.reverseOrder());
for (int i = 0; i < delta; i++) {
ComponentInstance instance = list.get(i);
// remove the instance
component.compInstances.remove(instance.getCompInstanceId());
component.compInstances.remove(instance.getCompInstanceName());
component.pendingInstances.remove(instance);
component.componentMetrics.containersFailed.incr();
component.componentMetrics.containersRunning.decr();
@ -236,6 +240,46 @@ public void transition(Component component, ComponentEvent event) {
}
}
private static class ContainerRecoveredTransition extends BaseTransition {
@Override
public void transition(Component component, ComponentEvent event) {
ComponentInstance instance = event.getInstance();
Container container = event.getContainer();
if (instance == null) {
LOG.info("[COMPONENT {}]: Trying to recover {} but event did not " +
"specify component instance",
component.getName(), container.getId());
component.releaseContainer(container);
return;
}
if (instance.hasContainer()) {
LOG.info(
"[COMPONENT {}]: Instance {} already has container, release " +
"surplus container {}",
instance.getCompName(), instance.getCompInstanceId(), container
.getId());
component.releaseContainer(container);
return;
}
component.pendingInstances.remove(instance);
LOG.info("[COMPONENT {}]: Recovered {} for component instance {} on " +
"host {}, num pending component instances reduced to {} ",
component.getName(), container.getId(), instance
.getCompInstanceName(), container.getNodeId(), component
.pendingInstances.size());
instance.setContainer(container);
ProviderUtils.initCompInstanceDir(component.getContext().fs, instance);
component.getScheduler().addLiveCompInstance(container.getId(), instance);
LOG.info("[COMPONENT {}]: Marking {} as started for component " +
"instance {}", component.getName(), event.getContainer().getId(),
instance.getCompInstanceId());
component.compInstanceDispatcher.getEventHandler().handle(
new ComponentInstanceEvent(instance.getContainerId(),
START));
component.incRunningContainers();
}
}
private static class ContainerStartedTransition implements
MultipleArcTransition<Component,ComponentEvent,ComponentState> {
@ -280,14 +324,18 @@ public ServiceMetrics getCompMetrics () {
return componentMetrics;
}
private void releaseContainer(Container container) {
scheduler.getAmRMClient().releaseAssignedContainer(container.getId());
componentMetrics.surplusContainers.incr();
scheduler.getServiceMetrics().surplusContainers.incr();
}
private void assignContainerToCompInstance(Container container) {
if (pendingInstances.size() == 0) {
LOG.info(
"[COMPONENT {}]: No pending component instance left, release surplus container {}",
getName(), container.getId());
scheduler.getAmRMClient().releaseAssignedContainer(container.getId());
componentMetrics.surplusContainers.incr();
scheduler.getServiceMetrics().surplusContainers.incr();
releaseContainer(container);
return;
}
ComponentInstance instance = pendingInstances.remove(0);
@ -397,7 +445,7 @@ public Map<String, String> getDependencyHostIpTokens() {
}
for (String dependency : dependencies) {
Collection<ComponentInstance> instances = scheduler.getAllComponents()
.get(dependency).getAllComponentInstances().values();
.get(dependency).getAllComponentInstances();
for (ComponentInstance instance : instances) {
if (instance.getContainerStatus() == null) {
continue;
@ -447,8 +495,12 @@ public int getNumDesiredInstances() {
return componentMetrics.containersDesired.value();
}
public Map<ComponentInstanceId, ComponentInstance> getAllComponentInstances() {
return compInstances;
public ComponentInstance getComponentInstance(String componentInstanceName) {
return compInstances.get(componentInstanceName);
}
public Collection<ComponentInstance> getAllComponentInstances() {
return compInstances.values();
}
public org.apache.hadoop.yarn.service.api.records.Component getComponentSpec() {

View File

@ -21,6 +21,7 @@
public enum ComponentEventType {
FLEX,
CONTAINER_ALLOCATED,
CONTAINER_RECOVERED,
CONTAINER_STARTED,
CONTAINER_COMPLETED
}

View File

@ -23,7 +23,6 @@
import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
import org.apache.hadoop.registry.client.types.ServiceRecord;
import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies;
import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.Container;
@ -35,6 +34,8 @@
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.service.ServiceScheduler;
import org.apache.hadoop.yarn.service.api.records.ContainerState;
import org.apache.hadoop.yarn.service.component.Component;
@ -143,10 +144,19 @@ private static class ContainerStartedTransition extends BaseTransition {
compInstance.getContainerId(), compInstance), 0, 1,
TimeUnit.SECONDS);
long containerStartTime = System.currentTimeMillis();
try {
ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
.newContainerTokenIdentifier(compInstance.getContainer()
.getContainerToken());
containerStartTime = containerTokenIdentifier.getCreationTime();
} catch (Exception e) {
LOG.info("Could not get container creation time, using current time");
}
org.apache.hadoop.yarn.service.api.records.Container container =
new org.apache.hadoop.yarn.service.api.records.Container();
container.setId(compInstance.getContainerId().toString());
container.setLaunchTime(new Date());
container.setLaunchTime(new Date(containerStartTime));
container.setState(ContainerState.RUNNING_BUT_UNREADY);
container.setBareHost(compInstance.container.getNodeId().getHost());
container.setComponentName(compInstance.getCompInstanceName());
@ -156,7 +166,7 @@ private static class ContainerStartedTransition extends BaseTransition {
}
compInstance.containerSpec = container;
compInstance.getCompSpec().addContainer(container);
compInstance.containerStartedTime = System.currentTimeMillis();
compInstance.containerStartedTime = containerStartTime;
if (compInstance.timelineServiceEnabled) {
compInstance.serviceTimelinePublisher
@ -243,6 +253,8 @@ public void transition(ComponentInstance compInstance,
}
ExitUtil.terminate(-1);
}
compInstance.removeContainer();
}
}
@ -276,6 +288,15 @@ public void handle(ComponentInstanceEvent event) {
}
}
public boolean hasContainer() {
return this.container != null;
}
public void removeContainer() {
this.container = null;
this.compInstanceId.setContainerId(null);
}
public void setContainer(Container container) {
this.container = container;
this.compInstanceId.setContainerId(container.getId());

View File

@ -209,21 +209,26 @@ public void localizeServiceKeytabs(AbstractLauncher launcher,
}
}
public static Path initCompInstanceDir(SliderFileSystem fs,
ComponentInstance instance) {
Path compDir = new Path(new Path(fs.getAppDir(), "components"),
instance.getCompName());
Path compInstanceDir = new Path(compDir, instance.getCompInstanceName());
instance.setCompInstanceDir(compInstanceDir);
return compInstanceDir;
}
// 1. Create all config files for a component on hdfs for localization
// 2. Add the config file to localResource
public static synchronized void createConfigFileAndAddLocalResource(
AbstractLauncher launcher, SliderFileSystem fs, Component component,
Map<String, String> tokensForSubstitution, ComponentInstance instance,
ServiceContext context) throws IOException {
Path compDir =
new Path(new Path(fs.getAppDir(), "components"), component.getName());
Path compInstanceDir =
new Path(compDir, instance.getCompInstanceName());
Path compInstanceDir = initCompInstanceDir(fs, instance);
if (!fs.getFileSystem().exists(compInstanceDir)) {
log.info(instance.getCompInstanceId() + ": Creating dir on hdfs: " + compInstanceDir);
fs.getFileSystem().mkdirs(compInstanceDir,
new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE));
instance.setCompInstanceDir(compInstanceDir);
} else {
log.info("Component instance conf dir already exists: " + compInstanceDir);
}

View File

@ -48,7 +48,7 @@ public class YarnRegistryViewForProviders {
private final RegistryOperations registryOperations;
private final String user;
private final String sliderServiceClass;
private final String serviceClass;
private final String instanceName;
/**
* Record used where the service registered itself.
@ -57,20 +57,20 @@ public class YarnRegistryViewForProviders {
private ServiceRecord selfRegistration;
/**
* Path where record was registered
* Path where record was registered.
* Null until the service is registered
*/
private String selfRegistrationPath;
public YarnRegistryViewForProviders(RegistryOperations registryOperations,
String user,
String sliderServiceClass,
String serviceClass,
String instanceName,
ApplicationAttemptId applicationAttemptId) {
Preconditions.checkArgument(registryOperations != null,
"null registry operations");
Preconditions.checkArgument(user != null, "null user");
Preconditions.checkArgument(SliderUtils.isSet(sliderServiceClass),
Preconditions.checkArgument(SliderUtils.isSet(serviceClass),
"unset service class");
Preconditions.checkArgument(SliderUtils.isSet(instanceName),
"instanceName");
@ -78,7 +78,7 @@ public YarnRegistryViewForProviders(RegistryOperations registryOperations,
"null applicationAttemptId");
this.registryOperations = registryOperations;
this.user = user;
this.sliderServiceClass = sliderServiceClass;
this.serviceClass = serviceClass;
this.instanceName = instanceName;
}
@ -117,7 +117,7 @@ public String getAbsoluteSelfRegistrationPath() {
}
/**
* Add a component under the slider name/entry
* Add a component under the slider name/entry.
* @param componentName component name
* @param record record to put
* @throws IOException
@ -125,13 +125,13 @@ public String getAbsoluteSelfRegistrationPath() {
public void putComponent(String componentName,
ServiceRecord record) throws
IOException {
putComponent(sliderServiceClass, instanceName,
putComponent(serviceClass, instanceName,
componentName,
record);
}
/**
* Add a component
* Add a component.
* @param serviceClass service class to use under ~user
* @param componentName component name
* @param record record to put
@ -146,9 +146,33 @@ public void putComponent(String serviceClass,
registryOperations.mknode(RegistryPathUtils.parentOf(path), true);
registryOperations.bind(path, record, BindFlags.OVERWRITE);
}
/**
* Add a service under a path, optionally purging any history
* Get a component.
* @param componentName component name
* @return the service record
* @throws IOException
*/
public ServiceRecord getComponent(String componentName) throws IOException {
String path = RegistryUtils.componentPath(
user, serviceClass, instanceName, componentName);
LOG.info("Resolving path {}", path);
return registryOperations.resolve(path);
}
/**
* List components.
* @return a list of components
* @throws IOException
*/
public List<String> listComponents() throws IOException {
String path = RegistryUtils.componentListPath(
user, serviceClass, instanceName);
return registryOperations.list(path);
}
/**
* Add a service under a path, optionally purging any history.
* @param username user
* @param serviceClass service class to use under ~user
* @param serviceName name of the service
@ -173,7 +197,7 @@ public String putService(String username,
}
/**
* Add a service under a path for the current user
* Add a service under a path for the current user.
* @param record service record
* @param deleteTreeFirst perform recursive delete of the path first
* @return the path the service was created at
@ -183,20 +207,20 @@ public String registerSelf(
ServiceRecord record,
boolean deleteTreeFirst) throws IOException {
selfRegistrationPath =
putService(user, sliderServiceClass, instanceName, record, deleteTreeFirst);
putService(user, serviceClass, instanceName, record, deleteTreeFirst);
setSelfRegistration(record);
return selfRegistrationPath;
}
/**
* Delete a component
* Delete a component.
* @param containerId component name
* @throws IOException
*/
public void deleteComponent(ComponentInstanceId instanceId,
String containerId) throws IOException {
String path = RegistryUtils.componentPath(
user, sliderServiceClass, instanceName,
user, serviceClass, instanceName,
containerId);
LOG.info(instanceId + ": Deleting registry path " + path);
registryOperations.delete(path, false);