HDFS-12453. TestDataNodeHotSwapVolumes fails in trunk Jenkins runs. (Lei (Eddy) Xu)

This commit is contained in:
Lei Xu 2017-09-29 10:46:17 -07:00
parent 7f6118f918
commit f40dbc170e

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.BlockMissingException; import org.apache.hadoop.hdfs.BlockMissingException;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.DistributedFileSystem;
@ -36,6 +37,8 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology; import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.common.Storage; import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
@ -46,6 +49,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import org.junit.After; import org.junit.After;
@ -68,6 +72,7 @@ import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -83,6 +88,7 @@ import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.core.Is.is; import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
@ -775,12 +781,11 @@ public class TestDataNodeHotSwapVolumes {
private void testRemoveVolumeBeingWrittenForDatanode(int dataNodeIdx) private void testRemoveVolumeBeingWrittenForDatanode(int dataNodeIdx)
throws IOException, ReconfigurationException, TimeoutException, throws IOException, ReconfigurationException, TimeoutException,
InterruptedException, BrokenBarrierException { InterruptedException, BrokenBarrierException {
// Starts DFS cluster with 3 DataNodes to form a pipeline. startDFSCluster(1, 4);
startDFSCluster(1, 3);
final short REPLICATION = 3; final short REPLICATION = 3;
final DataNode dn = cluster.getDataNodes().get(dataNodeIdx); final DistributedFileSystem fs = cluster.getFileSystem();
final FileSystem fs = cluster.getFileSystem(); final DFSClient client = fs.getClient();
final Path testFile = new Path("/test"); final Path testFile = new Path("/test");
FSDataOutputStream out = fs.create(testFile, REPLICATION); FSDataOutputStream out = fs.create(testFile, REPLICATION);
@ -790,54 +795,93 @@ public class TestDataNodeHotSwapVolumes {
out.write(writeBuf); out.write(writeBuf);
out.hflush(); out.hflush();
// Make FsDatasetSpi#finalizeBlock a time-consuming operation. So if the BlockLocation[] blocks = fs.getFileBlockLocations(testFile, 0, BLOCK_SIZE);
// BlockReceiver releases volume reference before finalizeBlock(), the blocks String[] dataNodeNames = blocks[0].getNames();
// on the volume will be removed, and finalizeBlock() throws IOE. String dataNodeName = dataNodeNames[dataNodeIdx];
final FsDatasetSpi<? extends FsVolumeSpi> data = dn.data; int xferPort = Integer.parseInt(dataNodeName.split(":")[1]);
dn.data = Mockito.spy(data); DataNode dn = null;
doAnswer(new Answer<Object>() { for (DataNode dataNode : cluster.getDataNodes()) {
public Object answer(InvocationOnMock invocation) if (dataNode.getXferPort() == xferPort) {
throws IOException, InterruptedException { dn = dataNode;
Thread.sleep(1000); break;
// Bypass the argument to FsDatasetImpl#finalizeBlock to verify that }
// the block is not removed, since the volume reference should not }
// be released at this point. assertNotNull(dn);
data.finalizeBlock((ExtendedBlock) invocation.getArguments()[0],
(boolean) invocation.getArguments()[1]);
return null;
}
}).when(dn.data).finalizeBlock(any(ExtendedBlock.class),
Mockito.anyBoolean());
final CyclicBarrier barrier = new CyclicBarrier(2); final CyclicBarrier barrier = new CyclicBarrier(4);
final AtomicBoolean done = new AtomicBoolean(false);
List<String> oldDirs = getDataDirs(dn); DataNodeFaultInjector newInjector = new DataNodeFaultInjector() {
final String newDirs = oldDirs.get(1); // Remove the first volume. public void logDelaySendingAckToUpstream(
final List<Exception> exceptions = new ArrayList<>(); final String upstreamAddr, final long delayMs) throws IOException {
Thread reconfigThread = new Thread() {
public void run() {
try { try {
barrier.await(); // Make all streams which hold the volume references to wait the
assertThat( // reconfiguration thread to start.
"DN did not update its own config", // It should only block IO during the period of reconfiguration
dn.reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY, newDirs), // task running.
is(dn.getConf().get(DFS_DATANODE_DATA_DIR_KEY))); if (!done.get()) {
} catch (ReconfigurationException | barrier.await();
InterruptedException | // Add delays to allow the reconfiguration thread starts before
BrokenBarrierException e) { // IO finish.
exceptions.add(e); Thread.sleep(1000);
}
} catch (InterruptedException | BrokenBarrierException e) {
throw new IOException(e);
} }
} }
}; };
reconfigThread.start(); DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
barrier.await(); try {
rb.nextBytes(writeBuf); DataNodeFaultInjector.set(newInjector);
out.write(writeBuf);
out.hflush();
out.close();
reconfigThread.join(); List<String> oldDirs = getDataDirs(dn);
LocatedBlocks lbs = client.getLocatedBlocks("/test", 0);
LocatedBlock block = lbs.get(0);
FsVolumeImpl volume =
(FsVolumeImpl) dn.getFSDataset().getVolume(block.getBlock());
final String newDirs = oldDirs.stream()
.filter((d) -> !d.contains(volume.getStorageLocation().toString()))
.collect(Collectors.joining(","));
final List<IOException> exceptions = new ArrayList<>();
final DataNode dataNode = dn;
final CyclicBarrier reconfigBarrier = new CyclicBarrier(2);
Thread reconfigThread = new Thread(() -> {
try {
reconfigBarrier.await();
// Wake up writing threads on the pipeline to finish the block.
barrier.await();
assertThat(
"DN did not update its own config",
dataNode.reconfigurePropertyImpl(
DFS_DATANODE_DATA_DIR_KEY, newDirs),
is(dataNode.getConf().get(DFS_DATANODE_DATA_DIR_KEY)));
done.set(true);
} catch (ReconfigurationException |
InterruptedException |
BrokenBarrierException e) {
exceptions.add(new IOException(e));
}
});
reconfigThread.start();
// Write more data to make sure the stream threads wait on the barrier.
rb.nextBytes(writeBuf);
out.write(writeBuf);
reconfigBarrier.await();
out.hflush();
out.close();
reconfigThread.join();
if (!exceptions.isEmpty()) {
throw MultipleIOException.createIOException(exceptions);
}
} finally {
DataNodeFaultInjector.set(oldInjector);
}
// Verify if the data directory reconfigure was successful // Verify if the data directory reconfigure was successful
FsDatasetSpi<? extends FsVolumeSpi> fsDatasetSpi = dn.getFSDataset(); FsDatasetSpi<? extends FsVolumeSpi> fsDatasetSpi = dn.getFSDataset();
@ -851,19 +895,12 @@ public class TestDataNodeHotSwapVolumes {
1, fsVolumeReferences.size()); 1, fsVolumeReferences.size());
} }
// Add a new DataNode to help with the pipeline recover.
cluster.startDataNodes(conf, 1, true, null, null, null);
// Verify the file has sufficient replications. // Verify the file has sufficient replications.
DFSTestUtil.waitReplication(fs, testFile, REPLICATION); DFSTestUtil.waitReplication(fs, testFile, REPLICATION);
// Read the content back // Read the content back
byte[] content = DFSTestUtil.readFileBuffer(fs, testFile); byte[] content = DFSTestUtil.readFileBuffer(fs, testFile);
assertEquals(BLOCK_SIZE, content.length); assertEquals(BLOCK_SIZE, content.length);
if (!exceptions.isEmpty()) {
throw new IOException(exceptions.get(0).getCause());
}
// Write more files to make sure that the DataNode that has removed volume // Write more files to make sure that the DataNode that has removed volume
// is still alive to receive data. // is still alive to receive data.
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {