YARN-14. Symlinks to peer distributed cache files no longer work (Jason Lowe via bobby)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1371390 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Joseph Evans 2012-08-09 19:15:56 +00:00
parent 201d0ecb78
commit 82910ecaa3
7 changed files with 76 additions and 41 deletions

View File

@ -34,3 +34,16 @@ Release 2.1.0-alpha - Unreleased
YARN-12. Fix findbugs warnings in FairScheduler. (Junping Du via acmurthy) YARN-12. Fix findbugs warnings in FairScheduler. (Junping Du via acmurthy)
Release 0.23.3 - Unreleased
INCOMPATIBLE CHANGES
NEW FEATURES
IMPROVEMENTS
BUG FIXES
YARN-14. Symlinks to peer distributed cache files no longer work
(Jason Lowe via bobby)

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.container; package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@ -38,7 +39,7 @@ public interface Container extends EventHandler<ContainerEvent> {
Credentials getCredentials(); Credentials getCredentials();
Map<Path,String> getLocalizedResources(); Map<Path,List<String>> getLocalizedResources();
ContainerStatus cloneAndGetContainerStatus(); ContainerStatus cloneAndGetContainerStatus();

View File

@ -84,10 +84,10 @@ public class ContainerImpl implements Container {
private static final Log LOG = LogFactory.getLog(Container.class); private static final Log LOG = LogFactory.getLog(Container.class);
private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
private final Map<LocalResourceRequest,String> pendingResources = private final Map<LocalResourceRequest,List<String>> pendingResources =
new HashMap<LocalResourceRequest,String>(); new HashMap<LocalResourceRequest,List<String>>();
private final Map<Path,String> localizedResources = private final Map<Path,List<String>> localizedResources =
new HashMap<Path,String>(); new HashMap<Path,List<String>>();
private final List<LocalResourceRequest> publicRsrcs = private final List<LocalResourceRequest> publicRsrcs =
new ArrayList<LocalResourceRequest>(); new ArrayList<LocalResourceRequest>();
private final List<LocalResourceRequest> privateRsrcs = private final List<LocalResourceRequest> privateRsrcs =
@ -327,7 +327,7 @@ public String getUser() {
} }
@Override @Override
public Map<Path,String> getLocalizedResources() { public Map<Path,List<String>> getLocalizedResources() {
this.readLock.lock(); this.readLock.lock();
try { try {
assert ContainerState.LOCALIZED == getContainerState(); // TODO: FIXME!! assert ContainerState.LOCALIZED == getContainerState(); // TODO: FIXME!!
@ -496,20 +496,25 @@ public ContainerState transition(ContainerImpl container,
try { try {
for (Map.Entry<String,LocalResource> rsrc : cntrRsrc.entrySet()) { for (Map.Entry<String,LocalResource> rsrc : cntrRsrc.entrySet()) {
try { try {
LocalResourceRequest req = LocalResourceRequest req =
new LocalResourceRequest(rsrc.getValue()); new LocalResourceRequest(rsrc.getValue());
container.pendingResources.put(req, rsrc.getKey()); List<String> links = container.pendingResources.get(req);
switch (rsrc.getValue().getVisibility()) { if (links == null) {
case PUBLIC: links = new ArrayList<String>();
container.publicRsrcs.add(req); container.pendingResources.put(req, links);
break; }
case PRIVATE: links.add(rsrc.getKey());
container.privateRsrcs.add(req); switch (rsrc.getValue().getVisibility()) {
break; case PUBLIC:
case APPLICATION: container.publicRsrcs.add(req);
container.appRsrcs.add(req); break;
break; case PRIVATE:
} container.privateRsrcs.add(req);
break;
case APPLICATION:
container.appRsrcs.add(req);
break;
}
} catch (URISyntaxException e) { } catch (URISyntaxException e) {
LOG.info("Got exception parsing " + rsrc.getKey() LOG.info("Got exception parsing " + rsrc.getKey()
+ " and value " + rsrc.getValue()); + " and value " + rsrc.getValue());
@ -560,15 +565,16 @@ static class LocalizedTransition implements
public ContainerState transition(ContainerImpl container, public ContainerState transition(ContainerImpl container,
ContainerEvent event) { ContainerEvent event) {
ContainerResourceLocalizedEvent rsrcEvent = (ContainerResourceLocalizedEvent) event; ContainerResourceLocalizedEvent rsrcEvent = (ContainerResourceLocalizedEvent) event;
String sym = container.pendingResources.remove(rsrcEvent.getResource()); List<String> syms =
if (null == sym) { container.pendingResources.remove(rsrcEvent.getResource());
if (null == syms) {
LOG.warn("Localized unknown resource " + rsrcEvent.getResource() + LOG.warn("Localized unknown resource " + rsrcEvent.getResource() +
" for container " + container.getContainerID()); " for container " + container.getContainerID());
assert false; assert false;
// fail container? // fail container?
return ContainerState.LOCALIZING; return ContainerState.LOCALIZING;
} }
container.localizedResources.put(rsrcEvent.getLocation(), sym); container.localizedResources.put(rsrcEvent.getLocation(), syms);
if (!container.pendingResources.isEmpty()) { if (!container.pendingResources.isEmpty()) {
return ContainerState.LOCALIZING; return ContainerState.LOCALIZING;
} }
@ -728,15 +734,16 @@ static class LocalizedResourceDuringKillTransition implements
@Override @Override
public void transition(ContainerImpl container, ContainerEvent event) { public void transition(ContainerImpl container, ContainerEvent event) {
ContainerResourceLocalizedEvent rsrcEvent = (ContainerResourceLocalizedEvent) event; ContainerResourceLocalizedEvent rsrcEvent = (ContainerResourceLocalizedEvent) event;
String sym = container.pendingResources.remove(rsrcEvent.getResource()); List<String> syms =
if (null == sym) { container.pendingResources.remove(rsrcEvent.getResource());
if (null == syms) {
LOG.warn("Localized unknown resource " + rsrcEvent.getResource() + LOG.warn("Localized unknown resource " + rsrcEvent.getResource() +
" for container " + container.getContainerID()); " for container " + container.getContainerID());
assert false; assert false;
// fail container? // fail container?
return; return;
} }
container.localizedResources.put(rsrcEvent.getLocation(), sym); container.localizedResources.put(rsrcEvent.getLocation(), syms);
} }
} }

View File

@ -111,7 +111,8 @@ public ContainerLaunch(Configuration configuration, Dispatcher dispatcher,
@SuppressWarnings("unchecked") // dispatcher not typed @SuppressWarnings("unchecked") // dispatcher not typed
public Integer call() { public Integer call() {
final ContainerLaunchContext launchContext = container.getLaunchContext(); final ContainerLaunchContext launchContext = container.getLaunchContext();
final Map<Path,String> localResources = container.getLocalizedResources(); final Map<Path,List<String>> localResources =
container.getLocalizedResources();
ContainerId containerID = container.getContainerID(); ContainerId containerID = container.getContainerID();
String containerIdStr = ConverterUtils.toString(containerID); String containerIdStr = ConverterUtils.toString(containerID);
final String user = launchContext.getUser(); final String user = launchContext.getUser();
@ -533,7 +534,7 @@ public void sanitizeEnv(Map<String, String> environment,
} }
static void writeLaunchEnv(OutputStream out, static void writeLaunchEnv(OutputStream out,
Map<String,String> environment, Map<Path,String> resources, Map<String,String> environment, Map<Path,List<String>> resources,
List<String> command) List<String> command)
throws IOException { throws IOException {
ShellScriptBuilder sb = new ShellScriptBuilder(); ShellScriptBuilder sb = new ShellScriptBuilder();
@ -543,8 +544,10 @@ static void writeLaunchEnv(OutputStream out,
} }
} }
if (resources != null) { if (resources != null) {
for (Map.Entry<Path,String> link : resources.entrySet()) { for (Map.Entry<Path,List<String>> entry : resources.entrySet()) {
sb.symlink(link.getKey(), link.getValue()); for (String linkName : entry.getValue()) {
sb.symlink(entry.getKey(), linkName);
}
} }
} }

View File

@ -29,12 +29,15 @@
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.AbstractMap.SimpleEntry; import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Random; import java.util.Random;
@ -111,11 +114,12 @@ public void testLocalizationLaunch() throws Exception {
wc = new WrappedContainer(8, 314159265358979L, 4344, "yak"); wc = new WrappedContainer(8, 314159265358979L, 4344, "yak");
assertEquals(ContainerState.NEW, wc.c.getContainerState()); assertEquals(ContainerState.NEW, wc.c.getContainerState());
wc.initContainer(); wc.initContainer();
Map<Path, String> localPaths = wc.localizeResources(); Map<Path, List<String>> localPaths = wc.localizeResources();
// all resources should be localized // all resources should be localized
assertEquals(ContainerState.LOCALIZED, wc.c.getContainerState()); assertEquals(ContainerState.LOCALIZED, wc.c.getContainerState());
for (Entry<Path,String> loc : wc.c.getLocalizedResources().entrySet()) { for (Entry<Path, List<String>> loc : wc.c.getLocalizedResources()
.entrySet()) {
assertEquals(localPaths.remove(loc.getKey()), loc.getValue()); assertEquals(localPaths.remove(loc.getKey()), loc.getValue());
} }
assertTrue(localPaths.isEmpty()); assertTrue(localPaths.isEmpty());
@ -578,10 +582,12 @@ public void initContainer() {
// Localize resources // Localize resources
// Skip some resources so as to consider them failed // Skip some resources so as to consider them failed
public Map<Path, String> doLocalizeResources(boolean checkLocalizingState, public Map<Path, List<String>> doLocalizeResources(
int skipRsrcCount) throws URISyntaxException { boolean checkLocalizingState, int skipRsrcCount)
throws URISyntaxException {
Path cache = new Path("file:///cache"); Path cache = new Path("file:///cache");
Map<Path, String> localPaths = new HashMap<Path, String>(); Map<Path, List<String>> localPaths =
new HashMap<Path, List<String>>();
int counter = 0; int counter = 0;
for (Entry<String, LocalResource> rsrc : localResources.entrySet()) { for (Entry<String, LocalResource> rsrc : localResources.entrySet()) {
if (counter++ < skipRsrcCount) { if (counter++ < skipRsrcCount) {
@ -592,7 +598,7 @@ public Map<Path, String> doLocalizeResources(boolean checkLocalizingState,
} }
LocalResourceRequest req = new LocalResourceRequest(rsrc.getValue()); LocalResourceRequest req = new LocalResourceRequest(rsrc.getValue());
Path p = new Path(cache, rsrc.getKey()); Path p = new Path(cache, rsrc.getKey());
localPaths.put(p, rsrc.getKey()); localPaths.put(p, Arrays.asList(rsrc.getKey()));
// rsrc copied to p // rsrc copied to p
c.handle(new ContainerResourceLocalizedEvent(c.getContainerID(), c.handle(new ContainerResourceLocalizedEvent(c.getContainerID(),
req, p)); req, p));
@ -602,7 +608,8 @@ public Map<Path, String> doLocalizeResources(boolean checkLocalizingState,
} }
public Map<Path, String> localizeResources() throws URISyntaxException { public Map<Path, List<String>> localizeResources()
throws URISyntaxException {
return doLocalizeResources(true, 0); return doLocalizeResources(true, 0);
} }

View File

@ -28,6 +28,7 @@
import java.io.PrintWriter; import java.io.PrintWriter;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
@ -95,9 +96,10 @@ public void testSpecialCharSymlinks() throws IOException {
writer.println(timeoutCommand); writer.println(timeoutCommand);
writer.close(); writer.close();
Map<Path, String> resources = new HashMap<Path, String>(); Map<Path, List<String>> resources =
new HashMap<Path, List<String>>();
Path path = new Path(shellFile.getAbsolutePath()); Path path = new Path(shellFile.getAbsolutePath());
resources.put(path, badSymlink); resources.put(path, Arrays.asList(badSymlink));
FileOutputStream fos = new FileOutputStream(tempFile); FileOutputStream fos = new FileOutputStream(tempFile);

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.nodemanager.webapp; package org.apache.hadoop.yarn.server.nodemanager.webapp;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -43,7 +44,8 @@ public class MockContainer implements Container {
private ContainerState state; private ContainerState state;
private String user; private String user;
private ContainerLaunchContext launchContext; private ContainerLaunchContext launchContext;
private final Map<Path, String> resource = new HashMap<Path, String>(); private final Map<Path, List<String>> resource =
new HashMap<Path, List<String>>();
private RecordFactory recordFactory; private RecordFactory recordFactory;
public MockContainer(ApplicationAttemptId appAttemptId, public MockContainer(ApplicationAttemptId appAttemptId,
@ -92,7 +94,7 @@ public Credentials getCredentials() {
} }
@Override @Override
public Map<Path, String> getLocalizedResources() { public Map<Path, List<String>> getLocalizedResources() {
return resource; return resource;
} }