HDFS-16213. Flaky test TestFsDatasetImpl#testDnRestartWithHardLink (#3386)
Reviewed-by: Fei Hui <feihui.ustc@gmail.com> Reviewed-by: Wei-Chiu Chuang <weichiu@apache.org> Signed-off-by: Akira Ajisaka <aajisaka@apache.org>
This commit is contained in:
parent
4df4389325
commit
feee41aa00
@ -513,6 +513,7 @@ public boolean fullyDelete(@Nullable FsVolumeSpi volume, File dir) {
|
||||
try {
|
||||
faultInjectorEventHook.beforeMetadataOp(volume, DELETE);
|
||||
boolean deleted = FileUtil.fullyDelete(dir);
|
||||
LOG.trace("Deletion of dir {} {}", dir, deleted ? "succeeded" : "failed");
|
||||
profilingEventHook.afterMetadataOp(volume, DELETE, begin);
|
||||
return deleted;
|
||||
} catch(Exception e) {
|
||||
|
@ -102,7 +102,12 @@ class BlockPoolSlice {
|
||||
private final Runnable shutdownHook;
|
||||
private volatile boolean dfsUsedSaved = false;
|
||||
private static final int SHUTDOWN_HOOK_PRIORITY = 30;
|
||||
private final boolean deleteDuplicateReplicas;
|
||||
|
||||
/**
|
||||
* Only tests are allowed to modify the value. For source code,
|
||||
* this should be treated as final only.
|
||||
*/
|
||||
private boolean deleteDuplicateReplicas;
|
||||
private static final String REPLICA_CACHE_FILE = "replicas";
|
||||
private final long replicaCacheExpiry;
|
||||
private final File replicaCacheDir;
|
||||
@ -440,7 +445,7 @@ void getVolumeMap(ReplicaMap volumeMap,
|
||||
"Recovered " + numRecovered + " replicas from " + lazypersistDir);
|
||||
}
|
||||
|
||||
boolean success = readReplicasFromCache(volumeMap, lazyWriteReplicaMap);
|
||||
boolean success = readReplicasFromCache(volumeMap, lazyWriteReplicaMap);
|
||||
if (!success) {
|
||||
List<IOException> exceptions = Collections
|
||||
.synchronizedList(new ArrayList<IOException>());
|
||||
@ -1081,4 +1086,11 @@ public static void reInitializeAddReplicaThreadPool() {
|
||||
addReplicaThreadPool.shutdown();
|
||||
addReplicaThreadPool = null;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
void setDeleteDuplicateReplicasForTests(
|
||||
boolean deleteDuplicateReplicasForTests) {
|
||||
this.deleteDuplicateReplicas = deleteDuplicateReplicasForTests;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -26,6 +26,7 @@
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.URI;
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
import java.nio.channels.FileChannel;
|
||||
import java.nio.file.Files;
|
||||
@ -1049,12 +1050,17 @@ static File[] copyBlockFiles(ReplicaInfo srcReplica, File dstMeta,
|
||||
static File[] hardLinkBlockFiles(ReplicaInfo srcReplica, File dstMeta,
|
||||
File dstFile)
|
||||
throws IOException {
|
||||
FsVolumeSpi srcReplicaVolume = srcReplica.getVolume();
|
||||
File destParentFile = dstFile.getParentFile();
|
||||
// Create parent folder if not exists.
|
||||
srcReplica.getFileIoProvider()
|
||||
.mkdirs(srcReplica.getVolume(), dstFile.getParentFile());
|
||||
boolean isDirCreated = srcReplica.getFileIoProvider()
|
||||
.mkdirs(srcReplicaVolume, destParentFile);
|
||||
LOG.trace("Dir creation of {} on volume {} {}", destParentFile,
|
||||
srcReplicaVolume, isDirCreated ? "succeeded" : "failed");
|
||||
URI srcReplicaUri = srcReplica.getBlockURI();
|
||||
try {
|
||||
HardLink.createHardLink(
|
||||
new File(srcReplica.getBlockURI()), dstFile);
|
||||
new File(srcReplicaUri), dstFile);
|
||||
} catch (IOException e) {
|
||||
throw new IOException("Failed to hardLink "
|
||||
+ srcReplica + " block file to "
|
||||
@ -1068,9 +1074,8 @@ static File[] hardLinkBlockFiles(ReplicaInfo srcReplica, File dstMeta,
|
||||
+ srcReplica + " metadata to "
|
||||
+ dstMeta, e);
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.info("Linked " + srcReplica.getBlockURI() + " to " + dstFile);
|
||||
}
|
||||
LOG.debug("Linked {} to {} . Dest meta file: {}", srcReplicaUri, dstFile,
|
||||
dstMeta);
|
||||
return new File[]{dstMeta, dstFile};
|
||||
}
|
||||
|
||||
|
@ -2483,12 +2483,8 @@ public boolean restartDataNode(DataNodeProperties dnprop) throws IOException {
|
||||
|
||||
public void waitDatanodeFullyStarted(DataNode dn, int timeout)
|
||||
throws TimeoutException, InterruptedException {
|
||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||
@Override
|
||||
public Boolean get() {
|
||||
return dn.isDatanodeFullyStarted();
|
||||
}
|
||||
}, 100, timeout);
|
||||
GenericTestUtils.waitFor(dn::isDatanodeFullyStarted, 100, timeout,
|
||||
"Datanode is not started even after " + timeout + " ms of waiting");
|
||||
}
|
||||
|
||||
private void waitDataNodeFullyStarted(final DataNode dn)
|
||||
|
@ -50,10 +50,8 @@
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||
import org.apache.hadoop.hdfs.server.common.Storage;
|
||||
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
|
||||
import org.apache.hadoop.hdfs.server.common.StorageInfo;
|
||||
import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DNConf;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
@ -78,7 +76,9 @@
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TestName;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.io.File;
|
||||
@ -124,7 +124,10 @@
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class TestFsDatasetImpl {
|
||||
Logger LOG = LoggerFactory.getLogger(TestFsDatasetImpl.class);
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(
|
||||
TestFsDatasetImpl.class);
|
||||
|
||||
private static final String BASE_DIR =
|
||||
new FileSystemTestHelper().getTestRootDir();
|
||||
private String replicaCacheRootDir = BASE_DIR + Path.SEPARATOR + "cache";
|
||||
@ -132,10 +135,6 @@ public class TestFsDatasetImpl {
|
||||
private static final String CLUSTER_ID = "cluser-id";
|
||||
private static final String[] BLOCK_POOL_IDS = {"bpid-0", "bpid-1"};
|
||||
|
||||
// Use to generate storageUuid
|
||||
private static final DataStorage dsForStorageUuid = new DataStorage(
|
||||
new StorageInfo(HdfsServerConstants.NodeType.DATA_NODE));
|
||||
|
||||
private Configuration conf;
|
||||
private DataNode datanode;
|
||||
private DataStorage storage;
|
||||
@ -143,6 +142,9 @@ public class TestFsDatasetImpl {
|
||||
|
||||
private final static String BLOCKPOOL = "BP-TEST";
|
||||
|
||||
@Rule
|
||||
public TestName name = new TestName();
|
||||
|
||||
private static Storage.StorageDirectory createStorageDirectory(File root,
|
||||
Configuration conf)
|
||||
throws SecurityException, IOException {
|
||||
@ -228,6 +230,7 @@ public void setUp() throws IOException {
|
||||
assertEquals(NUM_INIT_VOLUMES, getNumVolumes());
|
||||
assertEquals(0, dataset.getNumFailedVolumes());
|
||||
}
|
||||
|
||||
@Test(timeout=10000)
|
||||
public void testReadLockEnabledByDefault()
|
||||
throws Exception {
|
||||
@ -1132,7 +1135,7 @@ public void testReportBadBlocks() throws Exception {
|
||||
Assert.assertEquals(0, cluster.getNamesystem().getCorruptReplicaBlocks());
|
||||
|
||||
FileSystem fs = cluster.getFileSystem();
|
||||
Path filePath = new Path("testData");
|
||||
Path filePath = new Path(name.getMethodName());
|
||||
DFSTestUtil.createFile(fs, filePath, 1, (short) 1, 0);
|
||||
|
||||
block = DFSTestUtil.getFirstBlock(fs, filePath);
|
||||
@ -1179,7 +1182,7 @@ private void testMoveBlockFailure(Configuration config) {
|
||||
FileSystem fs = cluster.getFileSystem();
|
||||
DataNode dataNode = cluster.getDataNodes().get(0);
|
||||
|
||||
Path filePath = new Path("testData");
|
||||
Path filePath = new Path(name.getMethodName());
|
||||
long fileLen = 100;
|
||||
ExtendedBlock block = createTestFile(fs, fileLen, filePath);
|
||||
|
||||
@ -1227,7 +1230,7 @@ public void testMoveBlockSuccess() {
|
||||
FileSystem fs = cluster.getFileSystem();
|
||||
DataNode dataNode = cluster.getDataNodes().get(0);
|
||||
|
||||
Path filePath = new Path("testData");
|
||||
Path filePath = new Path(name.getMethodName());
|
||||
DFSTestUtil.createFile(fs, filePath, 100, (short) 1, 0);
|
||||
ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, filePath);
|
||||
|
||||
@ -1266,7 +1269,7 @@ public void testDnRestartWithHardLinkInTmp() {
|
||||
FileSystem fs = cluster.getFileSystem();
|
||||
DataNode dataNode = cluster.getDataNodes().get(0);
|
||||
|
||||
Path filePath = new Path("testData");
|
||||
Path filePath = new Path(name.getMethodName());
|
||||
long fileLen = 100;
|
||||
|
||||
ExtendedBlock block = createTestFile(fs, fileLen, filePath);
|
||||
@ -1307,23 +1310,34 @@ public void testDnRestartWithHardLinkInTmp() {
|
||||
* DiskScanner should clean up the hardlink correctly.
|
||||
*/
|
||||
@Test(timeout = 30000)
|
||||
public void testDnRestartWithHardLink() {
|
||||
public void testDnRestartWithHardLink() throws Exception {
|
||||
MiniDFSCluster cluster = null;
|
||||
boolean isReplicaDeletionEnabled =
|
||||
conf.getBoolean(DFSConfigKeys.DFS_DATANODE_DUPLICATE_REPLICA_DELETION,
|
||||
DFSConfigKeys.DFS_DATANODE_DUPLICATE_REPLICA_DELETION_DEFAULT);
|
||||
try {
|
||||
conf.setBoolean(DFSConfigKeys
|
||||
.DFS_DATANODE_ALLOW_SAME_DISK_TIERING, true);
|
||||
conf.setDouble(DFSConfigKeys
|
||||
.DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE, 0.5);
|
||||
// Since Datanode restart in the middle of block movement may leave
|
||||
// uncleaned hardlink, disabling this config (i.e. deletion of duplicate
|
||||
// replica) will prevent this edge-case from happening.
|
||||
// We also re-enable deletion of duplicate replica just before starting
|
||||
// Dir Scanner using setDeleteDuplicateReplicasForTests (HDFS-16213).
|
||||
conf.setBoolean(DFSConfigKeys.DFS_DATANODE_DUPLICATE_REPLICA_DELETION,
|
||||
false);
|
||||
cluster = new MiniDFSCluster.Builder(conf)
|
||||
.numDataNodes(1)
|
||||
.storageTypes(
|
||||
new StorageType[]{StorageType.DISK, StorageType.ARCHIVE})
|
||||
.storagesPerDatanode(2)
|
||||
.build();
|
||||
cluster.waitActive();
|
||||
FileSystem fs = cluster.getFileSystem();
|
||||
DataNode dataNode = cluster.getDataNodes().get(0);
|
||||
|
||||
Path filePath = new Path("testData");
|
||||
Path filePath = new Path(name.getMethodName());
|
||||
long fileLen = 100;
|
||||
|
||||
ExtendedBlock block = createTestFile(fs, fileLen, filePath);
|
||||
@ -1331,11 +1345,15 @@ public void testDnRestartWithHardLink() {
|
||||
FsDatasetImpl fsDataSetImpl = (FsDatasetImpl) dataNode.getFSDataset();
|
||||
|
||||
final ReplicaInfo oldReplicaInfo = fsDataSetImpl.getReplicaInfo(block);
|
||||
StorageType oldStorageType = oldReplicaInfo.getVolume().getStorageType();
|
||||
|
||||
fsDataSetImpl.finalizeNewReplica(
|
||||
createNewReplicaObjWithLink(block, fsDataSetImpl), block);
|
||||
|
||||
ReplicaInfo newReplicaInfo = fsDataSetImpl.getReplicaInfo(block);
|
||||
StorageType newStorageType = newReplicaInfo.getVolume().getStorageType();
|
||||
assertEquals(StorageType.DISK, oldStorageType);
|
||||
assertEquals(StorageType.ARCHIVE, newStorageType);
|
||||
|
||||
cluster.restartDataNode(0);
|
||||
cluster.waitDatanodeFullyStarted(cluster.getDataNodes().get(0), 60000);
|
||||
@ -1344,26 +1362,35 @@ public void testDnRestartWithHardLink() {
|
||||
assertTrue(Files.exists(Paths.get(newReplicaInfo.getBlockURI())));
|
||||
assertTrue(Files.exists(Paths.get(oldReplicaInfo.getBlockURI())));
|
||||
|
||||
DirectoryScanner scanner = new DirectoryScanner(
|
||||
cluster.getDataNodes().get(0).getFSDataset(), conf);
|
||||
// Before starting Dir Scanner, we should enable deleteDuplicateReplicas.
|
||||
FsDatasetSpi<?> fsDataset = cluster.getDataNodes().get(0).getFSDataset();
|
||||
DirectoryScanner scanner = new DirectoryScanner(fsDataset, conf);
|
||||
FsVolumeImpl fsVolume =
|
||||
(FsVolumeImpl) fsDataset.getFsVolumeReferences().get(0);
|
||||
fsVolume.getBlockPoolSlice(fsVolume.getBlockPoolList()[0])
|
||||
.setDeleteDuplicateReplicasForTests(true);
|
||||
scanner.start();
|
||||
scanner.run();
|
||||
|
||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||
@Override public Boolean get() {
|
||||
return !Files.exists(Paths.get(oldReplicaInfo.getBlockURI()));
|
||||
}
|
||||
}, 100, 10000);
|
||||
GenericTestUtils.waitFor(
|
||||
() -> !Files.exists(Paths.get(oldReplicaInfo.getBlockURI())),
|
||||
100, 10000, "Old replica is not deleted by DirScanner even after "
|
||||
+ "10s of waiting has elapsed");
|
||||
assertTrue(Files.exists(Paths.get(newReplicaInfo.getBlockURI())));
|
||||
|
||||
validateFileLen(fs, fileLen, filePath);
|
||||
|
||||
} catch (Exception ex) {
|
||||
LOG.info("Exception in testDnRestartWithHardLink ", ex);
|
||||
fail("Exception while testing testDnRestartWithHardLink ");
|
||||
// Additional tests to ensure latest replica gets deleted after file
|
||||
// deletion.
|
||||
fs.delete(filePath, false);
|
||||
GenericTestUtils.waitFor(
|
||||
() -> !Files.exists(Paths.get(newReplicaInfo.getBlockURI())),
|
||||
100, 10000);
|
||||
} finally {
|
||||
if (cluster.isClusterUp()) {
|
||||
cluster.shutdown();
|
||||
conf.setBoolean(DFSConfigKeys.DFS_DATANODE_DUPLICATE_REPLICA_DELETION,
|
||||
isReplicaDeletionEnabled);
|
||||
if (cluster != null && cluster.isClusterUp()) {
|
||||
cluster.shutdown(true, true);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1384,7 +1411,7 @@ public void testMoveBlockSuccessWithSameMountMove() {
|
||||
.build();
|
||||
FileSystem fs = cluster.getFileSystem();
|
||||
DataNode dataNode = cluster.getDataNodes().get(0);
|
||||
Path filePath = new Path("testData");
|
||||
Path filePath = new Path(name.getMethodName());
|
||||
long fileLen = 100;
|
||||
|
||||
ExtendedBlock block = createTestFile(fs, fileLen, filePath);
|
||||
@ -1432,7 +1459,7 @@ public void testMoveBlockWithSameMountMoveWithoutSpace() {
|
||||
.build();
|
||||
FileSystem fs = cluster.getFileSystem();
|
||||
DataNode dataNode = cluster.getDataNodes().get(0);
|
||||
Path filePath = new Path("testData");
|
||||
Path filePath = new Path(name.getMethodName());
|
||||
long fileLen = 100;
|
||||
|
||||
ExtendedBlock block = createTestFile(fs, fileLen, filePath);
|
||||
|
Loading…
Reference in New Issue
Block a user