YARN-5576. Allow resource localization while container is running. Contributed by Jian He.

This commit is contained in:
Varun Vasudev 2016-09-06 20:01:45 +05:30
parent 62a9667136
commit e6fcfe28e3
22 changed files with 577 additions and 160 deletions

View File

@ -181,6 +181,15 @@ public abstract boolean signalContainer(ContainerSignalContext ctx)
public abstract void deleteAsUser(DeletionAsUserContext ctx)
throws IOException, InterruptedException;
/**
* Create a symlink file which points to the target.
* @param target The target for symlink
* @param symlink the symlink file
* @throws IOException Error when creating symlinks
*/
public abstract void symLink(String target, String symlink)
throws IOException;
/**
* Check if a container is alive.
* @param ctx Encapsulates information necessary for container liveness check.

View File

@ -126,4 +126,6 @@ interface QueuingContext {
void setNMTimelinePublisher(NMTimelinePublisher nmMetricsPublisher);
NMTimelinePublisher getNMTimelinePublisher();
ContainerExecutor getContainerExecutor();
}

View File

@ -38,6 +38,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.fs.permission.FsPermission;
@ -511,6 +512,11 @@ public void deleteAsUser(DeletionAsUserContext ctx)
}
}
@Override
public void symLink(String target, String symlink) throws IOException {
FileUtil.symLink(target, symlink);
}
/** Permissions for user dir.
* $local.dir/usercache/$user */
static final short USER_PERM = (short)0750;

View File

