HDFS-15033. Support to save replica cached files to other place and make expired time configurable. Contributed by Yang Yun.

Author: Ayush Saxena
Date:   2020-02-29 09:55:29 +05:30
Commit: 1a636da041 (parent 97b797c314)

4 changed files with 71 additions and 4 deletions

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -156,6 +156,12 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_DATANODE_FIXED_VOLUME_SIZE_KEY =
       "dfs.datanode.fixed.volume.size";
   public static final boolean DFS_DATANODE_FIXED_VOLUME_SIZE_DEFAULT = false;
+  public static final String DFS_DATANODE_REPLICA_CACHE_ROOT_DIR_KEY =
+      "dfs.datanode.replica.cache.root.dir";
+  public static final String DFS_DATANODE_REPLICA_CACHE_EXPIRY_TIME_KEY =
+      "dfs.datanode.replica.cache.expiry.time";
+  public static final long DFS_DATANODE_REPLICA_CACHE_EXPIRY_TIME_DEFAULT =
+      300000;
   // This setting is for testing/internal use only.
   public static final String DFS_DATANODE_DUPLICATE_REPLICA_DELETION = "dfs.datanode.duplicate.replica.deletion";
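
For readers wiring these keys up outside the DataNode, here is a minimal sketch of setting and reading them through Configuration. Only the key names and the 300000 ms default come from the patch; the directory value is a hypothetical example:

    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class ReplicaCacheConfigExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration(false);
        // Hypothetical fast local disk to hold the persisted replica lists.
        conf.set(DFSConfigKeys.DFS_DATANODE_REPLICA_CACHE_ROOT_DIR_KEY,
            "/mnt/ssd/replica-cache");
        // With no expiry configured, the 300000 ms default above applies.
        long expiryMs = conf.getTimeDuration(
            DFSConfigKeys.DFS_DATANODE_REPLICA_CACHE_EXPIRY_TIME_KEY,
            DFSConfigKeys.DFS_DATANODE_REPLICA_CACHE_EXPIRY_TIME_DEFAULT,
            TimeUnit.MILLISECONDS);
        System.out.println("replica cache expiry = " + expiryMs + " ms");
      }
    }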

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java

@@ -44,6 +44,7 @@
 import java.util.concurrent.RecursiveAction;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hdfs.server.datanode.FSCachingGetSpaceUsed;
 import org.slf4j.Logger;
@@ -103,7 +104,8 @@ class BlockPoolSlice {
   private static final int SHUTDOWN_HOOK_PRIORITY = 30;
   private final boolean deleteDuplicateReplicas;
   private static final String REPLICA_CACHE_FILE = "replicas";
-  private final long replicaCacheExpiry = 5*60*1000;
+  private final long replicaCacheExpiry;
+  private final File replicaCacheDir;
   private AtomicLong numOfBlocks = new AtomicLong();
   private final long cachedDfsUsedCheckTime;
   private final Timer timer;
@@ -180,6 +182,24 @@ public int compare(File f1, File f2) {
     fileIoProvider.mkdirs(volume, rbwDir);
     fileIoProvider.mkdirs(volume, tmpDir);
+    String cacheDirRoot = conf.get(
+        DFSConfigKeys.DFS_DATANODE_REPLICA_CACHE_ROOT_DIR_KEY);
+    if (cacheDirRoot != null && !cacheDirRoot.isEmpty()) {
+      this.replicaCacheDir = new File(cacheDirRoot,
+          currentDir.getCanonicalPath());
+      if (!this.replicaCacheDir.exists()) {
+        if (!this.replicaCacheDir.mkdirs()) {
+          throw new IOException("Failed to mkdirs " + this.replicaCacheDir);
+        }
+      }
+    } else {
+      this.replicaCacheDir = currentDir;
+    }
+    this.replicaCacheExpiry = conf.getTimeDuration(
+        DFSConfigKeys.DFS_DATANODE_REPLICA_CACHE_EXPIRY_TIME_KEY,
+        DFSConfigKeys.DFS_DATANODE_REPLICA_CACHE_EXPIRY_TIME_DEFAULT,
+        TimeUnit.MILLISECONDS);
+
     // Use cached value initially if available. Or the following call will
     // block until the initial du command completes.
     this.dfsUsage = new FSCachingGetSpaceUsed.Builder().setBpid(bpid)
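
When a cache root is configured, the constructor nests the volume's canonical currentDir path beneath it, so distinct volumes always get distinct cache directories. A minimal standalone sketch of that path composition, using hypothetical paths rather than anything from the patch:

    import java.io.File;

    public class ReplicaCachePathExample {
      public static void main(String[] args) throws Exception {
        // Hypothetical cache root and volume directory.
        String cacheDirRoot = "/mnt/ssd/replica-cache";
        File currentDir = new File("/data/1/dn/current/BP-1-127.0.0.1-1/current");
        // Same composition as the constructor above: the volume's absolute
        // path becomes a subtree under the cache root.
        File replicaCacheDir = new File(cacheDirRoot,
            currentDir.getCanonicalPath());
        // Prints /mnt/ssd/replica-cache/data/1/dn/current/BP-1-127.0.0.1-1/current
        System.out.println(replicaCacheDir.getPath());
      }
    }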
@@ -876,7 +896,7 @@ void shutdown(BlockListAsLongs blocksListToPersist) {
   private boolean readReplicasFromCache(ReplicaMap volumeMap,
       final RamDiskReplicaTracker lazyWriteReplicaMap) {
     ReplicaMap tmpReplicaMap = new ReplicaMap(new ReentrantReadWriteLock());
-    File replicaFile = new File(currentDir, REPLICA_CACHE_FILE);
+    File replicaFile = new File(replicaCacheDir, REPLICA_CACHE_FILE);
     // Check whether the file exists or not.
     if (!replicaFile.exists()) {
       LOG.info("Replica Cache file: "+ replicaFile.getPath() +
@@ -954,8 +974,8 @@ private void saveReplicas(BlockListAsLongs blocksListToPersist) {
         blocksListToPersist.getNumberOfBlocks()== 0) {
       return;
     }
-    final File tmpFile = new File(currentDir, REPLICA_CACHE_FILE + ".tmp");
-    final File replicaCacheFile = new File(currentDir, REPLICA_CACHE_FILE);
+    final File tmpFile = new File(replicaCacheDir, REPLICA_CACHE_FILE + ".tmp");
+    final File replicaCacheFile = new File(replicaCacheDir, REPLICA_CACHE_FILE);
     if (!fileIoProvider.deleteWithExistsCheck(volume, tmpFile) ||
         !fileIoProvider.deleteWithExistsCheck(volume, replicaCacheFile)) {
       return;

hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

@@ -4516,6 +4516,23 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.datanode.replica.cache.root.dir</name>
+  <value></value>
+  <description>
+    Use this key to change the root directory of the replica cache.
+    The default root directory is the volume's currentDir.
+  </description>
+</property>
+
+<property>
+  <name>dfs.datanode.replica.cache.expiry.time</name>
+  <value>5m</value>
+  <description>
+    Time to live of replica cache files; a plain number is read as milliseconds.
+  </description>
+</property>
+
 <property>
   <name>dfs.ha.fencing.methods</name>
   <value></value>
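
The "5m" default works because Configuration.getTimeDuration accepts unit suffixes (ns, us, ms, s, m, h, d) and converts to the unit the caller requests; BlockPoolSlice above asks for milliseconds. A small sketch of that conversion (the key name comes from the patch, the rest is illustrative):

    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.conf.Configuration;

    public class ExpirySuffixExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration(false);
        conf.set("dfs.datanode.replica.cache.expiry.time", "5m");
        // "5m" is five minutes; converted to milliseconds this prints 300000.
        System.out.println(conf.getTimeDuration(
            "dfs.datanode.replica.cache.expiry.time",
            300000L, TimeUnit.MILLISECONDS));
      }
    }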

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java

@@ -113,6 +113,7 @@ public class TestFsDatasetImpl {
   Logger LOG = LoggerFactory.getLogger(TestFsDatasetImpl.class);
   private static final String BASE_DIR =
       new FileSystemTestHelper().getTestRootDir();
+  private String replicaCacheRootDir = BASE_DIR + Path.SEPARATOR + "cache";
   private static final int NUM_INIT_VOLUMES = 2;
   private static final String CLUSTER_ID = "cluser-id";
   private static final String[] BLOCK_POOL_IDS = {"bpid-0", "bpid-1"};
@@ -172,6 +173,8 @@ public void setUp() throws IOException {
     storage = mock(DataStorage.class);
     this.conf = new Configuration();
     this.conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 0);
+    this.conf.set(DFSConfigKeys.DFS_DATANODE_REPLICA_CACHE_ROOT_DIR_KEY,
+        replicaCacheRootDir);
 
     when(datanode.getConf()).thenReturn(conf);
     final DNConf dnConf = new DNConf(datanode);
@@ -963,4 +966,25 @@ public void testDataDirWithPercent() throws IOException {
         new StorageDirectory(StorageLocation.parse(dataDir.getPath())))
         .build();
   }
+
+  @Test
+  public void testReplicaCacheFileToOtherPlace() throws IOException {
+    final String bpid = "bpid-0";
+    for (int i = 0; i < 5; i++) {
+      ExtendedBlock eb = new ExtendedBlock(bpid, i);
+      dataset.createRbw(StorageType.DEFAULT, null, eb, false);
+    }
+    List<File> cacheFiles = new ArrayList<>();
+    for (FsVolumeSpi vol : dataset.getFsVolumeReferences()) {
+      BlockPoolSlice bpSlice = ((FsVolumeImpl) vol).getBlockPoolSlice(bpid);
+      File cacheFile = new File(replicaCacheRootDir + Path.SEPARATOR +
+          bpSlice.getDirectory().getCanonicalPath() + Path.SEPARATOR +
+          DataStorage.STORAGE_DIR_CURRENT + Path.SEPARATOR + "replicas");
+      cacheFiles.add(cacheFile);
+    }
+    dataset.shutdownBlockPool(bpid);
+    for (File f : cacheFiles) {
+      assertTrue(f.exists());
+    }
+  }
 }
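
A note on why the assertions hold: shutting down the block pool eventually runs BlockPoolSlice#shutdown (the -876,7 hunk above shows its signature), which persists the replica list through saveReplicas; because setUp() sets dfs.datanode.replica.cache.root.dir, those "replicas" files land under replicaCacheRootDir instead of each volume's currentDir.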