HDFS-9624. DataNode start slowly due to the initial DU command operations. (Lin Yiqun via wang)
commit c07f7fa8ff (parent a9c69ebeb7)
CHANGES.txt
@@ -933,6 +933,9 @@ Release 2.9.0 - UNRELEASED
     HDFS-9350. Avoid creating temprorary strings in Block.toString() and
     getBlockName() (Staffan Friberg via cmccabe)
 
+    HDFS-9624. DataNode start slowly due to the initial DU command operations.
+    (Lin Yiqun via wang)
+
   OPTIMIZATIONS
 
   BUG FIXES
DFSConfigKeys.java
@@ -125,6 +125,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_DATANODE_DUPLICATE_REPLICA_DELETION = "dfs.datanode.duplicate.replica.deletion";
   public static final boolean DFS_DATANODE_DUPLICATE_REPLICA_DELETION_DEFAULT = true;
 
+  public static final String DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS =
+      "dfs.datanode.cached-dfsused.check.interval.ms";
+  public static final long DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_DEFAULT_MS =
+      600000;
+
   public static final String DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT =
       "dfs.namenode.path.based.cache.block.map.allocation.percent";
   public static final float DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT_DEFAULT = 0.25f;
BlockPoolSlice.java
@@ -55,7 +55,7 @@
 import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.ShutdownHookManager;
-import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.Timer;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.io.Files;
@@ -79,12 +79,15 @@ class BlockPoolSlice {
   private final File rbwDir; // directory store RBW replica
   private final File tmpDir; // directory store Temporary replica
   private final int ioFileBufferSize;
-  private static final String DU_CACHE_FILE = "dfsUsed";
+  @VisibleForTesting
+  public static final String DU_CACHE_FILE = "dfsUsed";
   private volatile boolean dfsUsedSaved = false;
   private static final int SHUTDOWN_HOOK_PRIORITY = 30;
   private final boolean deleteDuplicateReplicas;
   private static final String REPLICA_CACHE_FILE = "replicas";
   private final long replicaCacheExpiry = 5*60*1000;
+  private final long cachedDfsUsedCheckTime;
+  private final Timer timer;
 
   // TODO:FEDERATION scalability issue - a thread per DU is needed
   private final DU dfsUsage;
@@ -95,10 +98,11 @@ class BlockPoolSlice {
    * @param volume {@link FsVolumeImpl} to which this BlockPool belongs to
    * @param bpDir directory corresponding to the BlockPool
    * @param conf configuration
+   * @param timer include methods for getting time
    * @throws IOException
    */
   BlockPoolSlice(String bpid, FsVolumeImpl volume, File bpDir,
-      Configuration conf) throws IOException {
+      Configuration conf, Timer timer) throws IOException {
     this.bpid = bpid;
     this.volume = volume;
     this.currentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT);
@@ -117,6 +121,12 @@ class BlockPoolSlice {
         DFSConfigKeys.DFS_DATANODE_DUPLICATE_REPLICA_DELETION,
         DFSConfigKeys.DFS_DATANODE_DUPLICATE_REPLICA_DELETION_DEFAULT);
 
+    this.cachedDfsUsedCheckTime =
+        conf.getLong(
+            DFSConfigKeys.DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS,
+            DFSConfigKeys.DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_DEFAULT_MS);
+    this.timer = timer;
+
     // Files that were being written when the datanode was last shutdown
     // are now moved back to the data directory. It is possible that
     // in the future, we might want to do some sort of datanode-local
@@ -187,11 +197,13 @@ void incDfsUsed(long value) {
     dfsUsage.incDfsUsed(value);
   }
 
-  /**
-   * Read in the cached DU value and return it if it is less than 600 seconds
-   * old (DU update interval). Slight imprecision of dfsUsed is not critical
-   * and skipping DU can significantly shorten the startup time.
-   * If the cached value is not available or too old, -1 is returned.
+  /**
+   * Read in the cached DU value and return it if it is no older than
+   * cachedDfsUsedCheckTime, which is set by the
+   * dfs.datanode.cached-dfsused.check.interval.ms parameter. Slight imprecision
+   * of dfsUsed is not critical and skipping DU can significantly shorten the
+   * startup time. If the cached value is not available or too old, -1 is
+   * returned.
    */
   long loadDfsUsed() {
     long cachedDfsUsed;
@@ -219,7 +231,7 @@ long loadDfsUsed() {
     }
 
     // Return the cached value if mtime is okay.
-    if (mtime > 0 && (Time.now() - mtime < 600000L)) {
+    if (mtime > 0 && (timer.now() - mtime < cachedDfsUsedCheckTime)) {
       FsDatasetImpl.LOG.info("Cached dfsUsed found for " + currentDir + ": " +
           cachedDfsUsed);
       return cachedDfsUsed;
@@ -245,7 +257,7 @@ void saveDfsUsed() {
       try (Writer out = new OutputStreamWriter(
           new FileOutputStream(outFile), "UTF-8")) {
         // mtime is written last, so that truncated writes won't be valid.
-        out.write(Long.toString(used) + " " + Long.toString(Time.now()));
+        out.write(Long.toString(used) + " " + Long.toString(timer.now()));
         out.flush();
       }
     } catch (IOException ioe) {
@@ -434,7 +446,7 @@ private void addReplicaToReplicasMap(Block block, ReplicaMap volumeMap,
     try {
       sc = new Scanner(restartMeta, "UTF-8");
       // The restart meta file exists
-      if (sc.hasNextLong() && (sc.nextLong() > Time.now())) {
+      if (sc.hasNextLong() && (sc.nextLong() > timer.now())) {
         // It didn't expire. Load the replica as a RBW.
         // We don't know the expected block length, so just use 0
         // and don't reserve any more space for writes.
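For readers following the BlockPoolSlice changes above: the dfsUsed cache file holds two space-separated longs, the used-space value followed by the time it was written, and loadDfsUsed() only trusts the cached value while (now - mtime) stays below the configured check interval. The standalone sketch below is an illustration only; the class and method names are hypothetical and not part of this patch.

import java.io.File;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.Scanner;

public class DfsUsedCacheCheckSketch {
  /**
   * Returns the cached dfsUsed value, or -1 if the cache file is missing,
   * malformed, or older than maxAgeMs (mirroring loadDfsUsed()'s contract).
   */
  static long readCachedDfsUsed(File cacheFile, long nowMs, long maxAgeMs) {
    try (Scanner sc = new Scanner(new String(
        Files.readAllBytes(cacheFile.toPath()), StandardCharsets.UTF_8))) {
      long used = sc.nextLong();   // first token: dfsUsed in bytes
      long mtime = sc.nextLong();  // second token: write time; written last so
                                   // a truncated file never validates
      if (mtime > 0 && nowMs - mtime < maxAgeMs) {
        return used;
      }
    } catch (Exception e) {
      // Any I/O or parse problem means the cache is unusable; fall through.
    }
    return -1;
  }
}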
FsDatasetImpl.java
@@ -113,6 +113,7 @@
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.Timer;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -127,6 +128,7 @@
 class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   static final Log LOG = LogFactory.getLog(FsDatasetImpl.class);
   private final static boolean isNativeIOAvailable;
+  private Timer timer;
   static {
     isNativeIOAvailable = NativeIO.isAvailable();
     if (Path.WINDOWS && !isNativeIOAvailable) {
@@ -433,7 +435,7 @@ public void addVolume(final StorageLocation location,
     for (final NamespaceInfo nsInfo : nsInfos) {
       String bpid = nsInfo.getBlockPoolID();
       try {
-        fsVolume.addBlockPool(bpid, this.conf);
+        fsVolume.addBlockPool(bpid, this.conf, this.timer);
         fsVolume.getVolumeMap(bpid, tempVolumeMap, ramDiskReplicaTracker);
       } catch (IOException e) {
         LOG.warn("Caught exception when adding " + fsVolume +
@@ -3080,5 +3082,10 @@ boolean reserveLockedMemory(long bytesNeeded) {
     evictLazyPersistBlocks(bytesNeeded);
     return cacheManager.reserve(bytesNeeded) > 0;
   }
+
+  @VisibleForTesting
+  public void setTimer(Timer newTimer) {
+    this.timer = newTimer;
+  }
 }
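The @VisibleForTesting setTimer() hook added above lets a test replace the real clock with a controllable one, which is how the new tests further below keep the cached dfsUsed value fresh or let it expire without sleeping. A minimal, self-contained sketch of that injection pattern using Hadoop's FakeTimer follows; the CacheEntry class is a hypothetical stand-in for BlockPoolSlice, not code from this patch.

import org.apache.hadoop.util.FakeTimer;
import org.apache.hadoop.util.Timer;

public class FakeTimerSketch {
  /** Like BlockPoolSlice, asks the injected Timer for the current time. */
  static class CacheEntry {
    private final Timer timer;
    private final long createdMs;
    CacheEntry(Timer timer) {
      this.timer = timer;
      this.createdMs = timer.now();
    }
    boolean isFresh(long maxAgeMs) {
      return timer.now() - createdMs < maxAgeMs;
    }
  }

  public static void main(String[] args) {
    FakeTimer timer = new FakeTimer();
    CacheEntry entry = new CacheEntry(timer);
    timer.advance(4000);                      // pretend 4 seconds passed
    System.out.println(entry.isFresh(5000));  // true: still within the window
    timer.advance(2000);                      // total of 6 seconds now
    System.out.println(entry.isFresh(5000));  // false: the entry has expired
  }
}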
FsVolumeImpl.java
@@ -59,6 +59,7 @@
 import org.apache.hadoop.util.CloseableReferenceCount;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.Timer;
 import org.codehaus.jackson.annotate.JsonProperty;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.slf4j.Logger;
@@ -858,8 +859,18 @@ void shutdown() {
   }
 
   void addBlockPool(String bpid, Configuration conf) throws IOException {
+    addBlockPool(bpid, conf, null);
+  }
+
+  void addBlockPool(String bpid, Configuration conf, Timer timer)
+      throws IOException {
     File bpdir = new File(currentDir, bpid);
-    BlockPoolSlice bp = new BlockPoolSlice(bpid, this, bpdir, conf);
+    BlockPoolSlice bp;
+    if (timer == null) {
+      bp = new BlockPoolSlice(bpid, this, bpdir, conf, new Timer());
+    } else {
+      bp = new BlockPoolSlice(bpid, this, bpdir, conf, timer);
+    }
     bpSlices.put(bpid, bp);
   }
hdfs-default.xml
@@ -2738,4 +2738,17 @@
     reduces initial request failures after datanode restart.
   </description>
 </property>
+
+<property>
+  <name>dfs.datanode.cached-dfsused.check.interval.ms</name>
+  <value>600000</value>
+  <description>
+    The interval, in milliseconds, for which the cached dfsUsed value
+    (the DU_CACHE_FILE written in each volume) is considered valid when it
+    is loaded at startup. During rolling-upgrade operations the dfsUsed
+    cache file of each volume has usually expired, so the DataNode reruns
+    the du command and starts slowly. Adjust this property to keep the
+    cache file valid for as long as you need.
+  </description>
+</property>
+
 </configuration>
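Besides overriding the value in hdfs-site.xml as described above, the interval can be set programmatically on a Configuration, which is what the new test below does. A small sketch under that assumption; the 15-minute value and class name are only examples, not part of the patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class CachedDfsUsedIntervalSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Override the 600000 ms (10 minute) default so the cached dfsUsed
    // value survives a longer restart window, e.g. a rolling upgrade.
    conf.setLong(DFSConfigKeys.DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS,
        15 * 60 * 1000L);
    long interval = conf.getLong(
        DFSConfigKeys.DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS,
        DFSConfigKeys.DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_DEFAULT_MS);
    System.out.println("cached-dfsused check interval = " + interval + " ms");
  }
}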
TestFsDatasetImpl.java
@@ -48,6 +48,7 @@
 import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.DiskChecker;
+import org.apache.hadoop.util.FakeTimer;
 import org.apache.hadoop.util.StringUtils;
 import org.junit.Before;
 import org.junit.Test;
@@ -57,13 +58,18 @@
 import org.mockito.stubbing.Answer;
 
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -444,4 +450,95 @@ public void testDuplicateReplicaResolution() throws IOException {
     assertSame(replica,
         BlockPoolSlice.selectReplicaToDelete(replicaOtherNewer, replica));
   }
+
+  @Test
+  public void testLoadingDfsUsedForVolumes() throws IOException,
+      InterruptedException {
+    long waitIntervalTime = 5000;
+    // Make cachedDfsUsedIntervalTime larger than waitIntervalTime
+    // so that the cached dfsUsed value does not expire.
+    long cachedDfsUsedIntervalTime = waitIntervalTime + 1000;
+    conf.setLong(DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS,
+        cachedDfsUsedIntervalTime);
+
+    long cacheDfsUsed = 1024;
+    long dfsUsed = getDfsUsedValueOfNewVolume(cacheDfsUsed, waitIntervalTime);
+
+    assertEquals(cacheDfsUsed, dfsUsed);
+  }
+
+  @Test
+  public void testLoadingDfsUsedForVolumesExpired() throws IOException,
+      InterruptedException {
+    long waitIntervalTime = 5000;
+    // Make cachedDfsUsedIntervalTime smaller than waitIntervalTime
+    // so that the cached dfsUsed value expires.
+    long cachedDfsUsedIntervalTime = waitIntervalTime - 1000;
+    conf.setLong(DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS,
+        cachedDfsUsedIntervalTime);
+
+    long cacheDfsUsed = 1024;
+    long dfsUsed = getDfsUsedValueOfNewVolume(cacheDfsUsed, waitIntervalTime);
+
+    // The cached dfsUsed value expired, so dfsUsed is recalculated.
+    assertTrue(cacheDfsUsed != dfsUsed);
+  }
+
+  private long getDfsUsedValueOfNewVolume(long cacheDfsUsed,
+      long waitIntervalTime) throws IOException, InterruptedException {
+    List<NamespaceInfo> nsInfos = Lists.newArrayList();
+    nsInfos.add(new NamespaceInfo(0, CLUSTER_ID, BLOCK_POOL_IDS[0], 1));
+
+    String CURRENT_DIR = "current";
+    String DU_CACHE_FILE = BlockPoolSlice.DU_CACHE_FILE;
+    String path = BASE_DIR + "/newData0";
+    String pathUri = new Path(path).toUri().toString();
+    StorageLocation loc = StorageLocation.parse(pathUri);
+    Storage.StorageDirectory sd = createStorageDirectory(new File(path));
+    DataStorage.VolumeBuilder builder =
+        new DataStorage.VolumeBuilder(storage, sd);
+    when(
+        storage.prepareVolume(eq(datanode), eq(loc.getFile()),
+            anyListOf(NamespaceInfo.class))).thenReturn(builder);
+
+    String cacheFilePath =
+        String.format("%s/%s/%s/%s/%s", path, CURRENT_DIR, BLOCK_POOL_IDS[0],
+            CURRENT_DIR, DU_CACHE_FILE);
+    File outFile = new File(cacheFilePath);
+
+    if (!outFile.getParentFile().exists()) {
+      outFile.getParentFile().mkdirs();
+    }
+
+    if (outFile.exists()) {
+      outFile.delete();
+    }
+
+    FakeTimer timer = new FakeTimer();
+    try {
+      try (Writer out =
+          new OutputStreamWriter(new FileOutputStream(outFile),
+              StandardCharsets.UTF_8)) {
+        // Write the dfsUsed value and the write time to the cache file.
+        out.write(Long.toString(cacheDfsUsed) + " "
+            + Long.toString(timer.now()));
+        out.flush();
+      }
+    } catch (IOException ioe) {
+    }
+
+    dataset.setTimer(timer);
+    timer.advance(waitIntervalTime);
+    dataset.addVolume(loc, nsInfos);
+
+    // Get the volume that was just added.
+    FsVolumeImpl newVolume;
+    try (FsDatasetSpi.FsVolumeReferences volumes =
+        dataset.getFsVolumeReferences()) {
+      newVolume = (FsVolumeImpl) volumes.get(volumes.size() - 1);
+    }
+    long dfsUsed = newVolume.getDfsUsed();
+
+    return dfsUsed;
+  }
 }