@ -490,6 +490,12 @@ public void deleteAsUser(DeletionAsUserContext ctx)
}
}
@Override
public void symLink(String target, String symlink)
throws IOException {
}
/**
* Converts a directory list to a docker mount string
* @param dirs

View File

@ -682,6 +682,11 @@ protected File[] readDirAsUser(String user, Path dir) {
return files.toArray(new File[files.size()]);
}
@Override
public void symLink(String target, String symlink) {
}
@Override
public boolean isContainerAlive(ContainerLivenessContext ctx)
throws IOException {

View File

@ -343,6 +343,9 @@ protected void serviceInit(Configuration conf) throws Exception {
this.context = createNMContext(containerTokenSecretManager,
nmTokenSecretManager, nmStore, isDistSchedulingEnabled, conf);
((NMContext)context).setContainerExecutor(exec);
nodeLabelsProvider = createNodeLabelsProvider(conf);
if (null == nodeLabelsProvider) {
@ -509,6 +512,7 @@ public static class NMContext implements Context {
private OpportunisticContainerAllocator containerAllocator;
private final QueuingContext queuingContext;
private ContainerExecutor executor;
private NMTimelinePublisher nmTimelinePublisher;
@ -702,6 +706,14 @@ public void setNMTimelinePublisher(NMTimelinePublisher nmMetricsPublisher) {
public NMTimelinePublisher getNMTimelinePublisher() {
return nmTimelinePublisher;
}
public ContainerExecutor getContainerExecutor() {
return this.executor;
}
public void setContainerExecutor(ContainerExecutor executor) {
this.executor = executor;
}
}
/**

View File

@ -18,25 +18,8 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager;
import static org.apache.hadoop.service.Service.STATE.STARTED;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@ -76,6 +59,7 @@
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
@ -129,7 +113,9 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.SignalContainersLauncherEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEventType;
@ -154,8 +140,25 @@
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import static org.apache.hadoop.service.Service.STATE.STARTED;
public class ContainerManagerImpl extends CompositeService implements
ContainerManager {
@ -1525,6 +1528,31 @@ public SignalContainerResponse signalToContainer(
public ResourceLocalizationResponse localize(
ResourceLocalizationRequest request) throws YarnException, IOException {
ContainerId containerId = request.getContainerId();
Container container = context.getContainers().get(containerId);
if (container == null) {
throw new YarnException("Specified " + containerId + " does not exist!");
}
if (!container.getContainerState()
.equals(org.apache.hadoop.yarn.server.nodemanager.
containermanager.container.ContainerState.RUNNING)) {
throw new YarnException(
containerId + " is at " + container.getContainerState()
+ " state. Not able to localize new resources.");
}
try {
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
container.getResourceSet().addResources(request.getLocalResources());
if (req != null && !req.isEmpty()) {
dispatcher.getEventHandler()
.handle(new ContainerLocalizationRequestEvent(container, req));
}
} catch (URISyntaxException e) {
LOG.info("Error when parsing local resource URI for " + containerId, e);
throw new YarnException(e);
}
return ResourceLocalizationResponse.newInstance();
}

View File

@ -18,9 +18,6 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ContainerId;
@ -31,6 +28,10 @@
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceSet;
import java.util.List;
import java.util.Map;
public interface Container extends EventHandler<ContainerEvent> {
@ -74,4 +75,5 @@ public interface Container extends EventHandler<ContainerEvent> {
Priority getPriority();
ResourceSet getResourceSet();
}

View File

@ -18,18 +18,15 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -65,6 +62,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceSet;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
@ -124,20 +122,7 @@ public class ContainerImpl implements Container {
private final Configuration daemonConf;
private static final Log LOG = LogFactory.getLog(ContainerImpl.class);
private final Map<LocalResourceRequest,List<String>> pendingResources =
new HashMap<LocalResourceRequest,List<String>>();
private final Map<Path,List<String>> localizedResources =
new HashMap<Path,List<String>>();
private final List<LocalResourceRequest> publicRsrcs =
new ArrayList<LocalResourceRequest>();
private final List<LocalResourceRequest> privateRsrcs =
new ArrayList<LocalResourceRequest>();
private final List<LocalResourceRequest> appRsrcs =
new ArrayList<LocalResourceRequest>();
private final Map<LocalResourceRequest, Path> resourcesToBeUploaded =
new ConcurrentHashMap<LocalResourceRequest, Path>();
private final Map<LocalResourceRequest, Boolean> resourcesUploadPolicies =
new ConcurrentHashMap<LocalResourceRequest, Boolean>();
// whether container has been recovered after a restart
private RecoveredContainerStatus recoveredStatus =
@ -145,6 +130,7 @@ public class ContainerImpl implements Container {
// whether container was marked as killed after recovery
private boolean recoveredAsKilled = false;
private Context context;
private ResourceSet resourceSet;
public ContainerImpl(Configuration conf, Dispatcher dispatcher,
ContainerLaunchContext launchContext, Credentials creds,
@ -204,6 +190,7 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher,
stateMachine = stateMachineFactory.make(this);
this.context = context;
this.resourceSet = new ResourceSet();
}
// constructor for a recovered container
@ -312,6 +299,12 @@ ContainerEventType.KILL_CONTAINER, new KillTransition())
ContainerState.EXITED_WITH_FAILURE),
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
new RetryFailureTransition())
.addTransition(ContainerState.RUNNING, ContainerState.RUNNING,
ContainerEventType.RESOURCE_LOCALIZED,
new ResourceLocalizedWhileRunningTransition())
.addTransition(ContainerState.RUNNING, ContainerState.RUNNING,
ContainerEventType.RESOURCE_FAILED,
new ResourceLocalizationFailedWhileRunningTransition())
.addTransition(ContainerState.RUNNING, ContainerState.RUNNING,
ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
UPDATE_DIAGNOSTICS_TRANSITION)
@ -470,7 +463,7 @@ public Map<Path,List<String>> getLocalizedResources() {
try {
if (ContainerState.LOCALIZED == getContainerState()
|| ContainerState.RELAUNCHING == getContainerState()) {
return localizedResources;
return resourceSet.getLocalizedResources();
} else {
return null;
}
@ -591,6 +584,11 @@ public void setLogDir(String logDir) {
this.logDir = logDir;
}
@Override
public ResourceSet getResourceSet() {
return this.resourceSet;
}
@SuppressWarnings("unchecked")
private void sendFinishedEvents() {
// Inform the application
@ -653,7 +651,7 @@ private void addDiagnostics(String... diags) {
for (String s : diags) {
this.diagnostics.append(s);
}
if (isRetryContextSet() && diagnostics.length() > diagnosticsMaxSize) {
if (diagnostics.length() > diagnosticsMaxSize) {
diagnostics.delete(0, diagnostics.length() - diagnosticsMaxSize);
}
try {
@ -667,17 +665,7 @@ private void addDiagnostics(String... diags) {
@SuppressWarnings("unchecked") // dispatcher not typed
public void cleanup() {
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrc =
new HashMap<LocalResourceVisibility,
Collection<LocalResourceRequest>>();
if (!publicRsrcs.isEmpty()) {
rsrc.put(LocalResourceVisibility.PUBLIC, publicRsrcs);
}
if (!privateRsrcs.isEmpty()) {
rsrc.put(LocalResourceVisibility.PRIVATE, privateRsrcs);
}
if (!appRsrcs.isEmpty()) {
rsrc.put(LocalResourceVisibility.APPLICATION, appRsrcs);
}
resourceSet.getAllResourcesByVisibility();
dispatcher.getEventHandler().handle(
new ContainerLocalizationCleanupEvent(this, rsrc));
}
@ -697,7 +685,7 @@ public void transition(ContainerImpl container, ContainerEvent event) {
* message.
*
* If there are resources to localize, sends a
* ContainerLocalizationRequest (INIT_CONTAINER_RESOURCES)
* ContainerLocalizationRequest (LOCALIZE_CONTAINER_RESOURCES)
* to the ResourceLocalizationManager and enters LOCALIZING state.
*
* If there are no resources to localize, sends LAUNCH_CONTAINER event
@ -749,39 +737,15 @@ public ContainerState transition(ContainerImpl container,
}
container.containerLocalizationStartTime = clock.getTime();
// Send requests for public, private resources
Map<String,LocalResource> cntrRsrc = ctxt.getLocalResources();
if (!cntrRsrc.isEmpty()) {
try {
for (Map.Entry<String,LocalResource> rsrc : cntrRsrc.entrySet()) {
try {
LocalResourceRequest req =
new LocalResourceRequest(rsrc.getValue());
List<String> links = container.pendingResources.get(req);
if (links == null) {
links = new ArrayList<String>();
container.pendingResources.put(req, links);
}
links.add(rsrc.getKey());
storeSharedCacheUploadPolicy(container, req, rsrc.getValue()
.getShouldBeUploadedToSharedCache());
switch (rsrc.getValue().getVisibility()) {
case PUBLIC:
container.publicRsrcs.add(req);
break;
case PRIVATE:
container.privateRsrcs.add(req);
break;
case APPLICATION:
container.appRsrcs.add(req);
break;
}
} catch (URISyntaxException e) {
LOG.info("Got exception parsing " + rsrc.getKey()
+ " and value " + rsrc.getValue());
throw e;
}
}
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
container.resourceSet.addResources(ctxt.getLocalResources());
container.dispatcher.getEventHandler().handle(
new ContainerLocalizationRequestEvent(container, req));
} catch (URISyntaxException e) {
// malformed resource; abort container launch
LOG.warn("Failed to parse resource-request", e);
@ -789,21 +753,6 @@ public ContainerState transition(ContainerImpl container,
container.metrics.endInitingContainer();
return ContainerState.LOCALIZATION_FAILED;
}
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
new LinkedHashMap<LocalResourceVisibility,
Collection<LocalResourceRequest>>();
if (!container.publicRsrcs.isEmpty()) {
req.put(LocalResourceVisibility.PUBLIC, container.publicRsrcs);
}
if (!container.privateRsrcs.isEmpty()) {
req.put(LocalResourceVisibility.PRIVATE, container.privateRsrcs);
}
if (!container.appRsrcs.isEmpty()) {
req.put(LocalResourceVisibility.APPLICATION, container.appRsrcs);
}
container.dispatcher.getEventHandler().handle(
new ContainerLocalizationRequestEvent(container, req));
return ContainerState.LOCALIZING;
} else {
container.sendLaunchEvent();
@ -813,27 +762,6 @@ public ContainerState transition(ContainerImpl container,
}
}
/**
* Store the resource's shared cache upload policies
* Given LocalResourceRequest can be shared across containers in
* LocalResourcesTrackerImpl, we preserve the upload policies here.
* In addition, it is possible for the application to create several
* "identical" LocalResources as part of
* ContainerLaunchContext.setLocalResources with different symlinks.
* There is a corner case where these "identical" local resources have
* different upload policies. For that scenario, upload policy will be set to
* true as long as there is at least one LocalResource entry with
* upload policy set to true.
*/
private static void storeSharedCacheUploadPolicy(ContainerImpl container,
LocalResourceRequest resourceRequest, Boolean uploadPolicy) {
Boolean storedUploadPolicy =
container.resourcesUploadPolicies.get(resourceRequest);
if (storedUploadPolicy == null || (!storedUploadPolicy && uploadPolicy)) {
container.resourcesUploadPolicies.put(resourceRequest, uploadPolicy);
}
}
/**
* Transition when one of the requested resources for this container
* has been successfully localized.
@ -847,22 +775,21 @@ public ContainerState transition(ContainerImpl container,
ContainerResourceLocalizedEvent rsrcEvent = (ContainerResourceLocalizedEvent) event;
LocalResourceRequest resourceRequest = rsrcEvent.getResource();
Path location = rsrcEvent.getLocation();
List<String> syms = container.pendingResources.remove(resourceRequest);
List<String> syms =
container.resourceSet.resourceLocalized(resourceRequest, location);
if (null == syms) {
LOG.warn("Localized unknown resource " + resourceRequest +
" for container " + container.containerId);
assert false;
// fail container?
LOG.info("Localized resource " + resourceRequest +
" for container " + container.containerId);
return ContainerState.LOCALIZING;
}
container.localizedResources.put(location, syms);
// check to see if this resource should be uploaded to the shared cache
// as well
if (shouldBeUploadedToSharedCache(container, resourceRequest)) {
container.resourcesToBeUploaded.put(resourceRequest, location);
container.resourceSet.getResourcesToBeUploaded()
.put(resourceRequest, location);
}
if (!container.pendingResources.isEmpty()) {
if (!container.resourceSet.getPendingResources().isEmpty()) {
return ContainerState.LOCALIZING;
}
@ -884,7 +811,8 @@ public ContainerState transition(ContainerImpl container,
&& container.recoveredStatus != RecoveredContainerStatus.COMPLETED) {
// kick off uploads to the shared cache
container.dispatcher.getEventHandler().handle(
new SharedCacheUploadEvent(container.resourcesToBeUploaded, container
new SharedCacheUploadEvent(
container.resourceSet.getResourcesToBeUploaded(), container
.getLaunchContext(), container.getUser(),
SharedCacheUploadEventType.UPLOAD));
}
@ -893,6 +821,56 @@ public ContainerState transition(ContainerImpl container,
}
}
/**
* Resource is localized while the container is running - create symlinks
*/
static class ResourceLocalizedWhileRunningTransition
extends ContainerTransition {
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
ContainerResourceLocalizedEvent rsrcEvent =
(ContainerResourceLocalizedEvent) event;
List<String> links = container.resourceSet
.resourceLocalized(rsrcEvent.getResource(), rsrcEvent.getLocation());
// creating symlinks.
for (String link : links) {
try {
String linkFile = new Path(container.workDir, link).toString();
if (new File(linkFile).exists()) {
LOG.info("Symlink file already exists: " + linkFile);
} else {
container.context.getContainerExecutor()
.symLink(rsrcEvent.getLocation().toString(), linkFile);
LOG.info("Created symlink: " + linkFile + " -> " + rsrcEvent
.getLocation());
}
} catch (IOException e) {
String message = String
.format("Error when creating symlink %s -> %s", link,
rsrcEvent.getLocation());
LOG.error(message, e);
}
}
}
}
/**
* Resource localization failed while the container is running.
*/
static class ResourceLocalizationFailedWhileRunningTransition
extends ContainerTransition {
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
ContainerResourceFailedEvent failedEvent =
(ContainerResourceFailedEvent) event;
container.resourceSet
.resourceLocalizationFailed(failedEvent.getResource());
container.addDiagnostics(failedEvent.getDiagnosticMessage());
}
}
/**
* Transition from LOCALIZED state to RUNNING state upon receiving
* a CONTAINER_LAUNCHED event
@ -1136,17 +1114,10 @@ static class LocalizedResourceDuringKillTransition implements
SingleArcTransition<ContainerImpl, ContainerEvent> {
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
ContainerResourceLocalizedEvent rsrcEvent = (ContainerResourceLocalizedEvent) event;
List<String> syms =
container.pendingResources.remove(rsrcEvent.getResource());
if (null == syms) {
LOG.warn("Localized unknown resource " + rsrcEvent.getResource() +
" for container " + container.containerId);
assert false;
// fail container?
return;
}
container.localizedResources.put(rsrcEvent.getLocation(), syms);
ContainerResourceLocalizedEvent rsrcEvent =
(ContainerResourceLocalizedEvent) event;
container.resourceSet
.resourceLocalized(rsrcEvent.getResource(), rsrcEvent.getLocation());
}
}
@ -1402,7 +1373,7 @@ private boolean hasDefaultExitCode() {
*/
private static boolean shouldBeUploadedToSharedCache(ContainerImpl container,
LocalResourceRequest resource) {
return container.resourcesUploadPolicies.get(resource);
return container.resourceSet.getResourcesUploadPolicies().get(resource);
}
@VisibleForTesting

