YARN-1781. Modified NodeManagers to allow admins to specify max disk utilization for local disks so as to be able to offline full disks. Contributed by Varun Vasudev.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1575463 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
0f595915a3
commit
0b1304d098
@ -271,6 +271,10 @@ Release 2.4.0 - UNRELEASED
|
||||
YARN-1525. Web UI should redirect to active RM when HA is enabled. (Cindy Li
|
||||
via kasha)
|
||||
|
||||
YARN-1781. Modified NodeManagers to allow admins to specify max disk
|
||||
utilization for local disks so as to be able to offline full disks. (Varun
|
||||
Vasudev via vinodkv)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
@ -717,32 +717,59 @@ public class YarnConfiguration extends Configuration {
|
||||
/** Class that calculates process tree resource utilization.*/
|
||||
public static final String NM_CONTAINER_MON_PROCESS_TREE =
|
||||
NM_PREFIX + "container-monitor.process-tree.class";
|
||||
|
||||
|
||||
/** Prefix for all node manager disk health checker configs. */
|
||||
private static final String NM_DISK_HEALTH_CHECK_PREFIX =
|
||||
"yarn.nodemanager.disk-health-checker.";
|
||||
/**
|
||||
* Enable/Disable disks' health checker. Default is true.
|
||||
* An expert level configuration property.
|
||||
* Enable/Disable disks' health checker. Default is true. An expert level
|
||||
* configuration property.
|
||||
*/
|
||||
public static final String NM_DISK_HEALTH_CHECK_ENABLE =
|
||||
NM_PREFIX + "disk-health-checker.enable";
|
||||
/** Frequency of running disks' health checker.*/
|
||||
NM_DISK_HEALTH_CHECK_PREFIX + "enable";
|
||||
/** Frequency of running disks' health checker. */
|
||||
public static final String NM_DISK_HEALTH_CHECK_INTERVAL_MS =
|
||||
NM_PREFIX + "disk-health-checker.interval-ms";
|
||||
NM_DISK_HEALTH_CHECK_PREFIX + "interval-ms";
|
||||
/** By default, disks' health is checked every 2 minutes. */
|
||||
public static final long DEFAULT_NM_DISK_HEALTH_CHECK_INTERVAL_MS =
|
||||
2 * 60 * 1000;
|
||||
2 * 60 * 1000;
|
||||
|
||||
/**
|
||||
* The minimum fraction of number of disks to be healthy for the nodemanager
|
||||
* to launch new containers. This applies to nm-local-dirs and nm-log-dirs.
|
||||
*/
|
||||
public static final String NM_MIN_HEALTHY_DISKS_FRACTION =
|
||||
NM_PREFIX + "disk-health-checker.min-healthy-disks";
|
||||
NM_DISK_HEALTH_CHECK_PREFIX + "min-healthy-disks";
|
||||
/**
|
||||
* By default, at least 25% of disks are to be healthy to say that the node
|
||||
* is healthy in terms of disks.
|
||||
* By default, at least 25% of disks are to be healthy to say that the node is
|
||||
* healthy in terms of disks.
|
||||
*/
|
||||
public static final float DEFAULT_NM_MIN_HEALTHY_DISKS_FRACTION
|
||||
= 0.25F;
|
||||
public static final float DEFAULT_NM_MIN_HEALTHY_DISKS_FRACTION = 0.25F;
|
||||
|
||||
/**
|
||||
* The maximum percentage of disk space that can be used after which a disk is
|
||||
* marked as offline. Values can range from 0.0 to 100.0. If the value is
|
||||
* greater than or equal to 100, NM will check for full disk. This applies to
|
||||
* nm-local-dirs and nm-log-dirs.
|
||||
*/
|
||||
public static final String NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE =
|
||||
NM_DISK_HEALTH_CHECK_PREFIX + "max-disk-utilization-per-disk-percentage";
|
||||
/**
|
||||
* By default, 100% of the disk can be used before it is marked as offline.
|
||||
*/
|
||||
public static final float DEFAULT_NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE =
|
||||
100.0F;
|
||||
|
||||
/**
|
||||
* The minimum space that must be available on a local dir for it to be used.
|
||||
* This applies to nm-local-dirs and nm-log-dirs.
|
||||
*/
|
||||
public static final String NM_MIN_PER_DISK_FREE_SPACE_MB =
|
||||
NM_DISK_HEALTH_CHECK_PREFIX + "min-free-space-per-disk-mb";
|
||||
/**
|
||||
* By default, all of the disk can be used before it is marked as offline.
|
||||
*/
|
||||
public static final long DEFAULT_NM_MIN_PER_DISK_FREE_SPACE_MB = 0;
|
||||
|
||||
/** Frequency of running node health script.*/
|
||||
public static final String NM_HEALTH_CHECK_INTERVAL_MS =
|
||||
|
@ -870,6 +870,24 @@
|
||||
<value>0.25</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>The maximum percentage of disk space utilization allowed after
|
||||
which a disk is marked as bad. Values can range from 0.0 to 100.0.
|
||||
If the value is greater than or equal to 100, the nodemanager will check
|
||||
for full disk. This applies to yarn-nodemanager.local-dirs and
|
||||
yarn.nodemanager.log-dirs.</description>
|
||||
<name>yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage</name>
|
||||
<value>100.0</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>The minimum space that must be available on a disk for
|
||||
it to be used. This applies to yarn-nodemanager.local-dirs and
|
||||
yarn.nodemanager.log-dirs.</description>
|
||||
<name>yarn.nodemanager.disk-health-checker.min-free-space-per-disk-mb</name>
|
||||
<value>0</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>The path to the Linux container executor.</description>
|
||||
<name>yarn.nodemanager.linux-container-executor.path</name>
|
||||
|
@ -22,6 +22,7 @@
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
|
||||
@ -43,10 +44,80 @@ class DirectoryCollection {
|
||||
private List<String> localDirs;
|
||||
private List<String> failedDirs;
|
||||
private int numFailures;
|
||||
|
||||
private float diskUtilizationPercentageCutoff;
|
||||
private long diskUtilizationSpaceCutoff;
|
||||
|
||||
/**
|
||||
* Create collection for the directories specified. No check for free space.
|
||||
*
|
||||
* @param dirs
|
||||
* directories to be monitored
|
||||
*/
|
||||
public DirectoryCollection(String[] dirs) {
|
||||
this(dirs, 100.0F, 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create collection for the directories specified. Users must specify the
|
||||
* maximum percentage of disk utilization allowed. Minimum amount of disk
|
||||
* space is not checked.
|
||||
*
|
||||
* @param dirs
|
||||
* directories to be monitored
|
||||
* @param utilizationPercentageCutOff
|
||||
* percentage of disk that can be used before the dir is taken out of
|
||||
* the good dirs list
|
||||
*
|
||||
*/
|
||||
public DirectoryCollection(String[] dirs, float utilizationPercentageCutOff) {
|
||||
this(dirs, utilizationPercentageCutOff, 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create collection for the directories specified. Users must specify the
|
||||
* minimum amount of free space that must be available for the dir to be used.
|
||||
*
|
||||
* @param dirs
|
||||
* directories to be monitored
|
||||
* @param utilizationSpaceCutOff
|
||||
* minimum space, in MB, that must be available on the disk for the
|
||||
* dir to be marked as good
|
||||
*
|
||||
*/
|
||||
public DirectoryCollection(String[] dirs, long utilizationSpaceCutOff) {
|
||||
this(dirs, 100.0F, utilizationSpaceCutOff);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create collection for the directories specified. Users must specify the
|
||||
* maximum percentage of disk utilization allowed and the minimum amount of
|
||||
* free space that must be available for the dir to be used. If either check
|
||||
* fails the dir is removed from the good dirs list.
|
||||
*
|
||||
* @param dirs
|
||||
* directories to be monitored
|
||||
* @param utilizationPercentageCutOff
|
||||
* percentage of disk that can be used before the dir is taken out of
|
||||
* the good dirs list
|
||||
* @param utilizationSpaceCutOff
|
||||
* minimum space, in MB, that must be available on the disk for the
|
||||
* dir to be marked as good
|
||||
*
|
||||
*/
|
||||
public DirectoryCollection(String[] dirs,
|
||||
float utilizationPercentageCutOff,
|
||||
long utilizationSpaceCutOff) {
|
||||
localDirs = new CopyOnWriteArrayList<String>(dirs);
|
||||
failedDirs = new CopyOnWriteArrayList<String>();
|
||||
diskUtilizationPercentageCutoff = utilizationPercentageCutOff;
|
||||
diskUtilizationSpaceCutoff = utilizationSpaceCutOff;
|
||||
diskUtilizationPercentageCutoff =
|
||||
utilizationPercentageCutOff < 0.0F ? 0.0F
|
||||
: (utilizationPercentageCutOff > 100.0F ? 100.0F
|
||||
: utilizationPercentageCutOff);
|
||||
diskUtilizationSpaceCutoff =
|
||||
utilizationSpaceCutOff < 0 ? 0 : utilizationSpaceCutOff;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -103,19 +174,55 @@ synchronized boolean createNonExistentDirs(FileContext localFs,
|
||||
*/
|
||||
synchronized boolean checkDirs() {
|
||||
int oldNumFailures = numFailures;
|
||||
HashSet<String> checkFailedDirs = new HashSet<String>();
|
||||
for (final String dir : localDirs) {
|
||||
try {
|
||||
DiskChecker.checkDir(new File(dir));
|
||||
File testDir = new File(dir);
|
||||
DiskChecker.checkDir(testDir);
|
||||
if (isDiskUsageUnderPercentageLimit(testDir)) {
|
||||
LOG.warn("Directory " + dir
|
||||
+ " error, used space above threshold of "
|
||||
+ diskUtilizationPercentageCutoff
|
||||
+ "%, removing from the list of valid directories.");
|
||||
checkFailedDirs.add(dir);
|
||||
} else if (isDiskFreeSpaceWithinLimit(testDir)) {
|
||||
LOG.warn("Directory " + dir + " error, free space below limit of "
|
||||
+ diskUtilizationSpaceCutoff
|
||||
+ "MB, removing from the list of valid directories.");
|
||||
checkFailedDirs.add(dir);
|
||||
}
|
||||
} catch (DiskErrorException de) {
|
||||
LOG.warn("Directory " + dir + " error " +
|
||||
de.getMessage() + ", removing from the list of valid directories.");
|
||||
localDirs.remove(dir);
|
||||
failedDirs.add(dir);
|
||||
numFailures++;
|
||||
LOG.warn("Directory " + dir + " error " + de.getMessage()
|
||||
+ ", removing from the list of valid directories.");
|
||||
checkFailedDirs.add(dir);
|
||||
}
|
||||
}
|
||||
for (String dir : checkFailedDirs) {
|
||||
localDirs.remove(dir);
|
||||
failedDirs.add(dir);
|
||||
numFailures++;
|
||||
}
|
||||
return numFailures > oldNumFailures;
|
||||
}
|
||||
|
||||
private boolean isDiskUsageUnderPercentageLimit(File dir) {
|
||||
float freePercentage =
|
||||
100 * (dir.getUsableSpace() / (float) dir.getTotalSpace());
|
||||
float usedPercentage = 100.0F - freePercentage;
|
||||
if (usedPercentage > diskUtilizationPercentageCutoff
|
||||
|| usedPercentage >= 100.0F) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private boolean isDiskFreeSpaceWithinLimit(File dir) {
|
||||
long freeSpace = dir.getUsableSpace() / (1024 * 1024);
|
||||
if (freeSpace < this.diskUtilizationSpaceCutoff) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private void createDir(FileContext localFs, Path dir, FsPermission perm)
|
||||
throws IOException {
|
||||
@ -132,4 +239,26 @@ private void createDir(FileContext localFs, Path dir, FsPermission perm)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public float getDiskUtilizationPercentageCutoff() {
|
||||
return diskUtilizationPercentageCutoff;
|
||||
}
|
||||
|
||||
public void setDiskUtilizationPercentageCutoff(
|
||||
float diskUtilizationPercentageCutoff) {
|
||||
this.diskUtilizationPercentageCutoff =
|
||||
diskUtilizationPercentageCutoff < 0.0F ? 0.0F
|
||||
: (diskUtilizationPercentageCutoff > 100.0F ? 100.0F
|
||||
: diskUtilizationPercentageCutoff);
|
||||
}
|
||||
|
||||
public long getDiskUtilizationSpaceCutoff() {
|
||||
return diskUtilizationSpaceCutoff;
|
||||
}
|
||||
|
||||
public void setDiskUtilizationSpaceCutoff(long diskUtilizationSpaceCutoff) {
|
||||
diskUtilizationSpaceCutoff =
|
||||
diskUtilizationSpaceCutoff < 0 ? 0 : diskUtilizationSpaceCutoff;
|
||||
this.diskUtilizationSpaceCutoff = diskUtilizationSpaceCutoff;
|
||||
}
|
||||
}
|
||||
|
@ -89,10 +89,22 @@ public class LocalDirsHandlerService extends AbstractService {
|
||||
private final class MonitoringTimerTask extends TimerTask {
|
||||
|
||||
public MonitoringTimerTask(Configuration conf) throws YarnRuntimeException {
|
||||
localDirs = new DirectoryCollection(
|
||||
validatePaths(conf.getTrimmedStrings(YarnConfiguration.NM_LOCAL_DIRS)));
|
||||
logDirs = new DirectoryCollection(
|
||||
validatePaths(conf.getTrimmedStrings(YarnConfiguration.NM_LOG_DIRS)));
|
||||
float maxUsableSpacePercentagePerDisk =
|
||||
conf.getFloat(
|
||||
YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE,
|
||||
YarnConfiguration.DEFAULT_NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE);
|
||||
long minFreeSpacePerDiskMB =
|
||||
conf.getLong(YarnConfiguration.NM_MIN_PER_DISK_FREE_SPACE_MB,
|
||||
YarnConfiguration.DEFAULT_NM_MIN_PER_DISK_FREE_SPACE_MB);
|
||||
localDirs =
|
||||
new DirectoryCollection(
|
||||
validatePaths(conf
|
||||
.getTrimmedStrings(YarnConfiguration.NM_LOCAL_DIRS)),
|
||||
maxUsableSpacePercentagePerDisk, minFreeSpacePerDiskMB);
|
||||
logDirs =
|
||||
new DirectoryCollection(
|
||||
validatePaths(conf.getTrimmedStrings(YarnConfiguration.NM_LOG_DIRS)),
|
||||
maxUsableSpacePercentagePerDisk, minFreeSpacePerDiskMB);
|
||||
localDirsAllocator = new LocalDirAllocator(
|
||||
YarnConfiguration.NM_LOCAL_DIRS);
|
||||
logDirsAllocator = new LocalDirAllocator(YarnConfiguration.NM_LOG_DIRS);
|
||||
|
@ -30,6 +30,7 @@
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
@ -55,8 +56,11 @@ public static void teardown() {
|
||||
@Test
|
||||
public void testConcurrentAccess() throws IOException {
|
||||
// Initialize DirectoryCollection with a file instead of a directory
|
||||
Configuration conf = new Configuration();
|
||||
String[] dirs = {testFile.getPath()};
|
||||
DirectoryCollection dc = new DirectoryCollection(dirs);
|
||||
DirectoryCollection dc = new DirectoryCollection(dirs,
|
||||
conf.getFloat(YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE,
|
||||
YarnConfiguration.DEFAULT_NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE));
|
||||
|
||||
// Create an iterator before checkDirs is called to reliable test case
|
||||
List<String> list = dc.getGoodDirs();
|
||||
@ -88,7 +92,9 @@ public void testCreateDirectories() throws IOException {
|
||||
localFs.setPermission(pathC, permDirC);
|
||||
|
||||
String[] dirs = { dirA, dirB, dirC };
|
||||
DirectoryCollection dc = new DirectoryCollection(dirs);
|
||||
DirectoryCollection dc = new DirectoryCollection(dirs,
|
||||
conf.getFloat(YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE,
|
||||
YarnConfiguration.DEFAULT_NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE));
|
||||
FsPermission defaultPerm = FsPermission.getDefault()
|
||||
.applyUMask(new FsPermission((short)FsPermission.DEFAULT_UMASK));
|
||||
boolean createResult = dc.createNonExistentDirs(localFs, defaultPerm);
|
||||
@ -104,4 +110,85 @@ public void testCreateDirectories() throws IOException {
|
||||
Assert.assertEquals("existing local directory permissions modified",
|
||||
permDirC, status.getPermission());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDiskSpaceUtilizationLimit() throws IOException {
|
||||
|
||||
String dirA = new File(testDir, "dirA").getPath();
|
||||
String[] dirs = { dirA };
|
||||
DirectoryCollection dc = new DirectoryCollection(dirs, 0.0F);
|
||||
dc.checkDirs();
|
||||
Assert.assertEquals(0, dc.getGoodDirs().size());
|
||||
Assert.assertEquals(1, dc.getFailedDirs().size());
|
||||
|
||||
dc = new DirectoryCollection(dirs, 100.0F);
|
||||
dc.checkDirs();
|
||||
Assert.assertEquals(1, dc.getGoodDirs().size());
|
||||
Assert.assertEquals(0, dc.getFailedDirs().size());
|
||||
|
||||
dc = new DirectoryCollection(dirs, testDir.getTotalSpace() / (1024 * 1024));
|
||||
dc.checkDirs();
|
||||
Assert.assertEquals(0, dc.getGoodDirs().size());
|
||||
Assert.assertEquals(1, dc.getFailedDirs().size());
|
||||
|
||||
dc = new DirectoryCollection(dirs, 100.0F, 0);
|
||||
dc.checkDirs();
|
||||
Assert.assertEquals(1, dc.getGoodDirs().size());
|
||||
Assert.assertEquals(0, dc.getFailedDirs().size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDiskLimitsCutoffSetters() {
|
||||
|
||||
String[] dirs = { "dir" };
|
||||
DirectoryCollection dc = new DirectoryCollection(dirs, 0.0F, 100);
|
||||
float testValue = 57.5F;
|
||||
float delta = 0.1F;
|
||||
dc.setDiskUtilizationPercentageCutoff(testValue);
|
||||
Assert.assertEquals(testValue, dc.getDiskUtilizationPercentageCutoff(),
|
||||
delta);
|
||||
testValue = -57.5F;
|
||||
dc.setDiskUtilizationPercentageCutoff(testValue);
|
||||
Assert.assertEquals(0.0F, dc.getDiskUtilizationPercentageCutoff(), delta);
|
||||
testValue = 157.5F;
|
||||
dc.setDiskUtilizationPercentageCutoff(testValue);
|
||||
Assert.assertEquals(100.0F, dc.getDiskUtilizationPercentageCutoff(), delta);
|
||||
|
||||
long spaceValue = 57;
|
||||
dc.setDiskUtilizationSpaceCutoff(spaceValue);
|
||||
Assert.assertEquals(spaceValue, dc.getDiskUtilizationSpaceCutoff());
|
||||
spaceValue = -57;
|
||||
dc.setDiskUtilizationSpaceCutoff(spaceValue);
|
||||
Assert.assertEquals(0, dc.getDiskUtilizationSpaceCutoff());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConstructors() {
|
||||
|
||||
String[] dirs = { "dir" };
|
||||
float delta = 0.1F;
|
||||
DirectoryCollection dc = new DirectoryCollection(dirs);
|
||||
Assert.assertEquals(100.0F, dc.getDiskUtilizationPercentageCutoff(), delta);
|
||||
Assert.assertEquals(0, dc.getDiskUtilizationSpaceCutoff());
|
||||
|
||||
dc = new DirectoryCollection(dirs, 57.5F);
|
||||
Assert.assertEquals(57.5F, dc.getDiskUtilizationPercentageCutoff(), delta);
|
||||
Assert.assertEquals(0, dc.getDiskUtilizationSpaceCutoff());
|
||||
|
||||
dc = new DirectoryCollection(dirs, 57);
|
||||
Assert.assertEquals(100.0F, dc.getDiskUtilizationPercentageCutoff(), delta);
|
||||
Assert.assertEquals(57, dc.getDiskUtilizationSpaceCutoff());
|
||||
|
||||
dc = new DirectoryCollection(dirs, 57.5F, 67);
|
||||
Assert.assertEquals(57.5F, dc.getDiskUtilizationPercentageCutoff(), delta);
|
||||
Assert.assertEquals(67, dc.getDiskUtilizationSpaceCutoff());
|
||||
|
||||
dc = new DirectoryCollection(dirs, -57.5F, -67);
|
||||
Assert.assertEquals(0.0F, dc.getDiskUtilizationPercentageCutoff(), delta);
|
||||
Assert.assertEquals(0, dc.getDiskUtilizationSpaceCutoff());
|
||||
|
||||
dc = new DirectoryCollection(dirs, 157.5F, -67);
|
||||
Assert.assertEquals(100.0F, dc.getDiskUtilizationPercentageCutoff(), delta);
|
||||
Assert.assertEquals(0, dc.getDiskUtilizationSpaceCutoff());
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user