HDFS-3541. Deadlock between recovery, xceiver and packet responder. Contributed by Vinay.

Submitted by:	Vinay
Reviewed by:	Uma Maheswara Rao G


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1358794 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Uma Maheswara Rao G 2012-07-08 17:42:19 +00:00
parent 52a4678a1d
commit d4fb882163
4 changed files with 78 additions and 0 deletions

View File

@ -175,6 +175,8 @@ Trunk (unreleased changes)
HDFS-3549. Fix dist tar build fails in hadoop-hdfs-raid project. (Jason Lowe via daryn)
HDFS-3541. Deadlock between recovery, xceiver and packet responder (Vinay via umamahesh)
Branch-2 ( Unreleased changes )
INCOMPATIBLE CHANGES

View File

@ -844,6 +844,7 @@ class BlockReceiver implements Closeable {
try {
responder.join();
} catch (InterruptedException e) {
responder.interrupt();
throw new IOException("Interrupted receiveBlock");
}
responder = null;
@ -1018,6 +1019,7 @@ class BlockReceiver implements Closeable {
wait();
} catch (InterruptedException e) {
running = false;
Thread.currentThread().interrupt();
}
}
if(LOG.isDebugEnabled()) {

View File

@ -838,6 +838,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
*/
@Override // FsDatasetSpi
public synchronized void finalizeBlock(ExtendedBlock b) throws IOException {
if (Thread.interrupted()) {
// Don't allow data modifications from interrupted threads
throw new IOException("Cannot finalize block from Interrupted Thread");
}
ReplicaInfo replicaInfo = getReplicaInfo(b);
if (replicaInfo.getState() == ReplicaState.FINALIZED) {
// this is legal, when recovery happens on a file that has

View File

@ -38,21 +38,27 @@ import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
@ -561,4 +567,68 @@ public class TestBlockRecovery {
streams.close();
}
}
/**
* Test to verify the race between finalizeBlock and Lease recovery
*
* @throws Exception
*/
@Test(timeout = 20000)
public void testRaceBetweenReplicaRecoveryAndFinalizeBlock() throws Exception {
tearDown();// Stop the Mocked DN started in startup()
Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleSingleNN(8020, 50070))
.numDataNodes(1).build();
try {
cluster.waitClusterUp();
DistributedFileSystem fs = cluster.getFileSystem();
Path path = new Path("/test");
FSDataOutputStream out = fs.create(path);
out.writeBytes("data");
out.hsync();
List<LocatedBlock> blocks = DFSTestUtil.getAllBlocks(fs.open(path));
final LocatedBlock block = blocks.get(0);
final DataNode dataNode = cluster.getDataNodes().get(0);
final AtomicBoolean recoveryInitResult = new AtomicBoolean(true);
Thread recoveryThread = new Thread() {
public void run() {
try {
DatanodeInfo[] locations = block.getLocations();
final RecoveringBlock recoveringBlock = new RecoveringBlock(
block.getBlock(), locations, block.getBlock()
.getGenerationStamp() + 1);
synchronized (dataNode.data) {
Thread.sleep(2000);
dataNode.initReplicaRecovery(recoveringBlock);
}
} catch (Exception e) {
recoveryInitResult.set(false);
}
}
};
recoveryThread.start();
try {
out.close();
} catch (IOException e) {
Assert.assertTrue("Writing should fail",
e.getMessage().contains("are bad. Aborting..."));
} finally {
recoveryThread.join();
}
Assert.assertTrue("Recovery should be initiated successfully",
recoveryInitResult.get());
dataNode.updateReplicaUnderRecovery(block.getBlock(), block.getBlock()
.getGenerationStamp() + 1, block.getBlockSize());
} finally {
if (null != cluster) {
cluster.shutdown();
cluster = null;
}
}
}
}