View File

@ -1201,16 +1201,16 @@ public static String getExitCodeFile(String pidFile) {
private void recordContainerLogDir(ContainerId containerId,
String logDir) throws IOException{
container.setLogDir(logDir);
if (container.isRetryContextSet()) {
container.setLogDir(logDir);
context.getNMStateStore().storeContainerLogDir(containerId, logDir);
}
}
private void recordContainerWorkDir(ContainerId containerId,
String workDir) throws IOException{
container.setWorkDir(workDir);
if (container.isRetryContextSet()) {
container.setWorkDir(workDir);
context.getNMStateStore().storeContainerWorkDir(containerId, workDir);
}
}

View File

@ -112,6 +112,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
@ -135,7 +136,6 @@
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredUserResources;
import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider;
import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerBuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.FSDownload;
import com.google.common.annotations.VisibleForTesting;
@ -419,7 +419,7 @@ public void handle(LocalizationEvent event) {
handleInitApplicationResources(
((ApplicationLocalizationEvent)event).getApplication());
break;
case INIT_CONTAINER_RESOURCES:
case LOCALIZE_CONTAINER_RESOURCES:
handleInitContainerResources((ContainerLocalizationRequestEvent) event);
break;
case CONTAINER_RESOURCES_LOCALIZED:
@ -469,6 +469,13 @@ private void handleInitApplicationResources(Application app) {
private void handleInitContainerResources(
ContainerLocalizationRequestEvent rsrcReqs) {
Container c = rsrcReqs.getContainer();
EnumSet<ContainerState> set =
EnumSet.of(ContainerState.LOCALIZING, ContainerState.RUNNING);
if (!set.contains(c.getContainerState())) {
LOG.warn(c.getContainerId() + " is at " + c.getContainerState()
+ " state, do not localize resources.");
return;
}
// create a loading cache for the file statuses
LoadingCache<Path,Future<FileStatus>> statCache =
CacheBuilder.newBuilder().build(FSDownload.createStatusCacheLoader(getConfig()));
@ -538,7 +545,7 @@ private void handleCleanupContainerResources(
}
String locId = c.getContainerId().toString();
localizerTracker.cleanupPrivLocalizers(locId);
// Delete the container directories
String userName = c.getUser();
String containerIDStr = c.toString();
@ -747,6 +754,14 @@ public void handle(LocalizerEvent event) {
case APPLICATION:
synchronized (privLocalizers) {
LocalizerRunner localizer = privLocalizers.get(locId);
if (localizer != null && localizer.killContainerLocalizer.get()) {
// Old localizer thread has been stopped, remove it and creates
// a new localizer thread.
LOG.info("New " + event.getType() + " localize request for "
+ locId + ", remove old private localizer.");
cleanupPrivLocalizers(locId);
localizer = null;
}
if (null == localizer) {
LOG.info("Created localizer for " + locId);
localizer = new LocalizerRunner(req.getContext(), locId);

View File

@ -0,0 +1,192 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* All Resources requested by the container.
*/
public class ResourceSet {
private static final Log LOG = LogFactory.getLog(ResourceSet.class);
// resources by localization state (localized, pending, failed)
private Map<Path, List<String>> localizedResources =
new ConcurrentHashMap<>();
private Map<LocalResourceRequest, List<String>> pendingResources =
new ConcurrentHashMap<>();
private Set<LocalResourceRequest> resourcesFailedToBeLocalized =
new HashSet<>();
// resources by visibility (public, private, app)
private final List<LocalResourceRequest> publicRsrcs =
new ArrayList<>();
private final List<LocalResourceRequest> privateRsrcs =
new ArrayList<>();
private final List<LocalResourceRequest> appRsrcs =
new ArrayList<>();
private final Map<LocalResourceRequest, Path> resourcesToBeUploaded =
new ConcurrentHashMap<>();
private final Map<LocalResourceRequest, Boolean> resourcesUploadPolicies =
new ConcurrentHashMap<>();
public Map<LocalResourceVisibility, Collection<LocalResourceRequest>>
addResources(Map<String, LocalResource> localResourceMap)
throws URISyntaxException {
if (localResourceMap == null || localResourceMap.isEmpty()) {
return null;
}
Map<LocalResourceRequest, List<String>> allResources = new HashMap<>();
List<LocalResourceRequest> publicList = new ArrayList<>();
List<LocalResourceRequest> privateList = new ArrayList<>();
List<LocalResourceRequest> appList = new ArrayList<>();
for (Map.Entry<String, LocalResource> rsrc : localResourceMap.entrySet()) {
LocalResource resource = rsrc.getValue();
LocalResourceRequest req = new LocalResourceRequest(rsrc.getValue());
allResources.putIfAbsent(req, new ArrayList<>());
allResources.get(req).add(rsrc.getKey());
storeSharedCacheUploadPolicy(req,
resource.getShouldBeUploadedToSharedCache());
switch (resource.getVisibility()) {
case PUBLIC:
publicList.add(req);
break;
case PRIVATE:
privateList.add(req);
break;
case APPLICATION:
appList.add(req);
break;
default:
break;
}
}
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
new LinkedHashMap<>();
if (!publicList.isEmpty()) {
publicRsrcs.addAll(publicList);
req.put(LocalResourceVisibility.PUBLIC, publicList);
}
if (!privateList.isEmpty()) {
privateRsrcs.addAll(privateList);
req.put(LocalResourceVisibility.PRIVATE, privateList);
}
if (!appList.isEmpty()) {
appRsrcs.addAll(appList);
req.put(LocalResourceVisibility.APPLICATION, appList);
}
if (!allResources.isEmpty()) {
this.pendingResources.putAll(allResources);
}
return req;
}
/**
* Called when resource localized.
* @param request The original request for the localized resource
* @param location The path where the resource is localized
* @return The list of symlinks for the localized resources.
*/
public List<String> resourceLocalized(LocalResourceRequest request,
Path location) {
List<String> symlinks = pendingResources.remove(request);
if (symlinks == null) {
return null;
} else {
localizedResources.put(location, symlinks);
return symlinks;
}
}
public void resourceLocalizationFailed(LocalResourceRequest request) {
pendingResources.remove(request);
resourcesFailedToBeLocalized.add(request);
}
public synchronized Map<LocalResourceVisibility,
Collection<LocalResourceRequest>> getAllResourcesByVisibility() {
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrc =
new HashMap<>();
if (!publicRsrcs.isEmpty()) {
rsrc.put(LocalResourceVisibility.PUBLIC, publicRsrcs);
}
if (!privateRsrcs.isEmpty()) {
rsrc.put(LocalResourceVisibility.PRIVATE, privateRsrcs);
}
if (!appRsrcs.isEmpty()) {
rsrc.put(LocalResourceVisibility.APPLICATION, appRsrcs);
}
return rsrc;
}
/**
* Store the resource's shared cache upload policies
* Given LocalResourceRequest can be shared across containers in
* LocalResourcesTrackerImpl, we preserve the upload policies here.
* In addition, it is possible for the application to create several
* "identical" LocalResources as part of
* ContainerLaunchContext.setLocalResources with different symlinks.
* There is a corner case where these "identical" local resources have
* different upload policies. For that scenario, upload policy will be set to
* true as long as there is at least one LocalResource entry with
* upload policy set to true.
*/
private void storeSharedCacheUploadPolicy(
LocalResourceRequest resourceRequest, Boolean uploadPolicy) {
Boolean storedUploadPolicy = resourcesUploadPolicies.get(resourceRequest);
if (storedUploadPolicy == null || (!storedUploadPolicy && uploadPolicy)) {
resourcesUploadPolicies.put(resourceRequest, uploadPolicy);
}
}
public Map<Path, List<String>> getLocalizedResources() {
return localizedResources;
}
public Map<LocalResourceRequest, Path> getResourcesToBeUploaded() {
return resourcesToBeUploaded;
}
public Map<LocalResourceRequest, Boolean> getResourcesUploadPolicies() {
return resourcesUploadPolicies;
}
public Map<LocalResourceRequest, List<String>> getPendingResources() {
return pendingResources;
}
}

View File

@ -44,7 +44,7 @@ public class ContainerLocalizationRequestEvent extends
*/
public ContainerLocalizationRequestEvent(Container c,
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrc) {
super(LocalizationEventType.INIT_CONTAINER_RESOURCES, c);
super(LocalizationEventType.LOCALIZE_CONTAINER_RESOURCES, c);
this.rsrc = rsrc;
}

View File

@ -19,7 +19,7 @@
public enum LocalizationEventType {
INIT_APPLICATION_RESOURCES,
INIT_CONTAINER_RESOURCES,
LOCALIZE_CONTAINER_RESOURCES,
CACHE_CLEANUP,
CLEANUP_CONTAINER_RESOURCES,
DESTROY_APPLICATION_RESOURCES,

View File

@ -326,7 +326,7 @@ public void publishLocalizationEvent(LocalizationEvent event) {
publishContainerLocalizationEvent((ContainerLocalizationEvent) event,
ContainerMetricsConstants.LOCALIZATION_FINISHED_EVENT_TYPE);
break;
case INIT_CONTAINER_RESOURCES:
case LOCALIZE_CONTAINER_RESOURCES:
publishContainerLocalizationEvent((ContainerLocalizationEvent) event,
ContainerMetricsConstants.LOCALIZATION_START_EVENT_TYPE);
break;

View File

@ -86,7 +86,7 @@ public void handle(LocalizationEvent event) {
dispatcher.getEventHandler().handle(new ApplicationInitedEvent(
app.getAppId()));
break;
case INIT_CONTAINER_RESOURCES:
case LOCALIZE_CONTAINER_RESOURCES:
ContainerLocalizationRequestEvent rsrcReqs =
(ContainerLocalizationRequestEvent) event;
// simulate localization of all requested resources

View File

@ -131,7 +131,18 @@ public void testContainerLaunchAndExitFailure() throws IOException,
LOG.info("Running testContainerLaunchAndExitFailure");
super.testContainerLaunchAndExitFailure();
}
@Override
public void testLocalingResourceWhileContainerRunning()
throws Exception {
// Don't run the test if the binary is not available.
if (!shouldRunTest()) {
LOG.info("LCE binary path is not passed. Not running the test");
return;
}
super.testLocalingResourceWhileContainerRunning();
}
@Override
public void testLocalFilesCleanup() throws InterruptedException,
IOException, YarnException {

View File

@ -59,6 +59,7 @@
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.NodeResourceMonitor;
@ -715,6 +716,11 @@ public void setNMTimelinePublisher(NMTimelinePublisher nmMetricsPublisher) {
@Override
public NMTimelinePublisher getNMTimelinePublisher() {
return null;
}
@Override
public ContainerExecutor getContainerExecutor() {
return null;
}
}

View File

@ -36,17 +36,20 @@
import java.util.List;
import java.util.Map;
import com.google.common.base.Supplier;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
@ -83,6 +86,7 @@
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestAuxServices.ServiceA;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@ -459,7 +463,138 @@ public void testContainerLaunchAndExitFailure() throws IOException,
// and verify exit code returned
testContainerLaunchAndExit(exitCode);
}
private Map<String, LocalResource> setupLocalResources(String fileName,
String symLink) throws Exception {
// ////// Create the resources for the container
File dir = new File(tmpDir, "dir");
dir.mkdirs();
File file = new File(dir, fileName);
PrintWriter fileWriter = new PrintWriter(file);
fileWriter.write("Hello World!");
fileWriter.close();
URL resourceURL = URL.fromPath(FileContext.getLocalFSFileContext()
.makeQualified(new Path(file.getAbsolutePath())));
LocalResource resource =
recordFactory.newRecordInstance(LocalResource.class);
resource.setResource(resourceURL);
resource.setSize(-1);
resource.setVisibility(LocalResourceVisibility.APPLICATION);
resource.setType(LocalResourceType.FILE);
resource.setTimestamp(file.lastModified());
Map<String, LocalResource> localResources =
new HashMap<String, LocalResource>();
localResources.put(symLink, resource);
return localResources;
}
// Start the container
// While the container is running, localize new resources.
// Verify the symlink is created properly
@Test
public void testLocalingResourceWhileContainerRunning() throws Exception {
// Real del service
delSrvc = new DeletionService(exec);
delSrvc.init(conf);
((NodeManager.NMContext)context).setContainerExecutor(exec);
containerManager = createContainerManager(delSrvc);
containerManager.init(conf);
containerManager.start();
// set up local resources
Map<String, LocalResource> localResource =
setupLocalResources("file", "symLink1");
ContainerLaunchContext context =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
context.setLocalResources(localResource);
// a long running container - sleep
context.setCommands(Arrays.asList("sleep 6"));
ContainerId cId = createContainerId(0);
// start the container
StartContainerRequest scRequest = StartContainerRequest.newInstance(context,
createContainerToken(cId, DUMMY_RM_IDENTIFIER, this.context.getNodeId(),
user, this.context.getContainerTokenSecretManager()));
StartContainersRequest allRequests =
StartContainersRequest.newInstance(Arrays.asList(scRequest));
containerManager.startContainers(allRequests);
BaseContainerManagerTest
.waitForContainerState(containerManager, cId, ContainerState.RUNNING);
BaseContainerManagerTest.waitForApplicationState(containerManager,
cId.getApplicationAttemptId().getApplicationId(),
ApplicationState.RUNNING);
checkResourceLocalized(cId, "symLink1");
// Localize new local resources while container is running
Map<String, LocalResource> localResource2 =
setupLocalResources("file2", "symLink2");
ResourceLocalizationRequest request =
ResourceLocalizationRequest.newInstance(cId, localResource2);
containerManager.localize(request);
// Verify resource is localized and symlink is created.
GenericTestUtils.waitFor(new Supplier<Boolean>() {
public Boolean get() {
try {
checkResourceLocalized(cId, "symLink2");
return true;
} catch (Throwable e) {
return false;
}
}
}, 500, 20000);
BaseContainerManagerTest
.waitForContainerState(containerManager, cId, ContainerState.COMPLETE);
// Verify container cannot localize resources while at non-running state.
try{
containerManager.localize(request);
Assert.fail();
} catch (YarnException e) {
Assert.assertTrue(
e.getMessage().contains("Not able to localize new resources"));
}
}
private void checkResourceLocalized(ContainerId containerId, String symLink) {
String appId =
containerId.getApplicationAttemptId().getApplicationId().toString();
File userCacheDir = new File(localDir, ContainerLocalizer.USERCACHE);
File userDir = new File(userCacheDir, user);
File appCache = new File(userDir, ContainerLocalizer.APPCACHE);
// localDir/usercache/nobody/appcache/application_0_0000
File appDir = new File(appCache, appId);
// localDir/usercache/nobody/appcache/application_0_0000/container_0_0000_01_000000
File containerDir = new File(appDir, containerId.toString());
// localDir/usercache/nobody/appcache/application_0_0000/container_0_0000_01_000000/symLink1
File targetFile = new File(containerDir, symLink);
File sysDir =
new File(localDir, ResourceLocalizationService.NM_PRIVATE_DIR);
// localDir/nmPrivate/application_0_0000
File appSysDir = new File(sysDir, appId);
// localDir/nmPrivate/application_0_0000/container_0_0000_01_000000
File containerSysDir = new File(appSysDir, containerId.toString());
Assert.assertTrue("AppDir " + appDir.getAbsolutePath() + " doesn't exist!!",
appDir.exists());
Assert.assertTrue(
"AppSysDir " + appSysDir.getAbsolutePath() + " doesn't exist!!",
appSysDir.exists());
Assert.assertTrue(
"containerDir " + containerDir.getAbsolutePath() + " doesn't exist !",
containerDir.exists());
Assert.assertTrue("containerSysDir " + containerSysDir.getAbsolutePath()
+ " doesn't exist !", containerDir.exists());
Assert.assertTrue(
"targetFile " + targetFile.getAbsolutePath() + " doesn't exist !!",
targetFile.exists());
}
@Test
public void testLocalFilesCleanup() throws InterruptedException,
IOException, YarnException {

View File

@ -66,6 +66,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext;
import org.junit.Assert;
@ -1767,7 +1768,7 @@ public void testParallelDownloadAttemptsForPrivateResource() throws Exception {
// creating new containers and populating corresponding localizer runners
// Container - 1
ContainerImpl container1 = createMockContainer(user, 1);
Container container1 = createMockContainer(user, 1);
String localizerId1 = container1.getContainerId().toString();
rls.getPrivateLocalizers().put(
localizerId1,
@ -2292,7 +2293,7 @@ private boolean waitForResourceState(LocalizedResource lr,
}
private ContainerLocalizationRequestEvent createContainerLocalizationEvent(
ContainerImpl container, LocalResourceVisibility vis,
Container container, LocalResourceVisibility vis,
LocalResourceRequest req) {
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> reqs =
new HashMap<LocalResourceVisibility, Collection<LocalResourceRequest>>();
@ -2310,6 +2311,7 @@ private ContainerImpl createMockContainer(String user, int containerId) {
when(container.getUser()).thenReturn(user);
Credentials mockCredentials = mock(Credentials.class);
when(container.getCredentials()).thenReturn(mockCredentials);
when(container.getContainerState()).thenReturn(ContainerState.LOCALIZING);
return container;
}
@ -2358,6 +2360,7 @@ private static Container getMockContainer(ApplicationId appId, int id,
creds.addToken(new Text("tok" + id), tk);
when(c.getCredentials()).thenReturn(creds);
when(c.toString()).thenReturn(cId.toString());
when(c.getContainerState()).thenReturn(ContainerState.LOCALIZING);
return c;
}

View File

@ -89,6 +89,13 @@ public boolean signalContainer(ContainerSignalContext ctx)
public void deleteAsUser(DeletionAsUserContext ctx)
throws IOException, InterruptedException {
}
@Override
public void symLink(String target, String symlink)
throws IOException {
}
@Override
public String getProcessId(ContainerId containerId) {
return String.valueOf(containerId.getContainerId());

View File

@ -1,3 +1,4 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@ -18,11 +19,6 @@
package org.apache.hadoop.yarn.server.nodemanager.webapp;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.Credentials;
@ -41,8 +37,14 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceSet;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class MockContainer implements Container {
private ContainerId id;
@ -118,6 +120,11 @@ public String toString() {
return "";
}
@Override
public ResourceSet getResourceSet() {
return null;
}
@Override
public void handle(ContainerEvent event) {
}