diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java index 568c80b4a5..49478209da 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java @@ -730,6 +730,18 @@ public class DefaultContainerExecutor extends ContainerExecutor { // make probability to pick a directory proportional to // the available space on the directory. long randomPosition = RandomUtils.nextLong() % totalAvailable; + int dir = pickDirectory(randomPosition, availableOnDisk); + + return getApplicationDir(new Path(localDirs.get(dir)), user, appId); + } + + /** + * Picks a directory based on the input random number and + * available size at each dir. + */ + @Private + @VisibleForTesting + int pickDirectory(long randomPosition, final long[] availableOnDisk) { int dir = 0; // skip zero available space directory, // because totalAvailable is greater than 0 and randomPosition @@ -738,11 +750,10 @@ public class DefaultContainerExecutor extends ContainerExecutor { while (availableOnDisk[dir] == 0L) { dir++; } - while (randomPosition > availableOnDisk[dir]) { + while (randomPosition >= availableOnDisk[dir]) { randomPosition -= availableOnDisk[dir++]; } - - return getApplicationDir(new Path(localDirs.get(dir)), user, appId); + return dir; } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java index 9fb55c4660..215de91f2d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.nodemanager; import static org.apache.hadoop.fs.CreateFlag.CREATE; import static org.apache.hadoop.fs.CreateFlag.OVERWRITE; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; @@ -485,6 +486,38 @@ public class TestDefaultContainerExecutor { verify(mockLfs, times(2)).getFsStatus(any(Path.class)); } + @Test + public void testPickDirectory() throws Exception { + Configuration conf = new Configuration(); + FileContext lfs = FileContext.getLocalFSFileContext(conf); + DefaultContainerExecutor executor = new DefaultContainerExecutor(lfs); + + long[] availableOnDisk = new long[2]; + availableOnDisk[0] = 100; + availableOnDisk[1] = 100; + assertEquals(0, executor.pickDirectory(0L, availableOnDisk)); + assertEquals(0, executor.pickDirectory(99L, availableOnDisk)); + assertEquals(1, executor.pickDirectory(100L, availableOnDisk)); + assertEquals(1, executor.pickDirectory(101L, availableOnDisk)); + assertEquals(1, executor.pickDirectory(199L, availableOnDisk)); + + long[] availableOnDisk2 = new long[5]; + availableOnDisk2[0] = 100; + availableOnDisk2[1] = 10; + availableOnDisk2[2] = 400; + availableOnDisk2[3] = 200; + availableOnDisk2[4] = 350; + assertEquals(0, executor.pickDirectory(0L, availableOnDisk2)); + assertEquals(0, executor.pickDirectory(99L, availableOnDisk2)); + assertEquals(1, executor.pickDirectory(100L, availableOnDisk2)); + assertEquals(1, executor.pickDirectory(105L, availableOnDisk2)); + assertEquals(2, executor.pickDirectory(110L, availableOnDisk2)); + assertEquals(2, executor.pickDirectory(259L, availableOnDisk2)); + assertEquals(3, executor.pickDirectory(700L, availableOnDisk2)); + assertEquals(4, executor.pickDirectory(710L, availableOnDisk2)); + assertEquals(4, executor.pickDirectory(910L, availableOnDisk2)); + } + // @Test // public void testInit() throws IOException, InterruptedException { // Configuration conf = new Configuration();