HDFS-6877. Avoid calling checkDisk when an HDFS volume is removed during a write. (Lei Xu via Colin P. McCabe)

This commit is contained in:
Colin Patrick Mccabe 2014-10-22 13:38:26 -07:00
parent 66e8187ea1
commit 7b0f9bb258
5 changed files with 105 additions and 18 deletions

View File

@ -287,6 +287,9 @@ Release 2.7.0 - UNRELEASED
HDFS-7254. Add documentation for hot swapping DataNode drives (Lei Xu via HDFS-7254. Add documentation for hot swapping DataNode drives (Lei Xu via
Colin P. McCabe) Colin P. McCabe)
HDFS-6877. Avoid calling checkDisk when an HDFS volume is removed during a
write. (Lei Xu via Colin P. McCabe)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -47,6 +47,7 @@
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck; import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline; import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
@ -1229,7 +1230,28 @@ public void run() {
if (lastPacketInBlock) { if (lastPacketInBlock) {
// Finalize the block and close the block file // Finalize the block and close the block file
finalizeBlock(startTime); try {
finalizeBlock(startTime);
} catch (ReplicaNotFoundException e) {
// Verify that the exception is due to volume removal.
FsVolumeSpi volume;
synchronized (datanode.data) {
volume = datanode.data.getVolume(block);
}
if (volume == null) {
// ReplicaInfo has been removed because the corresponding data
// volume has been removed. No need to check for disk errors.
LOG.info(myString
+ ": BlockReceiver is interrupted because the block pool "
+ block.getBlockPoolId() + " has been removed.", e);
sendAckUpstream(ack, expected, totalAckTimeNanos, 0,
Status.OOB_INTERRUPTED);
running = false;
receiverThread.interrupt();
continue;
}
throw e;
}
} }
sendAckUpstream(ack, expected, totalAckTimeNanos, sendAckUpstream(ack, expected, totalAckTimeNanos,

View File

@ -267,6 +267,9 @@ public String recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen
* The block size is what is in the parameter b and it must match the amount * The block size is what is in the parameter b and it must match the amount
* of data written * of data written
* @throws IOException * @throws IOException
* @throws ReplicaNotFoundException if the replica can not be found when the
* block is being finalized. For instance, the block resides on an HDFS volume
* that has been removed.
*/ */
public void finalizeBlock(ExtendedBlock b) throws IOException; public void finalizeBlock(ExtendedBlock b) throws IOException;

View File

@ -219,7 +219,7 @@ enum Status {
CHECKSUM_OK = 6; CHECKSUM_OK = 6;
ERROR_UNSUPPORTED = 7; ERROR_UNSUPPORTED = 7;
OOB_RESTART = 8; // Quick restart OOB_RESTART = 8; // Quick restart
OOB_RESERVED1 = 9; // Reserved OOB_INTERRUPTED = 9; // Interrupted
OOB_RESERVED2 = 10; // Reserved OOB_RESERVED2 = 10; // Reserved
OOB_RESERVED3 = 11; // Reserved OOB_RESERVED3 = 11; // Reserved
IN_PROGRESS = 12; IN_PROGRESS = 12;

View File

@ -21,6 +21,7 @@
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.ReconfigurationException; import org.apache.hadoop.conf.ReconfigurationException;
import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@ -50,6 +51,7 @@
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -156,9 +158,12 @@ private static void waitReplication(FileSystem fs, Path file, int blockIdx,
throws IOException, TimeoutException, InterruptedException { throws IOException, TimeoutException, InterruptedException {
int attempts = 50; // Wait 5 seconds. int attempts = 50; // Wait 5 seconds.
while (attempts > 0) { while (attempts > 0) {
if (getNumReplicas(fs, file, blockIdx) == numReplicas) { int actualReplicas = getNumReplicas(fs, file, blockIdx);
if (actualReplicas == numReplicas) {
return; return;
} }
System.out.printf("Block %d of file %s has %d replicas (desired %d).\n",
blockIdx, file.toString(), actualReplicas, numReplicas);
Thread.sleep(100); Thread.sleep(100);
attempts--; attempts--;
} }
@ -167,9 +172,16 @@ private static void waitReplication(FileSystem fs, Path file, int blockIdx,
} }
/** Parses data dirs from DataNode's configuration. */ /** Parses data dirs from DataNode's configuration. */
private static Collection<String> getDataDirs(DataNode datanode) { private static List<String> getDataDirs(DataNode datanode) {
return datanode.getConf().getTrimmedStringCollection( return new ArrayList<String>(datanode.getConf().getTrimmedStringCollection(
DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY); DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY));
}
/** Force the DataNode to report missing blocks immediately. */
private static void triggerDeleteReport(DataNode datanode)
throws IOException {
datanode.scheduleAllBlockReport(0);
DataNodeTestUtils.triggerDeletionReport(datanode);
} }
@Test @Test
@ -274,7 +286,7 @@ private List<List<Integer>> getNumBlocksReport(int namesystemIdx) {
/** /**
* Test adding one volume on a running MiniDFSCluster with only one NameNode. * Test adding one volume on a running MiniDFSCluster with only one NameNode.
*/ */
@Test @Test(timeout=60000)
public void testAddOneNewVolume() public void testAddOneNewVolume()
throws IOException, ReconfigurationException, throws IOException, ReconfigurationException,
InterruptedException, TimeoutException { InterruptedException, TimeoutException {
@ -304,7 +316,7 @@ public void testAddOneNewVolume()
verifyFileLength(cluster.getFileSystem(), testFile, numBlocks); verifyFileLength(cluster.getFileSystem(), testFile, numBlocks);
} }
@Test(timeout = 60000) @Test(timeout=60000)
public void testAddVolumesDuringWrite() public void testAddVolumesDuringWrite()
throws IOException, InterruptedException, TimeoutException, throws IOException, InterruptedException, TimeoutException,
ReconfigurationException { ReconfigurationException {
@ -336,7 +348,7 @@ public void testAddVolumesDuringWrite()
assertEquals(expectedNumBlocks, actualNumBlocks); assertEquals(expectedNumBlocks, actualNumBlocks);
} }
@Test @Test(timeout=60000)
public void testAddVolumesToFederationNN() public void testAddVolumesToFederationNN()
throws IOException, TimeoutException, InterruptedException, throws IOException, TimeoutException, InterruptedException,
ReconfigurationException { ReconfigurationException {
@ -371,7 +383,7 @@ public void testAddVolumesToFederationNN()
Collections.frequency(actualNumBlocks.get(0), 0)); Collections.frequency(actualNumBlocks.get(0), 0));
} }
@Test @Test(timeout=60000)
public void testRemoveOneVolume() public void testRemoveOneVolume()
throws ReconfigurationException, InterruptedException, TimeoutException, throws ReconfigurationException, InterruptedException, TimeoutException,
IOException { IOException {
@ -410,12 +422,13 @@ public void testRemoveOneVolume()
assertEquals(10 / 2 + 6, blocksForVolume1.getNumberOfBlocks()); assertEquals(10 / 2 + 6, blocksForVolume1.getNumberOfBlocks());
} }
@Test @Test(timeout=60000)
public void testReplicatingAfterRemoveVolume() public void testReplicatingAfterRemoveVolume()
throws InterruptedException, TimeoutException, IOException, throws InterruptedException, TimeoutException, IOException,
ReconfigurationException { ReconfigurationException {
startDFSCluster(1, 2); startDFSCluster(1, 2);
final DistributedFileSystem fs = cluster.getFileSystem();
final FileSystem fs = cluster.getFileSystem();
final short replFactor = 2; final short replFactor = 2;
Path testFile = new Path("/test"); Path testFile = new Path("/test");
createFile(testFile, 4, replFactor); createFile(testFile, 4, replFactor);
@ -428,14 +441,9 @@ public void testReplicatingAfterRemoveVolume()
assertFileLocksReleased( assertFileLocksReleased(
new ArrayList<String>(oldDirs).subList(1, oldDirs.size())); new ArrayList<String>(oldDirs).subList(1, oldDirs.size()));
// Force DataNode to report missing blocks. triggerDeleteReport(dn);
dn.scheduleAllBlockReport(0);
DataNodeTestUtils.triggerDeletionReport(dn);
// The 2nd block only has 1 replica due to the removed data volume.
waitReplication(fs, testFile, 1, 1); waitReplication(fs, testFile, 1, 1);
// Wait NameNode to replica missing blocks.
DFSTestUtil.waitReplication(fs, testFile, replFactor); DFSTestUtil.waitReplication(fs, testFile, replFactor);
} }
@ -478,4 +486,55 @@ private static void assertFileLocksReleased(Collection<String> dirs)
} }
} }
} }
@Test(timeout=180000)
public void testRemoveVolumeBeingWritten()
    throws InterruptedException, TimeoutException, ReconfigurationException,
    IOException {
  // Run the volume-removal scenario once per DataNode position (0, 1, 2)
  // so that a volume is removed on each DataNode of the pipeline in turn.
  final int numDataNodes = 3;
  int dataNodeIdx = 0;
  while (dataNodeIdx < numDataNodes) {
    testRemoveVolumeBeingWrittenForDatanode(dataNodeIdx);
    dataNodeIdx++;
  }
}
/**
 * Tests removing a data volume from a particular DataNode while that
 * DataNode is part of the pipeline of a file that is still being written.
 *
 * <p>Half a block is written and flushed, the DataNode is reconfigured to
 * keep only the second of its configured data directories, and then the
 * second half of the block is written and the stream is closed. The test
 * then waits for the file to reach the requested replication and checks
 * that the content read back is exactly one block long.
 *
 * @param dataNodeIdx the index of the DataNode to remove a volume from.
 * @throws IOException if creating, writing or reading the file fails.
 * @throws ReconfigurationException if the DataNode rejects the new
 *     data directory setting.
 * @throws TimeoutException declared for the replication wait.
 * @throws InterruptedException declared for the replication wait.
 */
