MAPREDUCE-2691. Finish up the cleanup of distributed cache file resources and related tests. Contributed by Siddharth Seth.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1167676 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2011-09-11 06:21:39 +00:00
parent 61d0b7530c
commit 8fb67650b1
14 changed files with 799 additions and 318 deletions

View File

@ -1269,6 +1269,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-2971. ant build mapreduce fails protected access jc.displayJobList
(jobs) (Thomas Graves via mahadev)
MAPREDUCE-2691. Finishing up the cleanup of distributed cache file resources
and related tests. (Siddharth Seth via vinodkv)
Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES

View File

@ -711,7 +711,7 @@ private void parseDistributedCacheArtifacts(
String linkName = name.toUri().getPath();
container.setLocalResource(
linkName,
BuilderUtils.newLocalResource(recordFactory,
BuilderUtils.newLocalResource(
p.toUri(), type,
visibilities[i]
? LocalResourceVisibility.PUBLIC

View File

@ -36,6 +36,7 @@
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@ -86,12 +87,11 @@ public int compare(org.apache.hadoop.yarn.api.records.ResourceRequest r1,
}
}
public static LocalResource newLocalResource(RecordFactory recordFactory,
URI uri, LocalResourceType type, LocalResourceVisibility visibility,
long size, long timestamp) {
public static LocalResource newLocalResource(URL url, LocalResourceType type,
LocalResourceVisibility visibility, long size, long timestamp) {
LocalResource resource =
recordFactory.newRecordInstance(LocalResource.class);
resource.setResource(ConverterUtils.getYarnUrlFromURI(uri));
recordFactory.newRecordInstance(LocalResource.class);
resource.setResource(url);
resource.setType(type);
resource.setVisibility(visibility);
resource.setSize(size);
@ -99,6 +99,13 @@ public static LocalResource newLocalResource(RecordFactory recordFactory,
return resource;
}
public static LocalResource newLocalResource(URI uri,
LocalResourceType type, LocalResourceVisibility visibility, long size,
long timestamp) {
return newLocalResource(ConverterUtils.getYarnUrlFromURI(uri), type,
visibility, size, timestamp);
}
public static ApplicationId newApplicationId(RecordFactory recordFactory,
long clustertimestamp, CharSequence id) {
ApplicationId applicationId =
@ -125,6 +132,15 @@ public static ApplicationId newApplicationId(long clusterTimeStamp, int id) {
return applicationId;
}
public static ApplicationAttemptId newApplicationAttemptId(
ApplicationId appId, int attemptId) {
ApplicationAttemptId appAttemptId =
recordFactory.newRecordInstance(ApplicationAttemptId.class);
appAttemptId.setApplicationId(appId);
appAttemptId.setAttemptId(attemptId);
return appAttemptId;
}
public static ApplicationId convert(long clustertimestamp, CharSequence id) {
ApplicationId applicationId =
recordFactory.newRecordInstance(ApplicationId.class);
@ -133,6 +149,24 @@ public static ApplicationId convert(long clustertimestamp, CharSequence id) {
return applicationId;
}
public static ContainerId newContainerId(ApplicationAttemptId appAttemptId,
int containerId) {
ContainerId id = recordFactory.newRecordInstance(ContainerId.class);
id.setAppId(appAttemptId.getApplicationId());
id.setId(containerId);
id.setAppAttemptId(appAttemptId);
return id;
}
public static ContainerId newContainerId(int appId, int appAttemptId,
long timestamp, int id) {
ApplicationId applicationId = newApplicationId(timestamp, appId);
ApplicationAttemptId applicationAttemptId = newApplicationAttemptId(
applicationId, appAttemptId);
ContainerId cId = newContainerId(applicationAttemptId, id);
return cId;
}
public static ContainerId newContainerId(RecordFactory recordFactory,
ApplicationId appId, ApplicationAttemptId appAttemptId,
int containerId) {
@ -227,4 +261,20 @@ public static ApplicationReport newApplicationReport(
report.setStartTime(startTime);
return report;
}
public static Resource newResource(int memory) {
Resource resource = recordFactory.newRecordInstance(Resource.class);
resource.setMemory(memory);
return resource;
}
public static URL newURL(String scheme, String host, int port, String file) {
URL url = recordFactory.newRecordInstance(URL.class);
url.setScheme(scheme);
url.setHost(host);
url.setPort(port);
url.setFile(file);
return url;
}
}

View File

@ -21,8 +21,10 @@
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
@ -50,6 +52,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
@ -81,6 +84,12 @@ public class ContainerImpl implements Container {
new HashMap<LocalResourceRequest,String>();
private final Map<Path,String> localizedResources =
new HashMap<Path,String>();
private final List<LocalResourceRequest> publicRsrcs =
new ArrayList<LocalResourceRequest>();
private final List<LocalResourceRequest> privateRsrcs =
new ArrayList<LocalResourceRequest>();
private final List<LocalResourceRequest> appRsrcs =
new ArrayList<LocalResourceRequest>();
public ContainerImpl(Dispatcher dispatcher,
ContainerLaunchContext launchContext, Credentials creds,
@ -361,7 +370,7 @@ public ContainerStatus cloneAndGetContainerStatus() {
}
}
@SuppressWarnings("fallthrough")
@SuppressWarnings({"fallthrough", "unchecked"})
private void finished() {
switch (getContainerState()) {
case EXITED_WITH_SUCCESS:
@ -404,6 +413,24 @@ private void finished() {
containerID, exitCode));
}
@SuppressWarnings("unchecked") // dispatcher not typed
public void cleanup() {
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrc =
new HashMap<LocalResourceVisibility,
Collection<LocalResourceRequest>>();
if (!publicRsrcs.isEmpty()) {
rsrc.put(LocalResourceVisibility.PUBLIC, publicRsrcs);
}
if (!privateRsrcs.isEmpty()) {
rsrc.put(LocalResourceVisibility.PRIVATE, privateRsrcs);
}
if (!appRsrcs.isEmpty()) {
rsrc.put(LocalResourceVisibility.APPLICATION, appRsrcs);
}
dispatcher.getEventHandler().handle(
new ContainerLocalizationCleanupEvent(this, rsrc));
}
static class ContainerTransition implements
SingleArcTransition<ContainerImpl, ContainerEvent> {
@ -439,12 +466,6 @@ public ContainerState transition(ContainerImpl container,
// Send requests for public, private resources
Map<String,LocalResource> cntrRsrc = ctxt.getAllLocalResources();
if (!cntrRsrc.isEmpty()) {
ArrayList<LocalResourceRequest> publicRsrc =
new ArrayList<LocalResourceRequest>();
ArrayList<LocalResourceRequest> privateRsrc =
new ArrayList<LocalResourceRequest>();
ArrayList<LocalResourceRequest> appRsrc =
new ArrayList<LocalResourceRequest>();
try {
for (Map.Entry<String,LocalResource> rsrc : cntrRsrc.entrySet()) {
try {
@ -453,13 +474,13 @@ public ContainerState transition(ContainerImpl container,
container.pendingResources.put(req, rsrc.getKey());
switch (rsrc.getValue().getVisibility()) {
case PUBLIC:
publicRsrc.add(req);
container.publicRsrcs.add(req);
break;
case PRIVATE:
privateRsrc.add(req);
container.privateRsrcs.add(req);
break;
case APPLICATION:
appRsrc.add(req);
container.appRsrcs.add(req);
break;
}
} catch (URISyntaxException e) {
@ -471,27 +492,25 @@ public ContainerState transition(ContainerImpl container,
} catch (URISyntaxException e) {
// malformed resource; abort container launch
LOG.warn("Failed to parse resource-request", e);
container.dispatcher.getEventHandler().handle(
new ContainerLocalizationEvent(
LocalizationEventType.CLEANUP_CONTAINER_RESOURCES, container));
container.cleanup();
container.metrics.endInitingContainer();
return ContainerState.LOCALIZATION_FAILED;
}
if (!publicRsrc.isEmpty()) {
container.dispatcher.getEventHandler().handle(
new ContainerLocalizationRequestEvent(
container, publicRsrc, LocalResourceVisibility.PUBLIC));
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
new HashMap<LocalResourceVisibility,
Collection<LocalResourceRequest>>();
if (!container.publicRsrcs.isEmpty()) {
req.put(LocalResourceVisibility.PUBLIC, container.publicRsrcs);
}
if (!privateRsrc.isEmpty()) {
container.dispatcher.getEventHandler().handle(
new ContainerLocalizationRequestEvent(
container, privateRsrc, LocalResourceVisibility.PRIVATE));
if (!container.privateRsrcs.isEmpty()) {
req.put(LocalResourceVisibility.PRIVATE, container.privateRsrcs);
}
if (!appRsrc.isEmpty()) {
container.dispatcher.getEventHandler().handle(
new ContainerLocalizationRequestEvent(
container, appRsrc, LocalResourceVisibility.APPLICATION));
if (!container.appRsrcs.isEmpty()) {
req.put(LocalResourceVisibility.APPLICATION, container.appRsrcs);
}
container.dispatcher.getEventHandler().handle(
new ContainerLocalizationRequestEvent(container, req));
return ContainerState.LOCALIZING;
} else {
container.dispatcher.getEventHandler().handle(
@ -546,7 +565,6 @@ public void transition(ContainerImpl container, ContainerEvent event) {
}
}
@SuppressWarnings("unchecked") // dispatcher not typed
static class ExitedWithSuccessTransition extends ContainerTransition {
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
@ -554,13 +572,10 @@ public void transition(ContainerImpl container, ContainerEvent event) {
// Inform the localizer to decrement reference counts and cleanup
// resources.
container.dispatcher.getEventHandler().handle(
new ContainerLocalizationEvent(
LocalizationEventType.CLEANUP_CONTAINER_RESOURCES, container));
container.cleanup();
}
}
@SuppressWarnings("unchecked") // dispatcher not typed
static class ExitedWithFailureTransition extends ContainerTransition {
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
@ -572,13 +587,10 @@ public void transition(ContainerImpl container, ContainerEvent event) {
// Inform the localizer to decrement reference counts and cleanup
// resources.
container.dispatcher.getEventHandler().handle(
new ContainerLocalizationEvent(
LocalizationEventType.CLEANUP_CONTAINER_RESOURCES, container));
container.cleanup();
}
}
@SuppressWarnings("unchecked") // dispatcher not typed
static class ResourceFailedTransition implements
SingleArcTransition<ContainerImpl, ContainerEvent> {
@Override
@ -592,30 +604,24 @@ public void transition(ContainerImpl container, ContainerEvent event) {
// Inform the localizer to decrement reference counts and cleanup
// resources.
container.dispatcher.getEventHandler().handle(
new ContainerLocalizationEvent(
LocalizationEventType.CLEANUP_CONTAINER_RESOURCES, container));
container.cleanup();
container.metrics.endInitingContainer();
}
}
@SuppressWarnings("unchecked") // dispatcher not typed
static class KillDuringLocalizationTransition implements
SingleArcTransition<ContainerImpl, ContainerEvent> {
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
// Inform the localizer to decrement reference counts and cleanup
// resources.
container.dispatcher.getEventHandler().handle(
new ContainerLocalizationEvent(
LocalizationEventType.CLEANUP_CONTAINER_RESOURCES, container));
container.cleanup();
container.metrics.endInitingContainer();
ContainerKillEvent killEvent = (ContainerKillEvent) event;
container.diagnostics.append(killEvent.getDiagnostic()).append("\n");
}
}
@SuppressWarnings("unchecked") // dispatcher not typed
static class LocalizedResourceDuringKillTransition implements
SingleArcTransition<ContainerImpl, ContainerEvent> {
@Override
@ -647,7 +653,6 @@ public void transition(ContainerImpl container, ContainerEvent event) {
}
}
@SuppressWarnings("unchecked") // dispatcher not typed
static class ContainerKilledTransition implements
SingleArcTransition<ContainerImpl, ContainerEvent> {
@Override
@ -657,13 +662,10 @@ public void transition(ContainerImpl container, ContainerEvent event) {
// The process/process-grp is killed. Decrement reference counts and
// cleanup resources
container.dispatcher.getEventHandler().handle(
new ContainerLocalizationEvent(
LocalizationEventType.CLEANUP_CONTAINER_RESOURCES, container));
container.cleanup();
}
}
@SuppressWarnings("unchecked") // dispatcher not typed
static class ContainerDoneTransition implements
SingleArcTransition<ContainerImpl, ContainerEvent> {
@Override
@ -697,7 +699,8 @@ public void handle(ContainerEvent event) {
newState =
stateMachine.doTransition(event.getType(), event);
} catch (InvalidStateTransitonException e) {
LOG.warn("Can't handle this event at current state", e);
LOG.warn("Can't handle this event at current state: Current: ["
+ oldState + "], eventType: [" + event.getType() + "]", e);
}
if (oldState != newState) {
LOG.info("Container " + containerID + " transitioned from "

View File

@ -40,7 +40,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalDirAllocator;
@ -274,7 +273,7 @@ private LocalizerStatus createStatus() throws InterruptedException {
stat.setLocalPath(
ConverterUtils.getYarnUrlFromPath(localPath));
stat.setLocalSize(
FileUtil.getDU(new File(localPath.getParent().toString())));
FileUtil.getDU(new File(localPath.getParent().toUri())));
stat.setStatus(ResourceStatusType.FETCH_SUCCESS);
} catch (ExecutionException e) {
stat.setStatus(ResourceStatusType.FETCH_FAILURE);

View File

@ -33,6 +33,7 @@
* {@link LocalResourceVisibility}.
*
*/
class LocalResourcesTrackerImpl implements LocalResourcesTracker {
static final Log LOG = LogFactory.getLog(LocalResourcesTrackerImpl.class);
@ -83,7 +84,7 @@ public boolean contains(LocalResourceRequest resource) {
@Override
public boolean remove(LocalizedResource rem, DeletionService delService) {
// current synchronization guaranteed by crude RLS event for cleanup
LocalizedResource rsrc = localrsrc.remove(rem.getRequest());
LocalizedResource rsrc = localrsrc.get(rem.getRequest());
if (null == rsrc) {
LOG.error("Attempt to remove absent resource: " + rem.getRequest() +
" from " + getUser());
@ -93,10 +94,11 @@ public boolean remove(LocalizedResource rem, DeletionService delService) {
|| ResourceState.DOWNLOADING.equals(rsrc.getState())
|| rsrc != rem) {
// internal error
LOG.error("Attempt to remove resource with non-zero refcount");
LOG.error("Attempt to remove resource: " + rsrc + " with non-zero refcount");
assert false;
return false;
}
localrsrc.remove(rem.getRequest());
if (ResourceState.LOCALIZED.equals(rsrc.getState())) {
delService.delete(getUser(), rsrc.getLocalPath());
}

View File

@ -120,7 +120,8 @@ public String toString() {
for (ContainerId c : ref) {
sb.append("(").append(c.toString()).append(")");
}
sb.append("],").append(getTimestamp()).append("}");
sb.append("],").append(getTimestamp()).append(",")
.append(getState()).append("}");
return sb.toString();
}

View File

@ -22,6 +22,7 @@
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
@ -43,6 +44,7 @@
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
@ -63,7 +65,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
@ -93,7 +94,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
@ -101,6 +102,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerSecurityInfo;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenSecretManager;
@ -198,7 +200,7 @@ public void init(Configuration conf) {
conf.getLong(YarnConfiguration.NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, YarnConfiguration.DEFAULT_NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS);
localizationServerAddress = NetUtils.createSocketAddr(
conf.get(YarnConfiguration.NM_LOCALIZER_ADDRESS, YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS));
localizerTracker = new LocalizerTracker(conf);
localizerTracker = createLocalizerTracker(conf);
dispatcher.register(LocalizerEventType.class, localizerTracker);
cacheCleanup.scheduleWithFixedDelay(new CacheCleanup(dispatcher),
cacheCleanupPeriod, cacheCleanupPeriod, TimeUnit.MILLISECONDS);
@ -218,6 +220,10 @@ public void start() {
super.start();
}
LocalizerTracker createLocalizerTracker(Configuration conf) {
return new LocalizerTracker(conf);
}
Server createServer() {
YarnRPC rpc = YarnRPC.create(getConfig());
Configuration conf = new Configuration(getConfig()); // Clone to separate
@ -252,6 +258,9 @@ public void stop() {
public void handle(LocalizationEvent event) {
String userName;
String appIDStr;
Container c;
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs;
LocalResourcesTracker tracker;
// TODO: create log dir as $logdir/$user/$appId
switch (event.getType()) {
case INIT_APPLICATION_RESOURCES:
@ -276,28 +285,16 @@ public void handle(LocalizationEvent event) {
case INIT_CONTAINER_RESOURCES:
ContainerLocalizationRequestEvent rsrcReqs =
(ContainerLocalizationRequestEvent) event;
Container c = rsrcReqs.getContainer();
c = rsrcReqs.getContainer();
LocalizerContext ctxt = new LocalizerContext(
c.getUser(), c.getContainerID(), c.getCredentials());
final LocalResourcesTracker tracker;
LocalResourceVisibility vis = rsrcReqs.getVisibility();
switch (vis) {
default:
case PUBLIC:
tracker = publicRsrc;
break;
case PRIVATE:
tracker = privateRsrc.get(c.getUser());
break;
case APPLICATION:
tracker =
appRsrc.get(ConverterUtils.toString(c.getContainerID().getAppId()));
break;
}
// We get separate events one each for all resources of one visibility. So
// all the resources in this event are of the same visibility.
for (LocalResourceRequest req : rsrcReqs.getRequestedResources()) {
tracker.handle(new ResourceRequestEvent(req, vis, ctxt));
rsrcs = rsrcReqs.getRequestedResources();
for (LocalResourceVisibility vis : rsrcs.keySet()) {
tracker = getLocalResourcesTracker(vis, c.getUser(),
c.getContainerID().getAppId());
for (LocalResourceRequest req : rsrcs.get(vis)) {
tracker.handle(new ResourceRequestEvent(req, vis, ctxt));
}
}
break;
case CACHE_CLEANUP:
@ -311,14 +308,23 @@ public void handle(LocalizationEvent event) {
}
break;
case CLEANUP_CONTAINER_RESOURCES:
Container container =
((ContainerLocalizationEvent)event).getContainer();
ContainerLocalizationCleanupEvent rsrcCleanup =
(ContainerLocalizationCleanupEvent) event;
c = rsrcCleanup.getContainer();
rsrcs = rsrcCleanup.getResources();
for (LocalResourceVisibility vis : rsrcs.keySet()) {
tracker = getLocalResourcesTracker(vis, c.getUser(),
c.getContainerID().getAppId());
for (LocalResourceRequest req : rsrcs.get(vis)) {
tracker.handle(new ResourceReleaseEvent(req, c.getContainerID()));
}
}
// Delete the container directories
userName = container.getUser();
String containerIDStr = container.toString();
userName = c.getUser();
String containerIDStr = c.toString();
appIDStr =
ConverterUtils.toString(container.getContainerID().getAppId());
ConverterUtils.toString(c.getContainerID().getAppId());
for (Path localDir : localDirs) {
// Delete the user-owned container-dir
@ -336,8 +342,7 @@ public void handle(LocalizationEvent event) {
delService.delete(null, containerSysDir, new Path[] {});
}
dispatcher.getEventHandler().handle(new ContainerEvent(
container.getContainerID(),
dispatcher.getEventHandler().handle(new ContainerEvent(c.getContainerID(),
ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP));
break;
case DESTROY_APPLICATION_RESOURCES:
@ -379,6 +384,19 @@ public void handle(LocalizationEvent event) {
}
}
LocalResourcesTracker getLocalResourcesTracker(
LocalResourceVisibility visibility, String user, ApplicationId appId) {
switch (visibility) {
default:
case PUBLIC:
return publicRsrc;
case PRIVATE:
return privateRsrc.get(user);
case APPLICATION:
return appRsrc.get(ConverterUtils.toString(appId));
}
}
/**
* Sub-component handling the spawning of {@link ContainerLocalizer}s
*/
@ -526,6 +544,7 @@ public void addResource(LocalizerResourceRequestEvent request) {
}
@Override
@SuppressWarnings("unchecked") // dispatcher not typed
public void run() {
try {
// TODO shutdown, better error handling esp. DU
@ -651,6 +670,7 @@ private LocalResource findNextResource() {
}
// TODO this sucks. Fix it later
@SuppressWarnings("unchecked") // dispatcher not typed
LocalizerHeartbeatResponse update(
List<LocalResourceStatus> remoteResourceStatuses) {
LocalizerHeartbeatResponse response =
@ -795,6 +815,7 @@ public CacheCleanup(Dispatcher dispatcher) {
}
@Override
@SuppressWarnings("unchecked") // dispatcher not typed
public void run() {
dispatcher.getEventHandler().handle(
new LocalizationEvent(LocalizationEventType.CACHE_CLEANUP));

View File

@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event;
import java.util.Collection;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
public class ContainerLocalizationCleanupEvent extends
ContainerLocalizationEvent {
private final Map<LocalResourceVisibility, Collection<LocalResourceRequest>>
rsrc;
/**
* Event requesting the cleanup of the rsrc.
* @param c
* @param rsrc
*/
public ContainerLocalizationCleanupEvent(Container c,
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrc) {
super(LocalizationEventType.CLEANUP_CONTAINER_RESOURCES, c);
this.rsrc = rsrc;
}
public
Map<LocalResourceVisibility, Collection<LocalResourceRequest>>
getResources() {
return rsrc;
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event;
import java.util.Collection;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@ -26,27 +27,23 @@
public class ContainerLocalizationRequestEvent extends
ContainerLocalizationEvent {
private final LocalResourceVisibility vis;
private final Collection<LocalResourceRequest> reqs;
private final Map<LocalResourceVisibility, Collection<LocalResourceRequest>>
rsrc;
/**
* Event requesting the localization of the reqs all with visibility vis
* Event requesting the localization of the rsrc.
* @param c
* @param reqs
* @param vis
* @param rsrc
*/
public ContainerLocalizationRequestEvent(Container c,
Collection<LocalResourceRequest> reqs, LocalResourceVisibility vis) {
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrc) {
super(LocalizationEventType.INIT_CONTAINER_RESOURCES, c);
this.vis = vis;
this.reqs = reqs;
this.rsrc = rsrc;
}
public LocalResourceVisibility getVisibility() {
return vis;
public
Map<LocalResourceVisibility, Collection<LocalResourceRequest>>
getRequestedResources() {
return rsrc;
}
public Collection<LocalResourceRequest> getRequestedResources() {
return reqs;
}
}
}

View File

@ -17,8 +17,6 @@
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event;
import java.net.URISyntaxException;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
@ -26,8 +24,8 @@ public class ResourceReleaseEvent extends ResourceEvent {
private final ContainerId container;
public ResourceReleaseEvent(LocalResourceRequest rsrc, ContainerId container)
throws URISyntaxException {
public ResourceReleaseEvent(LocalResourceRequest rsrc,
ContainerId container) {
super(rsrc, ResourceEventType.RELEASE);
this.container = container;
}

View File

@ -20,6 +20,8 @@
import static org.junit.Assert.fail;
import java.util.Collection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
@ -79,14 +81,17 @@ public void handle(LocalizationEvent event) {
ContainerLocalizationRequestEvent rsrcReqs =
(ContainerLocalizationRequestEvent) event;
// simulate localization of all requested resources
for (LocalResourceRequest req : rsrcReqs.getRequestedResources()) {
LOG.info("DEBUG: " + req + ":" +
rsrcReqs.getContainer().getContainerID());
dispatcher.getEventHandler().handle(
new ContainerResourceLocalizedEvent(
rsrcReqs.getContainer().getContainerID(), req,
new Path("file:///local" + req.getPath().toUri().getPath())));
}
for (Collection<LocalResourceRequest> rc : rsrcReqs
.getRequestedResources().values()) {
for (LocalResourceRequest req : rc) {
LOG.info("DEBUG: " + req + ":"
+ rsrcReqs.getContainer().getContainerID());
dispatcher.getEventHandler().handle(
new ContainerResourceLocalizedEvent(rsrcReqs.getContainer()
.getContainerID(), req, new Path("file:///local"
+ req.getPath().toUri().getPath())));
}
}
break;
case CLEANUP_CONTAINER_RESOURCES:
Container container =

View File

@ -17,208 +17,203 @@
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.AbstractMap.SimpleEntry;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Random;
import java.util.Map.Entry;
import java.util.AbstractMap.SimpleEntry;
import java.util.Random;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.Test;
import static org.junit.Assert.*;
import org.mockito.ArgumentMatcher;
import static org.mockito.Mockito.*;
public class TestContainer {
final NodeManagerMetrics metrics = NodeManagerMetrics.create();
/**
* Verify correct container request events sent to localizer.
*/
@Test
@SuppressWarnings("unchecked") // mocked generic
public void testLocalizationRequest() throws Exception {
DrainDispatcher dispatcher = new DrainDispatcher();
dispatcher.init(null);
WrappedContainer wc = null;
try {
dispatcher.start();
EventHandler<LocalizationEvent> localizerBus = mock(EventHandler.class);
dispatcher.register(LocalizationEventType.class, localizerBus);
// null serviceData; no registered AuxServicesEventType handler
ContainerLaunchContext ctxt = mock(ContainerLaunchContext.class);
ContainerId cId = getMockContainerId(7, 314159265358979L, 4344);
when(ctxt.getUser()).thenReturn("yak");
when(ctxt.getContainerId()).thenReturn(cId);
Random r = new Random();
long seed = r.nextLong();
r.setSeed(seed);
System.out.println("testLocalizationRequest seed: " + seed);
final Map<String,LocalResource> localResources = createLocalResources(r);
when(ctxt.getAllLocalResources()).thenReturn(localResources);
final Container c = newContainer(dispatcher, ctxt);
assertEquals(ContainerState.NEW, c.getContainerState());
wc = new WrappedContainer(7, 314159265358979L, 4344, "yak");
assertEquals(ContainerState.NEW, wc.c.getContainerState());
wc.initContainer();
// Verify request for public/private resources to localizer
c.handle(new ContainerEvent(cId, ContainerEventType.INIT_CONTAINER));
dispatcher.await();
ContainerReqMatcher matchesPublicReq =
new ContainerReqMatcher(localResources,
EnumSet.of(LocalResourceVisibility.PUBLIC));
ContainerReqMatcher matchesPrivateReq =
new ContainerReqMatcher(localResources,
EnumSet.of(LocalResourceVisibility.PRIVATE));
ContainerReqMatcher matchesAppReq =
new ContainerReqMatcher(localResources,
EnumSet.of(LocalResourceVisibility.APPLICATION));
verify(localizerBus).handle(argThat(matchesPublicReq));
verify(localizerBus).handle(argThat(matchesPrivateReq));
verify(localizerBus).handle(argThat(matchesAppReq));
assertEquals(ContainerState.LOCALIZING, c.getContainerState());
} finally {
dispatcher.stop();
ResourcesRequestedMatcher matchesReq =
new ResourcesRequestedMatcher(wc.localResources, EnumSet.of(
LocalResourceVisibility.PUBLIC, LocalResourceVisibility.PRIVATE,
LocalResourceVisibility.APPLICATION));
verify(wc.localizerBus).handle(argThat(matchesReq));
assertEquals(ContainerState.LOCALIZING, wc.c.getContainerState());
}
finally {
if (wc != null) {
wc.finished();
}
}
}
/**
* Verify container launch when all resources already cached.
*/
@Test
@SuppressWarnings("unchecked") // mocked generic
public void testLocalizationLaunch() throws Exception {
DrainDispatcher dispatcher = new DrainDispatcher();
dispatcher.init(null);
WrappedContainer wc = null;
try {
dispatcher.start();
EventHandler<LocalizationEvent> localizerBus = mock(EventHandler.class);
dispatcher.register(LocalizationEventType.class, localizerBus);
EventHandler<ContainersLauncherEvent> launcherBus =
mock(EventHandler.class);
dispatcher.register(ContainersLauncherEventType.class, launcherBus);
// null serviceData; no registered AuxServicesEventType handler
ContainerLaunchContext ctxt = mock(ContainerLaunchContext.class);
ContainerId cId = getMockContainerId(8, 314159265358979L, 4344);
when(ctxt.getUser()).thenReturn("yak");
when(ctxt.getContainerId()).thenReturn(cId);
Random r = new Random();
long seed = r.nextLong();
r.setSeed(seed);
System.out.println("testLocalizationLaunch seed: " + seed);
final Map<String,LocalResource> localResources = createLocalResources(r);
when(ctxt.getAllLocalResources()).thenReturn(localResources);
final Container c = newContainer(dispatcher, ctxt);
assertEquals(ContainerState.NEW, c.getContainerState());
c.handle(new ContainerEvent(cId, ContainerEventType.INIT_CONTAINER));
dispatcher.await();
// Container prepared for localization events
Path cache = new Path("file:///cache");
Map<Path,String> localPaths = new HashMap<Path,String>();
for (Entry<String,LocalResource> rsrc : localResources.entrySet()) {
assertEquals(ContainerState.LOCALIZING, c.getContainerState());
LocalResourceRequest req = new LocalResourceRequest(rsrc.getValue());
Path p = new Path(cache, rsrc.getKey());
localPaths.put(p, rsrc.getKey());
// rsrc copied to p
c.handle(new ContainerResourceLocalizedEvent(c.getContainerID(), req, p));
}
dispatcher.await();
wc = new WrappedContainer(8, 314159265358979L, 4344, "yak");
assertEquals(ContainerState.NEW, wc.c.getContainerState());
wc.initContainer();
Map<Path, String> localPaths = wc.localizeResources();
// all resources should be localized
assertEquals(ContainerState.LOCALIZED, c.getContainerState());
for (Entry<Path,String> loc : c.getLocalizedResources().entrySet()) {
assertEquals(ContainerState.LOCALIZED, wc.c.getContainerState());
for (Entry<Path,String> loc : wc.c.getLocalizedResources().entrySet()) {
assertEquals(localPaths.remove(loc.getKey()), loc.getValue());
}
assertTrue(localPaths.isEmpty());
final WrappedContainer wcf = wc;
// verify container launch
ArgumentMatcher<ContainersLauncherEvent> matchesContainerLaunch =
new ArgumentMatcher<ContainersLauncherEvent>() {
@Override
public boolean matches(Object o) {
ContainersLauncherEvent launchEvent = (ContainersLauncherEvent) o;
return c == launchEvent.getContainer();
return wcf.c == launchEvent.getContainer();
}
};
verify(launcherBus).handle(argThat(matchesContainerLaunch));
verify(wc.launcherBus).handle(argThat(matchesContainerLaunch));
} finally {
dispatcher.stop();
if (wc != null) {
wc.finished();
}
}
}
@Test
@SuppressWarnings("unchecked") // mocked generic
public void testCleanupOnFailure() throws Exception {
WrappedContainer wc = null;
try {
wc = new WrappedContainer(10, 314159265358979L, 4344, "yak");
wc.initContainer();
wc.localizeResources();
wc.launchContainer();
reset(wc.localizerBus);
wc.containerFailed(ExitCode.KILLED.getExitCode());
assertEquals(ContainerState.EXITED_WITH_FAILURE,
wc.c.getContainerState());
verifyCleanupCall(wc);
}
finally {
if (wc != null) {
wc.finished();
}
}
}
@Test
@SuppressWarnings("unchecked") // mocked generic
public void testCleanupOnSuccess() throws Exception {
WrappedContainer wc = null;
try {
wc = new WrappedContainer(11, 314159265358979L, 4344, "yak");
wc.initContainer();
wc.localizeResources();
wc.launchContainer();
reset(wc.localizerBus);
wc.containerSuccessful();
assertEquals(ContainerState.EXITED_WITH_SUCCESS,
wc.c.getContainerState());
verifyCleanupCall(wc);
}
finally {
if (wc != null) {
wc.finished();
}
}
}
@Test
@SuppressWarnings("unchecked") // mocked generic
public void testCleanupOnKillRequest() throws Exception {
WrappedContainer wc = null;
try {
wc = new WrappedContainer(12, 314159265358979L, 4344, "yak");
wc.initContainer();
wc.localizeResources();
wc.launchContainer();
reset(wc.localizerBus);
wc.killContainer();
assertEquals(ContainerState.KILLING, wc.c.getContainerState());
wc.containerKilledOnRequest();
verifyCleanupCall(wc);
} finally {
if (wc != null) {
wc.finished();
}
}
}
/**
* Verify serviceData correctly sent.
*/
@Test
@SuppressWarnings("unchecked") // mocked generic
public void testServiceData() throws Exception {
DrainDispatcher dispatcher = new DrainDispatcher();
dispatcher.init(null);
dispatcher.start();
WrappedContainer wc = null;
try {
EventHandler<LocalizationEvent> localizerBus = mock(EventHandler.class);
dispatcher.register(LocalizationEventType.class, localizerBus);
EventHandler<AuxServicesEvent> auxBus = mock(EventHandler.class);
dispatcher.register(AuxServicesEventType.class, auxBus);
EventHandler<ContainersLauncherEvent> launchBus = mock(EventHandler.class);
dispatcher.register(ContainersLauncherEventType.class, launchBus);
ContainerLaunchContext ctxt = mock(ContainerLaunchContext.class);
final ContainerId cId = getMockContainerId(9, 314159265358979L, 4344);
when(ctxt.getUser()).thenReturn("yak");
when(ctxt.getContainerId()).thenReturn(cId);
when(ctxt.getAllLocalResources()).thenReturn(
Collections.<String,LocalResource>emptyMap());
Random r = new Random();
long seed = r.nextLong();
r.setSeed(seed);
System.out.println("testServiceData seed: " + seed);
final Map<String,ByteBuffer> serviceData = createServiceData(r);
when(ctxt.getAllServiceData()).thenReturn(serviceData);
final Container c = newContainer(dispatcher, ctxt);
assertEquals(ContainerState.NEW, c.getContainerState());
// Verify propagation of service data to AuxServices
c.handle(new ContainerEvent(cId, ContainerEventType.INIT_CONTAINER));
dispatcher.await();
for (final Map.Entry<String,ByteBuffer> e : serviceData.entrySet()) {
wc = new WrappedContainer(9, 314159265358979L, 4344, "yak", false, true);
assertEquals(ContainerState.NEW, wc.c.getContainerState());
wc.initContainer();
for (final Map.Entry<String,ByteBuffer> e : wc.serviceData.entrySet()) {
ArgumentMatcher<AuxServicesEvent> matchesServiceReq =
new ArgumentMatcher<AuxServicesEvent>() {
@Override
@ -228,9 +223,10 @@ public boolean matches(Object o) {
&& 0 == e.getValue().compareTo(evt.getServiceData());
}
};
verify(auxBus).handle(argThat(matchesServiceReq));
verify(wc.auxBus).handle(argThat(matchesServiceReq));
}
final WrappedContainer wcf = wc;
// verify launch on empty resource request
ArgumentMatcher<ContainersLauncherEvent> matchesLaunchReq =
new ArgumentMatcher<ContainersLauncherEvent>() {
@ -238,61 +234,103 @@ public boolean matches(Object o) {
public boolean matches(Object o) {
ContainersLauncherEvent evt = (ContainersLauncherEvent) o;
return evt.getType() == ContainersLauncherEventType.LAUNCH_CONTAINER
&& cId == evt.getContainer().getContainerID();
&& wcf.cId == evt.getContainer().getContainerID();
}
};
verify(launchBus).handle(argThat(matchesLaunchReq));
verify(wc.launcherBus).handle(argThat(matchesLaunchReq));
} finally {
dispatcher.stop();
if (wc != null) {
wc.finished();
}
}
}
// Accept iff the resource request payload matches.
static class ContainerReqMatcher extends ArgumentMatcher<LocalizationEvent> {
private void verifyCleanupCall(WrappedContainer wc) throws Exception {
ResourcesReleasedMatcher matchesReq =
new ResourcesReleasedMatcher(wc.localResources, EnumSet.of(
LocalResourceVisibility.PUBLIC, LocalResourceVisibility.PRIVATE,
LocalResourceVisibility.APPLICATION));
verify(wc.localizerBus).handle(argThat(matchesReq));
}
private static class ResourcesReleasedMatcher extends
ArgumentMatcher<LocalizationEvent> {
final HashSet<LocalResourceRequest> resources =
new HashSet<LocalResourceRequest>();
ContainerReqMatcher(Map<String,LocalResource> allResources,
new HashSet<LocalResourceRequest>();
ResourcesReleasedMatcher(Map<String, LocalResource> allResources,
EnumSet<LocalResourceVisibility> vis) throws URISyntaxException {
for (Entry<String,LocalResource> e : allResources.entrySet()) {
for (Entry<String, LocalResource> e : allResources.entrySet()) {
if (vis.contains(e.getValue().getVisibility())) {
resources.add(new LocalResourceRequest(e.getValue()));
}
}
}
@Override
public boolean matches(Object o) {
ContainerLocalizationRequestEvent evt = (ContainerLocalizationRequestEvent) o;
if (!(o instanceof ContainerLocalizationCleanupEvent)) {
return false;
}
ContainerLocalizationCleanupEvent evt =
(ContainerLocalizationCleanupEvent) o;
final HashSet<LocalResourceRequest> expected =
new HashSet<LocalResourceRequest>(resources);
for (LocalResourceRequest rsrc : evt.getRequestedResources()) {
if (!expected.remove(rsrc)) {
return false;
new HashSet<LocalResourceRequest>(resources);
for (Collection<LocalResourceRequest> rc : evt.getResources().values()) {
for (LocalResourceRequest rsrc : rc) {
if (!expected.remove(rsrc)) {
return false;
}
}
}
return expected.isEmpty();
}
}
static Entry<String,LocalResource> getMockRsrc(Random r,
LocalResourceVisibility vis) {
LocalResource rsrc = mock(LocalResource.class);
// Accept iff the resource payload matches.
private static class ResourcesRequestedMatcher extends
ArgumentMatcher<LocalizationEvent> {
final HashSet<LocalResourceRequest> resources =
new HashSet<LocalResourceRequest>();
String name = Long.toHexString(r.nextLong());
URL uri = mock(org.apache.hadoop.yarn.api.records.URL.class);
when(uri.getScheme()).thenReturn("file");
when(uri.getHost()).thenReturn(null);
when(uri.getFile()).thenReturn("/local/" + vis + "/" + name);
ResourcesRequestedMatcher(Map<String, LocalResource> allResources,
EnumSet<LocalResourceVisibility> vis) throws URISyntaxException {
for (Entry<String, LocalResource> e : allResources.entrySet()) {
if (vis.contains(e.getValue().getVisibility())) {
resources.add(new LocalResourceRequest(e.getValue()));
}
}
}
when(rsrc.getResource()).thenReturn(uri);
when(rsrc.getSize()).thenReturn(r.nextInt(1024) + 1024L);
when(rsrc.getTimestamp()).thenReturn(r.nextInt(1024) + 2048L);
when(rsrc.getType()).thenReturn(LocalResourceType.FILE);
when(rsrc.getVisibility()).thenReturn(vis);
return new SimpleEntry<String,LocalResource>(name, rsrc);
@Override
public boolean matches(Object o) {
ContainerLocalizationRequestEvent evt =
(ContainerLocalizationRequestEvent) o;
final HashSet<LocalResourceRequest> expected =
new HashSet<LocalResourceRequest>(resources);
for (Collection<LocalResourceRequest> rc : evt.getRequestedResources()
.values()) {
for (LocalResourceRequest rsrc : rc) {
if (!expected.remove(rsrc)) {
return false;
}
}
}
return expected.isEmpty();
}
}
static Map<String,LocalResource> createLocalResources(Random r) {
private static Entry<String, LocalResource> getMockRsrc(Random r,
LocalResourceVisibility vis) {
String name = Long.toHexString(r.nextLong());
URL url = BuilderUtils.newURL("file", null, 0, "/local" + vis + "/" + name);
LocalResource rsrc =
BuilderUtils.newLocalResource(url, LocalResourceType.FILE, vis,
r.nextInt(1024) + 1024L, r.nextInt(1024) + 2048L);
return new SimpleEntry<String, LocalResource>(name, rsrc);
}
private static Map<String,LocalResource> createLocalResources(Random r) {
Map<String,LocalResource> localResources =
new HashMap<String,LocalResource>();
for (int i = r.nextInt(5) + 5; i >= 0; --i) {
@ -313,17 +351,7 @@ static Map<String,LocalResource> createLocalResources(Random r) {
return localResources;
}
static ContainerId getMockContainerId(int appId, long timestamp, int id) {
ApplicationId aId = mock(ApplicationId.class);
when(aId.getId()).thenReturn(appId);
when(aId.getClusterTimestamp()).thenReturn(timestamp);
ContainerId cId = mock(ContainerId.class);
when(cId.getId()).thenReturn(id);
when(cId.getAppId()).thenReturn(aId);
return cId;
}
static Map<String,ByteBuffer> createServiceData(Random r) {
private static Map<String,ByteBuffer> createServiceData(Random r) {
Map<String,ByteBuffer> serviceData =
new HashMap<String,ByteBuffer>();
for (int i = r.nextInt(5) + 5; i >= 0; --i) {
@ -335,7 +363,134 @@ static Map<String,ByteBuffer> createServiceData(Random r) {
return serviceData;
}
Container newContainer(Dispatcher disp, ContainerLaunchContext ctx) {
private Container newContainer(Dispatcher disp, ContainerLaunchContext ctx) {
return new ContainerImpl(disp, ctx, null, metrics);
}
@SuppressWarnings("unchecked")
private class WrappedContainer {
final DrainDispatcher dispatcher;
final EventHandler<LocalizationEvent> localizerBus;
final EventHandler<ContainersLauncherEvent> launcherBus;
final EventHandler<ContainersMonitorEvent> monitorBus;
final EventHandler<AuxServicesEvent> auxBus;
final ContainerLaunchContext ctxt;
final ContainerId cId;
final Container c;
final Map<String, LocalResource> localResources;
final Map<String, ByteBuffer> serviceData;
final String user;
WrappedContainer(int appId, long timestamp, int id, String user) {
this(appId, timestamp, id, user, true, false);
}
WrappedContainer(int appId, long timestamp, int id, String user,
boolean withLocalRes, boolean withServiceData) {
dispatcher = new DrainDispatcher();
dispatcher.init(null);
localizerBus = mock(EventHandler.class);
launcherBus = mock(EventHandler.class);
monitorBus = mock(EventHandler.class);
auxBus = mock(EventHandler.class);
dispatcher.register(LocalizationEventType.class, localizerBus);
dispatcher.register(ContainersLauncherEventType.class, launcherBus);
dispatcher.register(ContainersMonitorEventType.class, monitorBus);
dispatcher.register(AuxServicesEventType.class, auxBus);
this.user = user;
ctxt = mock(ContainerLaunchContext.class);
cId = BuilderUtils.newContainerId(appId, 1, timestamp, id);
when(ctxt.getUser()).thenReturn(this.user);
when(ctxt.getContainerId()).thenReturn(cId);
Resource resource = BuilderUtils.newResource(1024);
when(ctxt.getResource()).thenReturn(resource);
if (withLocalRes) {
Random r = new Random();
long seed = r.nextLong();
r.setSeed(seed);
System.out.println("WrappedContainerLocalResource seed: " + seed);
localResources = createLocalResources(r);
} else {
localResources = Collections.<String, LocalResource> emptyMap();
}
when(ctxt.getAllLocalResources()).thenReturn(localResources);
if (withServiceData) {
Random r = new Random();
long seed = r.nextLong();
r.setSeed(seed);
System.out.println("ServiceData seed: " + seed);
serviceData = createServiceData(r);
} else {
serviceData = Collections.<String, ByteBuffer> emptyMap();
}
when(ctxt.getAllServiceData()).thenReturn(serviceData);
c = newContainer(dispatcher, ctxt);
dispatcher.start();
}
private void drainDispatcherEvents() {
dispatcher.await();
}
public void finished() {
dispatcher.stop();
}
public void initContainer() {
c.handle(new ContainerEvent(cId, ContainerEventType.INIT_CONTAINER));
drainDispatcherEvents();
}
public Map<Path, String> localizeResources() throws URISyntaxException {
Path cache = new Path("file:///cache");
Map<Path, String> localPaths = new HashMap<Path, String>();
for (Entry<String, LocalResource> rsrc : localResources.entrySet()) {
assertEquals(ContainerState.LOCALIZING, c.getContainerState());
LocalResourceRequest req = new LocalResourceRequest(rsrc.getValue());
Path p = new Path(cache, rsrc.getKey());
localPaths.put(p, rsrc.getKey());
// rsrc copied to p
c.handle(new ContainerResourceLocalizedEvent(c.getContainerID(),
req, p));
}
drainDispatcherEvents();
return localPaths;
}
public void launchContainer() {
c.handle(new ContainerEvent(cId, ContainerEventType.CONTAINER_LAUNCHED));
drainDispatcherEvents();
}
public void containerSuccessful() {
c.handle(new ContainerEvent(cId,
ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS));
drainDispatcherEvents();
}
public void containerFailed(int exitCode) {
c.handle(new ContainerExitEvent(cId,
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, exitCode));
drainDispatcherEvents();
}
public void killContainer() {
c.handle(new ContainerKillEvent(cId, "KillRequest"));
drainDispatcherEvents();
}
public void containerKilledOnRequest() {
c.handle(new ContainerExitEvent(cId,
ContainerEventType.CONTAINER_KILLED_ON_REQUEST, ExitCode.KILLED
.getExitCode()));
drainDispatcherEvents();
}
}
}

View File

@ -21,10 +21,17 @@
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import junit.framework.Assert;
import org.apache.avro.ipc.Server;
import org.apache.hadoop.conf.Configuration;
@ -63,11 +70,15 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService.LocalizerTracker;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Test;
import static org.junit.Assert.*;
@ -132,6 +143,190 @@ public void testLocalizationInit() throws Exception {
}
}
@Test
@SuppressWarnings("unchecked") // mocked generics
public void testResourceRelease() throws Exception {
Configuration conf = new Configuration();
AbstractFileSystem spylfs =
spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
final FileContext lfs = FileContext.getFileContext(spylfs, conf);
doNothing().when(spylfs).mkdir(
isA(Path.class), isA(FsPermission.class), anyBoolean());
List<Path> localDirs = new ArrayList<Path>();
String[] sDirs = new String[4];
for (int i = 0; i < 4; ++i) {
localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
sDirs[i] = localDirs.get(i).toString();
}
conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
Server ignore = mock(Server.class);
LocalizerTracker mockLocallilzerTracker = mock(LocalizerTracker.class);
DrainDispatcher dispatcher = new DrainDispatcher();
dispatcher.init(conf);
dispatcher.start();
EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
dispatcher.register(ApplicationEventType.class, applicationBus);
EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
dispatcher.register(ContainerEventType.class, containerBus);
//Ignore actual localization
EventHandler<LocalizerEvent> localizerBus = mock(EventHandler.class);
dispatcher.register(LocalizerEventType.class, localizerBus);
ContainerExecutor exec = mock(ContainerExecutor.class);
DeletionService delService = new DeletionService(exec);
delService.init(null);
delService.start();
ResourceLocalizationService rawService =
new ResourceLocalizationService(dispatcher, exec, delService);
ResourceLocalizationService spyService = spy(rawService);
doReturn(ignore).when(spyService).createServer();
doReturn(mockLocallilzerTracker).when(spyService).createLocalizerTracker(
isA(Configuration.class));
doReturn(lfs).when(spyService)
.getLocalFileContext(isA(Configuration.class));
try {
spyService.init(conf);
spyService.start();
final String user = "user0";
// init application
final Application app = mock(Application.class);
final ApplicationId appId =
BuilderUtils.newApplicationId(314159265358979L, 3);
when(app.getUser()).thenReturn(user);
when(app.getAppId()).thenReturn(appId);
spyService.handle(new ApplicationLocalizationEvent(
LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
dispatcher.await();
//Get a handle on the trackers after they're setup with INIT_APP_RESOURCES
LocalResourcesTracker appTracker =
spyService.getLocalResourcesTracker(
LocalResourceVisibility.APPLICATION, user, appId);
LocalResourcesTracker privTracker =
spyService.getLocalResourcesTracker(LocalResourceVisibility.PRIVATE,
user, appId);
LocalResourcesTracker pubTracker =
spyService.getLocalResourcesTracker(LocalResourceVisibility.PUBLIC,
user, appId);
// init container.
final Container c = getMockContainer(appId, 42);
// init resources
Random r = new Random();
long seed = r.nextLong();
System.out.println("SEED: " + seed);
r.setSeed(seed);
// Send localization requests for one resource of each type.
final LocalResource privResource = getPrivateMockedResource(r);
final LocalResourceRequest privReq =
new LocalResourceRequest(privResource);
final LocalResource pubResource = getPublicMockedResource(r);
final LocalResourceRequest pubReq = new LocalResourceRequest(pubResource);
final LocalResource pubResource2 = getPublicMockedResource(r);
final LocalResourceRequest pubReq2 =
new LocalResourceRequest(pubResource2);
final LocalResource appResource = getAppMockedResource(r);
final LocalResourceRequest appReq = new LocalResourceRequest(appResource);
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
new HashMap<LocalResourceVisibility,
Collection<LocalResourceRequest>>();
req.put(LocalResourceVisibility.PRIVATE,
Collections.singletonList(privReq));
req.put(LocalResourceVisibility.PUBLIC,
Collections.singletonList(pubReq));
req.put(LocalResourceVisibility.APPLICATION,
Collections.singletonList(appReq));
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req2 =
new HashMap<LocalResourceVisibility,
Collection<LocalResourceRequest>>();
req2.put(LocalResourceVisibility.PRIVATE,
Collections.singletonList(privReq));
req2.put(LocalResourceVisibility.PUBLIC,
Collections.singletonList(pubReq2));
Set<LocalResourceRequest> pubRsrcs = new HashSet<LocalResourceRequest>();
pubRsrcs.add(pubReq);
pubRsrcs.add(pubReq2);
// Send Request event
spyService.handle(new ContainerLocalizationRequestEvent(c, req));
spyService.handle(new ContainerLocalizationRequestEvent(c, req2));
dispatcher.await();
int privRsrcCount = 0;
for (LocalizedResource lr : privTracker) {
privRsrcCount++;
Assert.assertEquals("Incorrect reference count", 2, lr.getRefCount());
Assert.assertEquals(privReq, lr.getRequest());
}
Assert.assertEquals(1, privRsrcCount);
int pubRsrcCount = 0;
for (LocalizedResource lr : pubTracker) {
pubRsrcCount++;
Assert.assertEquals("Incorrect reference count", 1, lr.getRefCount());
pubRsrcs.remove(lr.getRequest());
}
Assert.assertEquals(0, pubRsrcs.size());
Assert.assertEquals(2, pubRsrcCount);
int appRsrcCount = 0;
for (LocalizedResource lr : appTracker) {
appRsrcCount++;
Assert.assertEquals("Incorrect reference count", 1, lr.getRefCount());
Assert.assertEquals(appReq, lr.getRequest());
}
Assert.assertEquals(1, appRsrcCount);
//Send Cleanup Event
spyService.handle(new ContainerLocalizationCleanupEvent(c, req));
req2.remove(LocalResourceVisibility.PRIVATE);
spyService.handle(new ContainerLocalizationCleanupEvent(c, req2));
dispatcher.await();
pubRsrcs.add(pubReq);
pubRsrcs.add(pubReq2);
privRsrcCount = 0;
for (LocalizedResource lr : privTracker) {
privRsrcCount++;
Assert.assertEquals("Incorrect reference count", 1, lr.getRefCount());
Assert.assertEquals(privReq, lr.getRequest());
}
Assert.assertEquals(1, privRsrcCount);
pubRsrcCount = 0;
for (LocalizedResource lr : pubTracker) {
pubRsrcCount++;
Assert.assertEquals("Incorrect reference count", 0, lr.getRefCount());
pubRsrcs.remove(lr.getRequest());
}
Assert.assertEquals(0, pubRsrcs.size());
Assert.assertEquals(2, pubRsrcCount);
appRsrcCount = 0;
for (LocalizedResource lr : appTracker) {
appRsrcCount++;
Assert.assertEquals("Incorrect reference count", 0, lr.getRefCount());
Assert.assertEquals(appReq, lr.getRequest());
}
Assert.assertEquals(1, appRsrcCount);
} finally {
dispatcher.stop();
delService.stop();
}
}
@Test
@SuppressWarnings("unchecked") // mocked generics
public void testLocalizationHeartbeat() throws Exception {
@ -175,9 +370,8 @@ public void testLocalizationHeartbeat() throws Exception {
// init application
final Application app = mock(Application.class);
final ApplicationId appId = mock(ApplicationId.class);
when(appId.getClusterTimestamp()).thenReturn(314159265358979L);
when(appId.getId()).thenReturn(3);
final ApplicationId appId =
BuilderUtils.newApplicationId(314159265358979L, 3);
when(app.getUser()).thenReturn("user0");
when(app.getAppId()).thenReturn(appId);
spyService.handle(new ApplicationLocalizationEvent(
@ -205,11 +399,13 @@ public boolean matches(Object o) {
doReturn(out).when(spylfs).createInternal(isA(Path.class),
isA(EnumSet.class), isA(FsPermission.class), anyInt(), anyShort(),
anyLong(), isA(Progressable.class), anyInt(), anyBoolean());
final LocalResource resource = getMockResource(r);
final LocalResource resource = getPrivateMockedResource(r);
final LocalResourceRequest req = new LocalResourceRequest(resource);
spyService.handle(new ContainerLocalizationRequestEvent(
c, Collections.singletonList(req),
LocalResourceVisibility.PRIVATE));
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
new HashMap<LocalResourceVisibility,
Collection<LocalResourceRequest>>();
rsrcs.put(LocalResourceVisibility.PRIVATE, Collections.singletonList(req));
spyService.handle(new ContainerLocalizationRequestEvent(c, rsrcs));
// Sigh. Thread init of private localizer not accessible
Thread.sleep(500);
dispatcher.await();
@ -265,42 +461,44 @@ public boolean matches(Object o) {
}
}
static URL getPath(String path) {
URL uri = mock(org.apache.hadoop.yarn.api.records.URL.class);
when(uri.getScheme()).thenReturn("file");
when(uri.getHost()).thenReturn(null);
when(uri.getFile()).thenReturn(path);
return uri;
private static URL getPath(String path) {
URL url = BuilderUtils.newURL("file", null, 0, path);
return url;
}
static LocalResource getMockResource(Random r) {
LocalResource rsrc = mock(LocalResource.class);
private static LocalResource getMockedResource(Random r,
LocalResourceVisibility vis) {
String name = Long.toHexString(r.nextLong());
URL uri = getPath("/local/PRIVATE/" + name);
when(rsrc.getResource()).thenReturn(uri);
when(rsrc.getSize()).thenReturn(r.nextInt(1024) + 1024L);
when(rsrc.getTimestamp()).thenReturn(r.nextInt(1024) + 2048L);
when(rsrc.getType()).thenReturn(LocalResourceType.FILE);
when(rsrc.getVisibility()).thenReturn(LocalResourceVisibility.PRIVATE);
URL url = getPath("/local/PRIVATE/" + name);
LocalResource rsrc =
BuilderUtils.newLocalResource(url, LocalResourceType.FILE, vis,
r.nextInt(1024) + 1024L, r.nextInt(1024) + 2048L);
return rsrc;
}
private static LocalResource getAppMockedResource(Random r) {
return getMockedResource(r, LocalResourceVisibility.APPLICATION);
}
private static LocalResource getPublicMockedResource(Random r) {
return getMockedResource(r, LocalResourceVisibility.PUBLIC);
}
private static LocalResource getPrivateMockedResource(Random r) {
return getMockedResource(r, LocalResourceVisibility.PRIVATE);
}
static Container getMockContainer(ApplicationId appId, int id) {
private static Container getMockContainer(ApplicationId appId, int id) {
Container c = mock(Container.class);
ApplicationAttemptId appAttemptId = Records.newRecord(ApplicationAttemptId.class);
appAttemptId.setApplicationId(appId);
appAttemptId.setAttemptId(1);
ContainerId cId = Records.newRecord(ContainerId.class);
cId.setAppAttemptId(appAttemptId);
cId.setAppId(appId);
cId.setId(id);
ApplicationAttemptId appAttemptId =
BuilderUtils.newApplicationAttemptId(appId, 1);
ContainerId cId = BuilderUtils.newContainerId(appAttemptId, id);
when(c.getUser()).thenReturn("user0");
when(c.getContainerID()).thenReturn(cId);
Credentials creds = new Credentials();
creds.addToken(new Text("tok" + id), getToken(id));
when(c.getCredentials()).thenReturn(creds);
when(c.toString()).thenReturn(cId.toString());
return c;
}