diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 47466b872c..732d3b7871 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -783,6 +783,9 @@ Release 2.1.0-beta - 2013-08-06 YARN-945. Removed setting of AMRMToken's service from ResourceManager and changed client libraries do it all the time and correctly. (vinodkv) + YARN-573. Shared data structures in Public Localizer and Private Localizer + are not Thread safe. (Omkar Vinit Joshi via jlowe) + BREAKDOWN OF HADOOP-8562/YARN-191 SUBTASKS AND RELATED JIRAS YARN-158. Yarn creating package-info.java must not depend on sh. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index 1fdb082170..70debe0517 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -29,6 +29,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; import java.util.Iterator; @@ -630,23 +631,16 @@ class PublicLocalizer extends Thread { final Configuration conf; final ExecutorService threadPool; final CompletionService queue; + // Its shared between public localizer and dispatcher thread. final Map,LocalizerResourceRequestEvent> pending; PublicLocalizer(Configuration conf) { - this(conf, getLocalFileContext(conf), - createLocalizerExecutor(conf), - new HashMap,LocalizerResourceRequestEvent>()); - } - - PublicLocalizer(Configuration conf, FileContext lfs, - ExecutorService threadPool, - Map,LocalizerResourceRequestEvent> pending) { super("Public Localizer"); - this.lfs = lfs; + this.lfs = getLocalFileContext(conf); this.conf = conf; - this.pending = pending; - - this.threadPool = threadPool; + this.pending = + new ConcurrentHashMap, LocalizerResourceRequestEvent>(); + this.threadPool = createLocalizerExecutor(conf); this.queue = new ExecutorCompletionService(threadPool); } @@ -748,6 +742,7 @@ class LocalizerRunner extends Thread { final LocalizerContext context; final String localizerId; final Map scheduled; + // Its a shared list between Private Localizer and dispatcher thread. final List pending; // TODO: threadsafe, use outer? @@ -758,13 +753,14 @@ class LocalizerRunner extends Thread { super("LocalizerRunner for " + localizerId); this.context = context; this.localizerId = localizerId; - this.pending = new ArrayList(); + this.pending = + Collections + .synchronizedList(new ArrayList()); this.scheduled = new HashMap(); } public void addResource(LocalizerResourceRequestEvent request) { - // TDOO: Synchronization pending.add(request); } @@ -774,43 +770,44 @@ public void addResource(LocalizerResourceRequestEvent request) { * @return */ private LocalResource findNextResource() { - // TODO: Synchronization - for (Iterator i = pending.iterator(); - i.hasNext();) { - LocalizerResourceRequestEvent evt = i.next(); - LocalizedResource nRsrc = evt.getResource(); - // Resource download should take place ONLY if resource is in - // Downloading state - if (!ResourceState.DOWNLOADING.equals(nRsrc.getState())) { - i.remove(); - continue; - } - /* - * Multiple containers will try to download the same resource. So the - * resource download should start only if - * 1) We can acquire a non blocking semaphore lock on resource - * 2) Resource is still in DOWNLOADING state - */ - if (nRsrc.tryAcquire()) { - if (nRsrc.getState().equals(ResourceState.DOWNLOADING)) { - LocalResourceRequest nextRsrc = nRsrc.getRequest(); - LocalResource next = - recordFactory.newRecordInstance(LocalResource.class); - next.setResource(ConverterUtils.getYarnUrlFromPath(nextRsrc - .getPath())); - next.setTimestamp(nextRsrc.getTimestamp()); - next.setType(nextRsrc.getType()); - next.setVisibility(evt.getVisibility()); - next.setPattern(evt.getPattern()); - scheduled.put(nextRsrc, evt); - return next; - } else { - // Need to release acquired lock - nRsrc.unlock(); - } - } + synchronized (pending) { + for (Iterator i = pending.iterator(); + i.hasNext();) { + LocalizerResourceRequestEvent evt = i.next(); + LocalizedResource nRsrc = evt.getResource(); + // Resource download should take place ONLY if resource is in + // Downloading state + if (!ResourceState.DOWNLOADING.equals(nRsrc.getState())) { + i.remove(); + continue; + } + /* + * Multiple containers will try to download the same resource. So the + * resource download should start only if + * 1) We can acquire a non blocking semaphore lock on resource + * 2) Resource is still in DOWNLOADING state + */ + if (nRsrc.tryAcquire()) { + if (nRsrc.getState().equals(ResourceState.DOWNLOADING)) { + LocalResourceRequest nextRsrc = nRsrc.getRequest(); + LocalResource next = + recordFactory.newRecordInstance(LocalResource.class); + next.setResource(ConverterUtils.getYarnUrlFromPath(nextRsrc + .getPath())); + next.setTimestamp(nextRsrc.getTimestamp()); + next.setType(nextRsrc.getType()); + next.setVisibility(evt.getVisibility()); + next.setPattern(evt.getPattern()); + scheduled.put(nextRsrc, evt); + return next; + } else { + // Need to release acquired lock + nRsrc.unlock(); + } + } + } + return null; } - return null; } LocalizerHeartbeatResponse update(