private void testRemoveVolumeBeingWrittenForDatanode(int dataNodeIdx)
    throws IOException, ReconfigurationException, TimeoutException,
    InterruptedException {
  // Starts DFS cluster with 3 DataNodes to form a pipeline.
  startDFSCluster(1, 3);

  final short REPLICATION = 3;
  final DataNode dn = cluster.getDataNodes().get(dataNodeIdx);
  final FileSystem fs = cluster.getFileSystem();
  final Path testFile = new Path("/test");

  FSDataOutputStream out = fs.create(testFile, REPLICATION);
  boolean closed = false;
  try {
    Random rb = new Random(0);
    byte[] writeBuf = new byte[BLOCK_SIZE / 2];  // half of the block.
    rb.nextBytes(writeBuf);
    out.write(writeBuf);
    out.hflush();

    List<String> oldDirs = getDataDirs(dn);
    String newDirs = oldDirs.get(1);  // Remove the first volume.
    dn.reconfigurePropertyImpl(
        DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, newDirs);

    // Write the second half of the block after the volume is gone.
    rb.nextBytes(writeBuf);
    out.write(writeBuf);
    out.hflush();
    out.close();
    closed = true;
  } finally {
    if (!closed) {
      // Something above failed: release the stream so it is not leaked,
      // but let the original exception propagate instead of a close error.
      try {
        out.close();
      } catch (IOException ignored) {
        // Best-effort cleanup only.
      }
    }
  }

  // Verify the file has sufficient replications.
  DFSTestUtil.waitReplication(fs, testFile, REPLICATION);
  // Read the content back
  byte[] content = DFSTestUtil.readFileBuffer(fs, testFile);
  assertEquals(BLOCK_SIZE, content.length);
}
} }