HDFS-17068. Datanode should record last directory scan time. (#5809). Contributed by farmmamba.
Reviewed-by: He Xiaoqiao <hexiaoqiao@apache.org> Signed-off-by: Ayush Saxena <ayushsaxena@apache.org>
This commit is contained in:
parent
e7d74f3d59
commit
c457c445b7
@ -406,6 +406,7 @@ public void run() {
|
||||
}
|
||||
try {
|
||||
reconcile();
|
||||
dataset.setLastDirScannerFinishTime(System.currentTimeMillis());
|
||||
} catch (Exception e) {
|
||||
// Log and continue - allows Executor to run again next cycle
|
||||
LOG.error(
|
||||
|
@ -692,4 +692,10 @@ ReplicaInfo moveBlockAcrossVolumes(final ExtendedBlock block,
|
||||
* Get the volume list.
|
||||
*/
|
||||
List<FsVolumeImpl> getVolumeList();
|
||||
|
||||
/**
|
||||
* Set the last time in milliseconds when the directory scanner successfully ran.
|
||||
* @param time the last time in milliseconds when the directory scanner successfully ran.
|
||||
*/
|
||||
default void setLastDirScannerFinishTime(long time) {}
|
||||
}
|
||||
|
@ -284,7 +284,8 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
|
||||
private long maxDirScannerNotifyCount;
|
||||
private long curDirScannerNotifyCount;
|
||||
private long lastDirScannerNotifyTime;
|
||||
|
||||
private volatile long lastDirScannerFinishTime;
|
||||
|
||||
/**
|
||||
* An FSDataset has a directory where it loads its data files.
|
||||
*/
|
||||
@ -3811,5 +3812,15 @@ void stopAllDataxceiverThreads(FsVolumeImpl volume) {
|
||||
public List<FsVolumeImpl> getVolumeList() {
|
||||
return volumes.getVolumes();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getLastDirScannerFinishTime() {
|
||||
return this.lastDirScannerFinishTime;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setLastDirScannerFinishTime(long time) {
|
||||
this.lastDirScannerFinishTime = time;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -69,10 +69,11 @@ public static void getMetrics(MetricsCollector collector,
|
||||
" of blocks cached"), beanClass.getNumBlocksCached())
|
||||
.addGauge(Interns.info("NumBlocksFailedToCache", "Datanode number of " +
|
||||
"blocks failed to cache"), beanClass.getNumBlocksFailedToCache())
|
||||
.addGauge(Interns.info("NumBlocksFailedToUnCache", "Datanode number of" +
|
||||
.addGauge(Interns.info("NumBlocksFailedToUnCache", "Datanode number of" +
|
||||
" blocks failed in cache eviction"),
|
||||
beanClass.getNumBlocksFailedToUncache());
|
||||
|
||||
beanClass.getNumBlocksFailedToUncache())
|
||||
.addGauge(Interns.info("LastDirectoryScannerFinishTime",
|
||||
"Finish time of the last directory scan"), beanClass.getLastDirScannerFinishTime());
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -122,4 +122,9 @@ public interface FSDatasetMBean extends MetricsSource {
|
||||
* Returns the number of blocks that the datanode was unable to uncache
|
||||
*/
|
||||
public long getNumBlocksFailedToUncache();
|
||||
|
||||
/**
|
||||
* Returns the last time in milliseconds when the directory scanner successfully ran.
|
||||
*/
|
||||
long getLastDirScannerFinishTime();
|
||||
}
|
||||
|
@ -939,6 +939,11 @@ public long getNumBlocksFailedToUncache() {
|
||||
return 0L;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getLastDirScannerFinishTime() {
|
||||
return 0L;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get metrics from the metrics source
|
||||
*
|
||||
@ -1632,5 +1637,10 @@ public MountVolumeMap getMountVolumeMap() {
|
||||
public List<FsVolumeImpl> getVolumeList() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setLastDirScannerFinishTime(long time) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -23,6 +23,7 @@
|
||||
import static org.hamcrest.core.Is.is;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
@ -1304,6 +1305,24 @@ public void testLocalReplicaUpdateWithReplica() throws Exception {
|
||||
assertEquals(realBlkFile, localReplica.getBlockFile());
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testLastDirScannerFinishTimeIsUpdated() throws Exception {
|
||||
Configuration conf = getConfiguration();
|
||||
conf.setLong(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, 3L);
|
||||
cluster = new MiniDFSCluster.Builder(conf).build();
|
||||
try {
|
||||
cluster.waitActive();
|
||||
bpid = cluster.getNamesystem().getBlockPoolId();
|
||||
final DataNode dn = cluster.getDataNodes().get(0);
|
||||
fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
|
||||
long lastDirScannerFinishTime = fds.getLastDirScannerFinishTime();
|
||||
dn.getDirectoryScanner().run();
|
||||
assertNotEquals(lastDirScannerFinishTime, fds.getLastDirScannerFinishTime());
|
||||
} finally {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
public long getRandomBlockId() {
|
||||
return Math.abs(new Random().nextLong());
|
||||
}
|
||||
|
@ -477,4 +477,10 @@ public MountVolumeMap getMountVolumeMap() {
|
||||
public List<FsVolumeImpl> getVolumeList() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getLastDirScannerFinishTime() {
|
||||
return 0L;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -238,7 +238,6 @@ public void setUp() throws IOException {
|
||||
for (String bpid : BLOCK_POOL_IDS) {
|
||||
dataset.addBlockPool(bpid, conf);
|
||||
}
|
||||
|
||||
assertEquals(NUM_INIT_VOLUMES, getNumVolumes());
|
||||
assertEquals(0, dataset.getNumFailedVolumes());
|
||||
}
|
||||
@ -250,6 +249,13 @@ public void checkDataSetLockManager() {
|
||||
assertNull(manager.getLastException());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSetLastDirScannerFinishTime() throws IOException {
|
||||
assertEquals(dataset.getLastDirScannerFinishTime(), 0L);
|
||||
dataset.setLastDirScannerFinishTime(System.currentTimeMillis());
|
||||
assertNotEquals(0L, dataset.getLastDirScannerFinishTime());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAddVolumes() throws IOException {
|
||||
final int numNewVolumes = 3;
|
||||
|
Loading…
Reference in New Issue
Block a user