HDFS-15830. Support to make dfs.image.parallel.load reconfigurable (#2694)
This commit is contained in:
parent
8c8ef2f444
commit
a9d3500894
@ -172,6 +172,7 @@ protected FSImage(Configuration conf,
|
||||
|
||||
this.editLog = FSEditLog.newInstance(conf, storage, editsDirs);
|
||||
archivalManager = new NNStorageRetentionManager(conf, storage, editLog);
|
||||
FSImageFormatProtobuf.initParallelLoad(conf);
|
||||
}
|
||||
|
||||
void format(FSNamesystem fsn, String clusterId, boolean force)
|
||||
|
@ -242,6 +242,7 @@ public void load(File file, boolean requireSameLayoutVersion)
|
||||
* the layout version.
|
||||
*/
|
||||
public static LoaderDelegator newLoader(Configuration conf, FSNamesystem fsn) {
|
||||
|
||||
return new LoaderDelegator(conf, fsn);
|
||||
}
|
||||
|
||||
|
@ -88,6 +88,8 @@ public final class FSImageFormatProtobuf {
|
||||
private static final Logger LOG = LoggerFactory
|
||||
.getLogger(FSImageFormatProtobuf.class);
|
||||
|
||||
private static volatile boolean enableParallelLoad = false;
|
||||
|
||||
public static final class LoaderContext {
|
||||
private SerialNumberManager.StringTable stringTable;
|
||||
private final ArrayList<INodeReference> refList = Lists.newArrayList();
|
||||
@ -576,9 +578,7 @@ private void loadErasureCodingSection(InputStream in)
|
||||
}
|
||||
|
||||
private static boolean enableParallelSaveAndLoad(Configuration conf) {
|
||||
boolean loadInParallel =
|
||||
conf.getBoolean(DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY,
|
||||
DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_DEFAULT);
|
||||
boolean loadInParallel = enableParallelLoad;
|
||||
boolean compressionEnabled = conf.getBoolean(
|
||||
DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY,
|
||||
DFSConfigKeys.DFS_IMAGE_COMPRESS_DEFAULT);
|
||||
@ -594,6 +594,20 @@ private static boolean enableParallelSaveAndLoad(Configuration conf) {
|
||||
return loadInParallel;
|
||||
}
|
||||
|
||||
public static void initParallelLoad(Configuration conf) {
|
||||
enableParallelLoad =
|
||||
conf.getBoolean(DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY,
|
||||
DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_DEFAULT);
|
||||
}
|
||||
|
||||
public static void refreshParallelSaveAndLoad(boolean enable) {
|
||||
enableParallelLoad = enable;
|
||||
}
|
||||
|
||||
public static boolean getEnableParallelLoad() {
|
||||
return enableParallelLoad;
|
||||
}
|
||||
|
||||
public static final class Saver {
|
||||
public static final int CHECK_CANCEL_INTERVAL = 4096;
|
||||
private boolean writeSubSections = false;
|
||||
@ -634,10 +648,6 @@ public int getInodesPerSubSection() {
|
||||
return inodesPerSubSection;
|
||||
}
|
||||
|
||||
public boolean shouldWriteSubSections() {
|
||||
return writeSubSections;
|
||||
}
|
||||
|
||||
/**
|
||||
* Commit the length and offset of a fsimage section to the summary index,
|
||||
* including the sub section, which will be committed before the section is
|
||||
|
@ -124,6 +124,8 @@
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NN_NOT_BECOME_ACTIVE_IN_SAFEMODE;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NN_NOT_BECOME_ACTIVE_IN_SAFEMODE_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY;
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT;
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_ENABLED_KEY;
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_ENABLED_DEFAULT;
|
||||
@ -326,7 +328,8 @@ public enum OperationCategory {
|
||||
DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY,
|
||||
DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION,
|
||||
DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
|
||||
DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY));
|
||||
DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY,
|
||||
DFS_IMAGE_PARALLEL_LOAD_KEY));
|
||||
|
||||
private static final String USAGE = "Usage: hdfs namenode ["
|
||||
+ StartupOption.BACKUP.getName() + "] | \n\t["
|
||||
@ -2188,6 +2191,8 @@ protected String reconfigurePropertyImpl(String property, String newVal)
|
||||
.equals(DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY)) {
|
||||
reconfBlockPlacementPolicy();
|
||||
return newVal;
|
||||
} else if (property.equals(DFS_IMAGE_PARALLEL_LOAD_KEY)) {
|
||||
return reconfigureParallelLoad(newVal);
|
||||
} else {
|
||||
throw new ReconfigurationException(property, newVal, getConf().get(
|
||||
property));
|
||||
@ -2363,6 +2368,17 @@ String reconfigureSPSModeEvent(String newVal, String property)
|
||||
return newVal;
|
||||
}
|
||||
|
||||
String reconfigureParallelLoad(String newVal) {
|
||||
boolean enableParallelLoad;
|
||||
if (newVal == null) {
|
||||
enableParallelLoad = DFS_IMAGE_PARALLEL_LOAD_DEFAULT;
|
||||
} else {
|
||||
enableParallelLoad = Boolean.parseBoolean(newVal);
|
||||
}
|
||||
FSImageFormatProtobuf.refreshParallelSaveAndLoad(enableParallelLoad);
|
||||
return Boolean.toString(enableParallelLoad);
|
||||
}
|
||||
|
||||
@Override // ReconfigurableBase
|
||||
protected Configuration getNewConf() {
|
||||
return new HdfsConfiguration();
|
||||
|
@ -24,6 +24,7 @@
|
||||
import org.junit.Before;
|
||||
import org.junit.After;
|
||||
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY;
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
@ -378,6 +379,21 @@ public void testBlockInvalidateLimitAfterReconfigured()
|
||||
datanodeManager.getBlockInvalidateLimit());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEnableParallelLoadAfterReconfigured()
|
||||
throws ReconfigurationException {
|
||||
final NameNode nameNode = cluster.getNameNode();
|
||||
|
||||
// By default, enableParallelLoad is false
|
||||
assertEquals(false, FSImageFormatProtobuf.getEnableParallelLoad());
|
||||
|
||||
nameNode.reconfigureProperty(DFS_IMAGE_PARALLEL_LOAD_KEY,
|
||||
Boolean.toString(true));
|
||||
|
||||
// After reconfigured, enableParallelLoad is true
|
||||
assertEquals(true, FSImageFormatProtobuf.getEnableParallelLoad());
|
||||
}
|
||||
|
||||
@After
|
||||
public void shutDown() throws IOException {
|
||||
if (cluster != null) {
|
||||
|
@ -21,6 +21,7 @@
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY;
|
||||
@ -421,11 +422,12 @@ public void testNameNodeGetReconfigurableProperties() throws IOException {
|
||||
final List<String> outs = Lists.newArrayList();
|
||||
final List<String> errs = Lists.newArrayList();
|
||||
getReconfigurableProperties("namenode", address, outs, errs);
|
||||
assertEquals(12, outs.size());
|
||||
assertEquals(13, outs.size());
|
||||
assertEquals(DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY, outs.get(1));
|
||||
assertEquals(DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, outs.get(2));
|
||||
assertEquals(DFS_HEARTBEAT_INTERVAL_KEY, outs.get(3));
|
||||
assertEquals(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, outs.get(4));
|
||||
assertEquals(DFS_IMAGE_PARALLEL_LOAD_KEY, outs.get(4));
|
||||
assertEquals(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, outs.get(5));
|
||||
assertEquals(errs.size(), 0);
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user