HDFS-16457. Make fs.getspaceused.classname reconfigurable (apache#4069) (#4156)
This commit is contained in:
parent
603367c54f
commit
26705bbc60
@ -22,6 +22,7 @@
|
|||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
|
||||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DU_INTERVAL_DEFAULT;
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DU_INTERVAL_DEFAULT;
|
||||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DU_INTERVAL_KEY;
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DU_INTERVAL_KEY;
|
||||||
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_CLASSNAME;
|
||||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_JITTER_DEFAULT;
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_JITTER_DEFAULT;
|
||||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_JITTER_KEY;
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_JITTER_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY;
|
||||||
@ -84,6 +85,9 @@
|
|||||||
import static org.apache.hadoop.util.Time.now;
|
import static org.apache.hadoop.util.Time.now;
|
||||||
|
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
|
import org.apache.hadoop.fs.DU;
|
||||||
|
import org.apache.hadoop.fs.GetSpaceUsed;
|
||||||
|
import org.apache.hadoop.fs.WindowsGetSpaceUsed;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.ReconfigurationProtocolService;
|
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.ReconfigurationProtocolService;
|
||||||
|
|
||||||
import java.io.BufferedOutputStream;
|
import java.io.BufferedOutputStream;
|
||||||
@ -243,6 +247,7 @@
|
|||||||
import org.apache.hadoop.util.GenericOptionsParser;
|
import org.apache.hadoop.util.GenericOptionsParser;
|
||||||
import org.apache.hadoop.util.JvmPauseMonitor;
|
import org.apache.hadoop.util.JvmPauseMonitor;
|
||||||
import org.apache.hadoop.util.ServicePlugin;
|
import org.apache.hadoop.util.ServicePlugin;
|
||||||
|
import org.apache.hadoop.util.Shell;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
import org.apache.hadoop.util.Timer;
|
import org.apache.hadoop.util.Timer;
|
||||||
@ -352,7 +357,8 @@ public class DataNode extends ReconfigurableBase
|
|||||||
DFS_DATANODE_MIN_OUTLIER_DETECTION_DISKS_KEY,
|
DFS_DATANODE_MIN_OUTLIER_DETECTION_DISKS_KEY,
|
||||||
DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_KEY,
|
DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_KEY,
|
||||||
FS_DU_INTERVAL_KEY,
|
FS_DU_INTERVAL_KEY,
|
||||||
FS_GETSPACEUSED_JITTER_KEY));
|
FS_GETSPACEUSED_JITTER_KEY,
|
||||||
|
FS_GETSPACEUSED_CLASSNAME));
|
||||||
|
|
||||||
public static final Log METRICS_LOG = LogFactory.getLog("DataNodeMetricsLog");
|
public static final Log METRICS_LOG = LogFactory.getLog("DataNodeMetricsLog");
|
||||||
|
|
||||||
@ -678,6 +684,7 @@ public String reconfigurePropertyImpl(String property, String newVal)
|
|||||||
return reconfSlowDiskParameters(property, newVal);
|
return reconfSlowDiskParameters(property, newVal);
|
||||||
case FS_DU_INTERVAL_KEY:
|
case FS_DU_INTERVAL_KEY:
|
||||||
case FS_GETSPACEUSED_JITTER_KEY:
|
case FS_GETSPACEUSED_JITTER_KEY:
|
||||||
|
case FS_GETSPACEUSED_CLASSNAME:
|
||||||
return reconfDfsUsageParameters(property, newVal);
|
return reconfDfsUsageParameters(property, newVal);
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
@ -874,7 +881,7 @@ private String reconfDfsUsageParameters(String property, String newVal)
|
|||||||
for (FsVolumeImpl fsVolume : volumeList) {
|
for (FsVolumeImpl fsVolume : volumeList) {
|
||||||
Map<String, BlockPoolSlice> blockPoolSlices = fsVolume.getBlockPoolSlices();
|
Map<String, BlockPoolSlice> blockPoolSlices = fsVolume.getBlockPoolSlices();
|
||||||
for (BlockPoolSlice value : blockPoolSlices.values()) {
|
for (BlockPoolSlice value : blockPoolSlices.values()) {
|
||||||
value.updateDfsUsageConfig(interval, null);
|
value.updateDfsUsageConfig(interval, null, null);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if (property.equals(FS_GETSPACEUSED_JITTER_KEY)) {
|
} else if (property.equals(FS_GETSPACEUSED_JITTER_KEY)) {
|
||||||
@ -886,13 +893,33 @@ private String reconfDfsUsageParameters(String property, String newVal)
|
|||||||
for (FsVolumeImpl fsVolume : volumeList) {
|
for (FsVolumeImpl fsVolume : volumeList) {
|
||||||
Map<String, BlockPoolSlice> blockPoolSlices = fsVolume.getBlockPoolSlices();
|
Map<String, BlockPoolSlice> blockPoolSlices = fsVolume.getBlockPoolSlices();
|
||||||
for (BlockPoolSlice value : blockPoolSlices.values()) {
|
for (BlockPoolSlice value : blockPoolSlices.values()) {
|
||||||
value.updateDfsUsageConfig(null, jitter);
|
value.updateDfsUsageConfig(null, jitter, null);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if (property.equals(FS_GETSPACEUSED_CLASSNAME)) {
|
||||||
|
Preconditions.checkNotNull(data, "FsDatasetSpi has not been initialized.");
|
||||||
|
Class<? extends GetSpaceUsed> klass;
|
||||||
|
if (newVal == null) {
|
||||||
|
if (Shell.WINDOWS) {
|
||||||
|
klass = DU.class;
|
||||||
|
} else {
|
||||||
|
klass = WindowsGetSpaceUsed.class;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
klass = Class.forName(newVal).asSubclass(GetSpaceUsed.class);
|
||||||
|
}
|
||||||
|
result = klass.getName();
|
||||||
|
List<FsVolumeImpl> volumeList = data.getVolumeList();
|
||||||
|
for (FsVolumeImpl fsVolume : volumeList) {
|
||||||
|
Map<String, BlockPoolSlice> blockPoolSlices = fsVolume.getBlockPoolSlices();
|
||||||
|
for (BlockPoolSlice value : blockPoolSlices.values()) {
|
||||||
|
value.updateDfsUsageConfig(null, null, klass);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
LOG.info("RECONFIGURE* changed {} to {}", property, newVal);
|
LOG.info("RECONFIGURE* changed {} to {}", property, newVal);
|
||||||
return result;
|
return result;
|
||||||
} catch (IllegalArgumentException | IOException e) {
|
} catch (IllegalArgumentException | IOException | ClassNotFoundException e) {
|
||||||
throw new ReconfigurationException(property, newVal, getConf().get(property), e);
|
throw new ReconfigurationException(property, newVal, getConf().get(property), e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -80,6 +80,7 @@
|
|||||||
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
|
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DU_INTERVAL_KEY;
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DU_INTERVAL_KEY;
|
||||||
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_CLASSNAME;
|
||||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_JITTER_KEY;
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_JITTER_KEY;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -236,7 +237,8 @@ public void run() {
|
|||||||
SHUTDOWN_HOOK_PRIORITY);
|
SHUTDOWN_HOOK_PRIORITY);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void updateDfsUsageConfig(Long interval, Long jitter) throws IOException {
|
public void updateDfsUsageConfig(Long interval, Long jitter, Class<? extends GetSpaceUsed> klass)
|
||||||
|
throws IOException {
|
||||||
// Close the old dfsUsage if it is CachingGetSpaceUsed.
|
// Close the old dfsUsage if it is CachingGetSpaceUsed.
|
||||||
if (dfsUsage instanceof CachingGetSpaceUsed) {
|
if (dfsUsage instanceof CachingGetSpaceUsed) {
|
||||||
((CachingGetSpaceUsed) dfsUsage).close();
|
((CachingGetSpaceUsed) dfsUsage).close();
|
||||||
@ -251,6 +253,9 @@ public void updateDfsUsageConfig(Long interval, Long jitter) throws IOException
|
|||||||
FS_GETSPACEUSED_JITTER_KEY + " should be larger than or equal to 0");
|
FS_GETSPACEUSED_JITTER_KEY + " should be larger than or equal to 0");
|
||||||
config.setLong(FS_GETSPACEUSED_JITTER_KEY, jitter);
|
config.setLong(FS_GETSPACEUSED_JITTER_KEY, jitter);
|
||||||
}
|
}
|
||||||
|
if (klass != null) {
|
||||||
|
config.setClass(FS_GETSPACEUSED_CLASSNAME, klass, CachingGetSpaceUsed.class);
|
||||||
|
}
|
||||||
// Start new dfsUsage.
|
// Start new dfsUsage.
|
||||||
this.dfsUsage = new FSCachingGetSpaceUsed.Builder().setBpid(bpid)
|
this.dfsUsage = new FSCachingGetSpaceUsed.Builder().setBpid(bpid)
|
||||||
.setVolume(volume)
|
.setVolume(volume)
|
||||||
|
@ -21,6 +21,7 @@
|
|||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
|
||||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DU_INTERVAL_DEFAULT;
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DU_INTERVAL_DEFAULT;
|
||||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DU_INTERVAL_KEY;
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DU_INTERVAL_KEY;
|
||||||
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_CLASSNAME;
|
||||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_JITTER_DEFAULT;
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_JITTER_DEFAULT;
|
||||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_JITTER_KEY;
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_JITTER_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT;
|
||||||
@ -86,6 +87,7 @@ public class TestDataNodeReconfiguration {
|
|||||||
private final int NUM_NAME_NODE = 1;
|
private final int NUM_NAME_NODE = 1;
|
||||||
private final int NUM_DATA_NODE = 10;
|
private final int NUM_DATA_NODE = 10;
|
||||||
private MiniDFSCluster cluster;
|
private MiniDFSCluster cluster;
|
||||||
|
private static long counter = 0;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void Setup() throws IOException {
|
public void Setup() throws IOException {
|
||||||
@ -756,4 +758,33 @@ public void testDfsUsageParameters() throws ReconfigurationException {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static class DummyCachingGetSpaceUsed extends CachingGetSpaceUsed {
|
||||||
|
public DummyCachingGetSpaceUsed(Builder builder) throws IOException {
|
||||||
|
super(builder.setInterval(1000).setJitter(0L));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void refresh() {
|
||||||
|
counter++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDfsUsageKlass() throws ReconfigurationException, InterruptedException {
|
||||||
|
|
||||||
|
long lastCounter = counter;
|
||||||
|
Thread.sleep(5000);
|
||||||
|
assertEquals(lastCounter, counter);
|
||||||
|
|
||||||
|
for (int i = 0; i < NUM_DATA_NODE; i++) {
|
||||||
|
DataNode dn = cluster.getDataNodes().get(i);
|
||||||
|
dn.reconfigurePropertyImpl(FS_GETSPACEUSED_CLASSNAME,
|
||||||
|
DummyCachingGetSpaceUsed.class.getName());
|
||||||
|
}
|
||||||
|
|
||||||
|
lastCounter = counter;
|
||||||
|
Thread.sleep(5000);
|
||||||
|
assertTrue(counter > lastCounter);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -331,7 +331,7 @@ public void testDataNodeGetReconfigurableProperties() throws IOException {
|
|||||||
final List<String> outs = Lists.newArrayList();
|
final List<String> outs = Lists.newArrayList();
|
||||||
final List<String> errs = Lists.newArrayList();
|
final List<String> errs = Lists.newArrayList();
|
||||||
getReconfigurableProperties("datanode", address, outs, errs);
|
getReconfigurableProperties("datanode", address, outs, errs);
|
||||||
assertEquals(18, outs.size());
|
assertEquals(19, outs.size());
|
||||||
assertEquals(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, outs.get(1));
|
assertEquals(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, outs.get(1));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user