YARN-5686. DefaultContainerExecutor random working dir algorigthm skews results (Vrushali C via Varun Saxena)
This commit is contained in:
parent
6cc7c43866
commit
7b4e9ec3b0
@ -730,6 +730,18 @@ public class DefaultContainerExecutor extends ContainerExecutor {
|
|||||||
// make probability to pick a directory proportional to
|
// make probability to pick a directory proportional to
|
||||||
// the available space on the directory.
|
// the available space on the directory.
|
||||||
long randomPosition = RandomUtils.nextLong() % totalAvailable;
|
long randomPosition = RandomUtils.nextLong() % totalAvailable;
|
||||||
|
int dir = pickDirectory(randomPosition, availableOnDisk);
|
||||||
|
|
||||||
|
return getApplicationDir(new Path(localDirs.get(dir)), user, appId);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Picks a directory based on the input random number and
|
||||||
|
* available size at each dir.
|
||||||
|
*/
|
||||||
|
@Private
|
||||||
|
@VisibleForTesting
|
||||||
|
int pickDirectory(long randomPosition, final long[] availableOnDisk) {
|
||||||
int dir = 0;
|
int dir = 0;
|
||||||
// skip zero available space directory,
|
// skip zero available space directory,
|
||||||
// because totalAvailable is greater than 0 and randomPosition
|
// because totalAvailable is greater than 0 and randomPosition
|
||||||
@ -738,11 +750,10 @@ public class DefaultContainerExecutor extends ContainerExecutor {
|
|||||||
while (availableOnDisk[dir] == 0L) {
|
while (availableOnDisk[dir] == 0L) {
|
||||||
dir++;
|
dir++;
|
||||||
}
|
}
|
||||||
while (randomPosition > availableOnDisk[dir]) {
|
while (randomPosition >= availableOnDisk[dir]) {
|
||||||
randomPosition -= availableOnDisk[dir++];
|
randomPosition -= availableOnDisk[dir++];
|
||||||
}
|
}
|
||||||
|
return dir;
|
||||||
return getApplicationDir(new Path(localDirs.get(dir)), user, appId);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.nodemanager;
|
|||||||
|
|
||||||
import static org.apache.hadoop.fs.CreateFlag.CREATE;
|
import static org.apache.hadoop.fs.CreateFlag.CREATE;
|
||||||
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
|
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.mockito.Matchers.any;
|
import static org.mockito.Matchers.any;
|
||||||
import static org.mockito.Matchers.anyBoolean;
|
import static org.mockito.Matchers.anyBoolean;
|
||||||
@ -485,6 +486,38 @@ public class TestDefaultContainerExecutor {
|
|||||||
verify(mockLfs, times(2)).getFsStatus(any(Path.class));
|
verify(mockLfs, times(2)).getFsStatus(any(Path.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPickDirectory() throws Exception {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
FileContext lfs = FileContext.getLocalFSFileContext(conf);
|
||||||
|
DefaultContainerExecutor executor = new DefaultContainerExecutor(lfs);
|
||||||
|
|
||||||
|
long[] availableOnDisk = new long[2];
|
||||||
|
availableOnDisk[0] = 100;
|
||||||
|
availableOnDisk[1] = 100;
|
||||||
|
assertEquals(0, executor.pickDirectory(0L, availableOnDisk));
|
||||||
|
assertEquals(0, executor.pickDirectory(99L, availableOnDisk));
|
||||||
|
assertEquals(1, executor.pickDirectory(100L, availableOnDisk));
|
||||||
|
assertEquals(1, executor.pickDirectory(101L, availableOnDisk));
|
||||||
|
assertEquals(1, executor.pickDirectory(199L, availableOnDisk));
|
||||||
|
|
||||||
|
long[] availableOnDisk2 = new long[5];
|
||||||
|
availableOnDisk2[0] = 100;
|
||||||
|
availableOnDisk2[1] = 10;
|
||||||
|
availableOnDisk2[2] = 400;
|
||||||
|
availableOnDisk2[3] = 200;
|
||||||
|
availableOnDisk2[4] = 350;
|
||||||
|
assertEquals(0, executor.pickDirectory(0L, availableOnDisk2));
|
||||||
|
assertEquals(0, executor.pickDirectory(99L, availableOnDisk2));
|
||||||
|
assertEquals(1, executor.pickDirectory(100L, availableOnDisk2));
|
||||||
|
assertEquals(1, executor.pickDirectory(105L, availableOnDisk2));
|
||||||
|
assertEquals(2, executor.pickDirectory(110L, availableOnDisk2));
|
||||||
|
assertEquals(2, executor.pickDirectory(259L, availableOnDisk2));
|
||||||
|
assertEquals(3, executor.pickDirectory(700L, availableOnDisk2));
|
||||||
|
assertEquals(4, executor.pickDirectory(710L, availableOnDisk2));
|
||||||
|
assertEquals(4, executor.pickDirectory(910L, availableOnDisk2));
|
||||||
|
}
|
||||||
|
|
||||||
// @Test
|
// @Test
|
||||||
// public void testInit() throws IOException, InterruptedException {
|
// public void testInit() throws IOException, InterruptedException {
|
||||||
// Configuration conf = new Configuration();
|
// Configuration conf = new Configuration();
|
||||||
|
Loading…
x
Reference in New Issue
Block a user