HDFS-5016. Deadlock in pipeline recovery causes Datanode to be marked dead. Contributed by Suresh Srinivas.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1507189 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
1b531c1dbb
commit
7723b139d5
@ -907,4 +907,16 @@ public static byte[] getUuidBytes() {
|
|||||||
buf.putLong(uuid.getLeastSignificantBits());
|
buf.putLong(uuid.getLeastSignificantBits());
|
||||||
return buf.array();
|
return buf.array();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get stack trace for a given thread.
|
||||||
|
*/
|
||||||
|
public static String getStackTrace(Thread t) {
|
||||||
|
final StackTraceElement[] stackTrace = t.getStackTrace();
|
||||||
|
StringBuilder str = new StringBuilder();
|
||||||
|
for (StackTraceElement e : stackTrace) {
|
||||||
|
str.append(e.toString() + "\n");
|
||||||
|
}
|
||||||
|
return str.toString();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -713,6 +713,9 @@ Release 2.1.0-beta - 2013-07-02
|
|||||||
|
|
||||||
HDFS-4602. TestBookKeeperHACheckpoints fails. (umamahesh)
|
HDFS-4602. TestBookKeeperHACheckpoints fails. (umamahesh)
|
||||||
|
|
||||||
|
HDFS-5016. Deadlock in pipeline recovery causes Datanode to be marked dead.
|
||||||
|
(suresh)
|
||||||
|
|
||||||
BREAKDOWN OF HDFS-347 SUBTASKS AND RELATED JIRAS
|
BREAKDOWN OF HDFS-347 SUBTASKS AND RELATED JIRAS
|
||||||
|
|
||||||
HDFS-4353. Encapsulate connections to peers in Peer and PeerServer classes.
|
HDFS-4353. Encapsulate connections to peers in Peer and PeerServer classes.
|
||||||
|
@ -495,4 +495,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
|||||||
public static final long DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_DEFAULT = 600000; // 10 minutes
|
public static final long DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_DEFAULT = 600000; // 10 minutes
|
||||||
public static final String DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_KEY = "dfs.namenode.retrycache.heap.percent";
|
public static final String DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_KEY = "dfs.namenode.retrycache.heap.percent";
|
||||||
public static final float DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_DEFAULT = 0.03f;
|
public static final float DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_DEFAULT = 0.03f;
|
||||||
|
|
||||||
|
// Hidden configuration undocumented in hdfs-site. xml
|
||||||
|
// Timeout to wait for block receiver and responder thread to stop
|
||||||
|
public static final String DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY = "dfs.datanode.xceiver.stop.timeout.millis";
|
||||||
|
public static final long DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT = 60000;
|
||||||
}
|
}
|
||||||
|
@ -51,6 +51,7 @@
|
|||||||
import org.apache.hadoop.io.nativeio.NativeIO;
|
import org.apache.hadoop.io.nativeio.NativeIO;
|
||||||
import org.apache.hadoop.util.Daemon;
|
import org.apache.hadoop.util.Daemon;
|
||||||
import org.apache.hadoop.util.DataChecksum;
|
import org.apache.hadoop.util.DataChecksum;
|
||||||
|
import org.apache.hadoop.util.StringUtils;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
@ -728,7 +729,13 @@ void receiveBlock(
|
|||||||
}
|
}
|
||||||
if (responder != null) {
|
if (responder != null) {
|
||||||
try {
|
try {
|
||||||
responder.join();
|
responder.join(datanode.getDnConf().getXceiverStopTimeout());
|
||||||
|
if (responder.isAlive()) {
|
||||||
|
String msg = "Join on responder thread " + responder
|
||||||
|
+ " timed out";
|
||||||
|
LOG.warn(msg + "\n" + StringUtils.getStackTrace(responder));
|
||||||
|
throw new IOException(msg);
|
||||||
|
}
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
responder.interrupt();
|
responder.interrupt();
|
||||||
throw new IOException("Interrupted receiveBlock");
|
throw new IOException("Interrupted receiveBlock");
|
||||||
|
@ -17,6 +17,7 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.server.datanode;
|
package org.apache.hadoop.hdfs.server.datanode;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT;
|
||||||
@ -29,6 +30,8 @@
|
|||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_TRANSFERTO_ALLOWED_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_TRANSFERTO_ALLOWED_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_KEY;
|
||||||
@ -44,7 +47,8 @@
|
|||||||
* Simple class encapsulating all of the configuration that the DataNode
|
* Simple class encapsulating all of the configuration that the DataNode
|
||||||
* loads at startup time.
|
* loads at startup time.
|
||||||
*/
|
*/
|
||||||
class DNConf {
|
@InterfaceAudience.Private
|
||||||
|
public class DNConf {
|
||||||
final int socketTimeout;
|
final int socketTimeout;
|
||||||
final int socketWriteTimeout;
|
final int socketWriteTimeout;
|
||||||
final int socketKeepaliveTimeout;
|
final int socketKeepaliveTimeout;
|
||||||
@ -67,6 +71,8 @@ class DNConf {
|
|||||||
final String minimumNameNodeVersion;
|
final String minimumNameNodeVersion;
|
||||||
final String encryptionAlgorithm;
|
final String encryptionAlgorithm;
|
||||||
|
|
||||||
|
final long xceiverStopTimeout;
|
||||||
|
|
||||||
public DNConf(Configuration conf) {
|
public DNConf(Configuration conf) {
|
||||||
socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
|
socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
|
||||||
HdfsServerConstants.READ_TIMEOUT);
|
HdfsServerConstants.READ_TIMEOUT);
|
||||||
@ -127,10 +133,18 @@ public DNConf(Configuration conf) {
|
|||||||
this.encryptDataTransfer = conf.getBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY,
|
this.encryptDataTransfer = conf.getBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY,
|
||||||
DFS_ENCRYPT_DATA_TRANSFER_DEFAULT);
|
DFS_ENCRYPT_DATA_TRANSFER_DEFAULT);
|
||||||
this.encryptionAlgorithm = conf.get(DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
|
this.encryptionAlgorithm = conf.get(DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
|
||||||
|
|
||||||
|
this.xceiverStopTimeout = conf.getLong(
|
||||||
|
DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY,
|
||||||
|
DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT);
|
||||||
}
|
}
|
||||||
|
|
||||||
// We get minimumNameNodeVersion via a method so it can be mocked out in tests.
|
// We get minimumNameNodeVersion via a method so it can be mocked out in tests.
|
||||||
String getMinimumNameNodeVersion() {
|
String getMinimumNameNodeVersion() {
|
||||||
return this.minimumNameNodeVersion;
|
return this.minimumNameNodeVersion;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public long getXceiverStopTimeout() {
|
||||||
|
return xceiverStopTimeout;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -2448,7 +2448,7 @@ public Long getBalancerBandwidth() {
|
|||||||
return dxcs.balanceThrottler.getBandwidth();
|
return dxcs.balanceThrottler.getBandwidth();
|
||||||
}
|
}
|
||||||
|
|
||||||
DNConf getDnConf() {
|
public DNConf getDnConf() {
|
||||||
return dnConf;
|
return dnConf;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -28,6 +28,7 @@
|
|||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.util.DataChecksum;
|
import org.apache.hadoop.util.DataChecksum;
|
||||||
|
import org.apache.hadoop.util.StringUtils;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class defines a replica in a pipeline, which
|
* This class defines a replica in a pipeline, which
|
||||||
@ -150,11 +151,16 @@ public boolean equals(Object o) {
|
|||||||
* Interrupt the writing thread and wait until it dies
|
* Interrupt the writing thread and wait until it dies
|
||||||
* @throws IOException the waiting is interrupted
|
* @throws IOException the waiting is interrupted
|
||||||
*/
|
*/
|
||||||
public void stopWriter() throws IOException {
|
public void stopWriter(long xceiverStopTimeout) throws IOException {
|
||||||
if (writer != null && writer != Thread.currentThread() && writer.isAlive()) {
|
if (writer != null && writer != Thread.currentThread() && writer.isAlive()) {
|
||||||
writer.interrupt();
|
writer.interrupt();
|
||||||
try {
|
try {
|
||||||
writer.join();
|
writer.join(xceiverStopTimeout);
|
||||||
|
if (writer.isAlive()) {
|
||||||
|
final String msg = "Join on writer thread " + writer + " timed out";
|
||||||
|
DataNode.LOG.warn(msg + "\n" + StringUtils.getStackTrace(writer));
|
||||||
|
throw new IOException(msg);
|
||||||
|
}
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
throw new IOException("Waiting for writer thread is interrupted.");
|
throw new IOException("Waiting for writer thread is interrupted.");
|
||||||
}
|
}
|
||||||
|
@ -76,7 +76,6 @@
|
|||||||
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
|
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
|
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
|
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
|
||||||
import org.apache.hadoop.io.nativeio.NativeIO;
|
import org.apache.hadoop.io.nativeio.NativeIO;
|
||||||
import org.apache.hadoop.metrics2.util.MBeans;
|
import org.apache.hadoop.metrics2.util.MBeans;
|
||||||
import org.apache.hadoop.util.DataChecksum;
|
import org.apache.hadoop.util.DataChecksum;
|
||||||
@ -615,7 +614,7 @@ private ReplicaInfo recoverCheck(ExtendedBlock b, long newGS,
|
|||||||
if (replicaInfo.getState() == ReplicaState.RBW) {
|
if (replicaInfo.getState() == ReplicaState.RBW) {
|
||||||
ReplicaBeingWritten rbw = (ReplicaBeingWritten)replicaInfo;
|
ReplicaBeingWritten rbw = (ReplicaBeingWritten)replicaInfo;
|
||||||
// kill the previous writer
|
// kill the previous writer
|
||||||
rbw.stopWriter();
|
rbw.stopWriter(datanode.getDnConf().getXceiverStopTimeout());
|
||||||
rbw.setWriter(Thread.currentThread());
|
rbw.setWriter(Thread.currentThread());
|
||||||
// check length: bytesRcvd, bytesOnDisk, and bytesAcked should be the same
|
// check length: bytesRcvd, bytesOnDisk, and bytesAcked should be the same
|
||||||
if (replicaLen != rbw.getBytesOnDisk()
|
if (replicaLen != rbw.getBytesOnDisk()
|
||||||
@ -735,7 +734,7 @@ public synchronized ReplicaInPipeline recoverRbw(ExtendedBlock b,
|
|||||||
LOG.info("Recovering " + rbw);
|
LOG.info("Recovering " + rbw);
|
||||||
|
|
||||||
// Stop the previous writer
|
// Stop the previous writer
|
||||||
rbw.stopWriter();
|
rbw.stopWriter(datanode.getDnConf().getXceiverStopTimeout());
|
||||||
rbw.setWriter(Thread.currentThread());
|
rbw.setWriter(Thread.currentThread());
|
||||||
|
|
||||||
// check generation stamp
|
// check generation stamp
|
||||||
@ -1451,13 +1450,14 @@ public synchronized String getReplicaString(String bpid, long blockId) {
|
|||||||
@Override // FsDatasetSpi
|
@Override // FsDatasetSpi
|
||||||
public synchronized ReplicaRecoveryInfo initReplicaRecovery(
|
public synchronized ReplicaRecoveryInfo initReplicaRecovery(
|
||||||
RecoveringBlock rBlock) throws IOException {
|
RecoveringBlock rBlock) throws IOException {
|
||||||
return initReplicaRecovery(rBlock.getBlock().getBlockPoolId(),
|
return initReplicaRecovery(rBlock.getBlock().getBlockPoolId(), volumeMap,
|
||||||
volumeMap, rBlock.getBlock().getLocalBlock(), rBlock.getNewGenerationStamp());
|
rBlock.getBlock().getLocalBlock(), rBlock.getNewGenerationStamp(),
|
||||||
|
datanode.getDnConf().getXceiverStopTimeout());
|
||||||
}
|
}
|
||||||
|
|
||||||
/** static version of {@link #initReplicaRecovery(Block, long)}. */
|
/** static version of {@link #initReplicaRecovery(Block, long)}. */
|
||||||
static ReplicaRecoveryInfo initReplicaRecovery(String bpid,
|
static ReplicaRecoveryInfo initReplicaRecovery(String bpid, ReplicaMap map,
|
||||||
ReplicaMap map, Block block, long recoveryId) throws IOException {
|
Block block, long recoveryId, long xceiverStopTimeout) throws IOException {
|
||||||
final ReplicaInfo replica = map.get(bpid, block.getBlockId());
|
final ReplicaInfo replica = map.get(bpid, block.getBlockId());
|
||||||
LOG.info("initReplicaRecovery: " + block + ", recoveryId=" + recoveryId
|
LOG.info("initReplicaRecovery: " + block + ", recoveryId=" + recoveryId
|
||||||
+ ", replica=" + replica);
|
+ ", replica=" + replica);
|
||||||
@ -1470,7 +1470,7 @@ static ReplicaRecoveryInfo initReplicaRecovery(String bpid,
|
|||||||
//stop writer if there is any
|
//stop writer if there is any
|
||||||
if (replica instanceof ReplicaInPipeline) {
|
if (replica instanceof ReplicaInPipeline) {
|
||||||
final ReplicaInPipeline rip = (ReplicaInPipeline)replica;
|
final ReplicaInPipeline rip = (ReplicaInPipeline)replica;
|
||||||
rip.stopWriter();
|
rip.stopWriter(xceiverStopTimeout);
|
||||||
|
|
||||||
//check replica bytes on disk.
|
//check replica bytes on disk.
|
||||||
if (rip.getBytesOnDisk() < rip.getVisibleLength()) {
|
if (rip.getBytesOnDisk() < rip.getVisibleLength()) {
|
||||||
|
@ -167,7 +167,7 @@ private void checkBlockMetaDataInfo(boolean useDnHostname) throws Exception {
|
|||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
|
|
||||||
//create a file
|
//create a file
|
||||||
DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
|
DistributedFileSystem dfs = cluster.getFileSystem();
|
||||||
String filestr = "/foo";
|
String filestr = "/foo";
|
||||||
Path filepath = new Path(filestr);
|
Path filepath = new Path(filestr);
|
||||||
DFSTestUtil.createFile(dfs, filepath, 1024L, (short)3, 0L);
|
DFSTestUtil.createFile(dfs, filepath, 1024L, (short)3, 0L);
|
||||||
@ -225,7 +225,7 @@ private static void assertEquals(ReplicaInfo originalInfo, ReplicaRecoveryInfo r
|
|||||||
}
|
}
|
||||||
|
|
||||||
/** Test
|
/** Test
|
||||||
* {@link FsDatasetImpl#initReplicaRecovery(String, ReplicaMap, Block, long)}
|
* {@link FsDatasetImpl#initReplicaRecovery(String, ReplicaMap, Block, long, long)}
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testInitReplicaRecovery() throws IOException {
|
public void testInitReplicaRecovery() throws IOException {
|
||||||
@ -246,8 +246,9 @@ public void testInitReplicaRecovery() throws IOException {
|
|||||||
final ReplicaInfo originalInfo = map.get(bpid, b);
|
final ReplicaInfo originalInfo = map.get(bpid, b);
|
||||||
|
|
||||||
final long recoveryid = gs + 1;
|
final long recoveryid = gs + 1;
|
||||||
final ReplicaRecoveryInfo recoveryInfo = FsDatasetImpl.initReplicaRecovery(
|
final ReplicaRecoveryInfo recoveryInfo = FsDatasetImpl
|
||||||
bpid, map, blocks[0], recoveryid);
|
.initReplicaRecovery(bpid, map, blocks[0], recoveryid,
|
||||||
|
DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT);
|
||||||
assertEquals(originalInfo, recoveryInfo);
|
assertEquals(originalInfo, recoveryInfo);
|
||||||
|
|
||||||
final ReplicaUnderRecovery updatedInfo = (ReplicaUnderRecovery)map.get(bpid, b);
|
final ReplicaUnderRecovery updatedInfo = (ReplicaUnderRecovery)map.get(bpid, b);
|
||||||
@ -256,7 +257,9 @@ public void testInitReplicaRecovery() throws IOException {
|
|||||||
|
|
||||||
//recover one more time
|
//recover one more time
|
||||||
final long recoveryid2 = gs + 2;
|
final long recoveryid2 = gs + 2;
|
||||||
final ReplicaRecoveryInfo recoveryInfo2 = FsDatasetImpl.initReplicaRecovery(bpid, map, blocks[0], recoveryid2);
|
final ReplicaRecoveryInfo recoveryInfo2 = FsDatasetImpl
|
||||||
|
.initReplicaRecovery(bpid, map, blocks[0], recoveryid2,
|
||||||
|
DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT);
|
||||||
assertEquals(originalInfo, recoveryInfo2);
|
assertEquals(originalInfo, recoveryInfo2);
|
||||||
|
|
||||||
final ReplicaUnderRecovery updatedInfo2 = (ReplicaUnderRecovery)map.get(bpid, b);
|
final ReplicaUnderRecovery updatedInfo2 = (ReplicaUnderRecovery)map.get(bpid, b);
|
||||||
@ -265,7 +268,8 @@ public void testInitReplicaRecovery() throws IOException {
|
|||||||
|
|
||||||
//case RecoveryInProgressException
|
//case RecoveryInProgressException
|
||||||
try {
|
try {
|
||||||
FsDatasetImpl.initReplicaRecovery(bpid, map, b, recoveryid);
|
FsDatasetImpl.initReplicaRecovery(bpid, map, b, recoveryid,
|
||||||
|
DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT);
|
||||||
Assert.fail();
|
Assert.fail();
|
||||||
}
|
}
|
||||||
catch(RecoveryInProgressException ripe) {
|
catch(RecoveryInProgressException ripe) {
|
||||||
@ -276,7 +280,9 @@ public void testInitReplicaRecovery() throws IOException {
|
|||||||
{ // BlockRecoveryFI_01: replica not found
|
{ // BlockRecoveryFI_01: replica not found
|
||||||
final long recoveryid = gs + 1;
|
final long recoveryid = gs + 1;
|
||||||
final Block b = new Block(firstblockid - 1, length, gs);
|
final Block b = new Block(firstblockid - 1, length, gs);
|
||||||
ReplicaRecoveryInfo r = FsDatasetImpl.initReplicaRecovery(bpid, map, b, recoveryid);
|
ReplicaRecoveryInfo r = FsDatasetImpl.initReplicaRecovery(bpid, map, b,
|
||||||
|
recoveryid,
|
||||||
|
DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT);
|
||||||
Assert.assertNull("Data-node should not have this replica.", r);
|
Assert.assertNull("Data-node should not have this replica.", r);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -284,7 +290,8 @@ public void testInitReplicaRecovery() throws IOException {
|
|||||||
final long recoveryid = gs - 1;
|
final long recoveryid = gs - 1;
|
||||||
final Block b = new Block(firstblockid + 1, length, gs);
|
final Block b = new Block(firstblockid + 1, length, gs);
|
||||||
try {
|
try {
|
||||||
FsDatasetImpl.initReplicaRecovery(bpid, map, b, recoveryid);
|
FsDatasetImpl.initReplicaRecovery(bpid, map, b, recoveryid,
|
||||||
|
DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT);
|
||||||
Assert.fail();
|
Assert.fail();
|
||||||
}
|
}
|
||||||
catch(IOException ioe) {
|
catch(IOException ioe) {
|
||||||
@ -297,7 +304,8 @@ public void testInitReplicaRecovery() throws IOException {
|
|||||||
final long recoveryid = gs + 1;
|
final long recoveryid = gs + 1;
|
||||||
final Block b = new Block(firstblockid, length, gs+1);
|
final Block b = new Block(firstblockid, length, gs+1);
|
||||||
try {
|
try {
|
||||||
FsDatasetImpl.initReplicaRecovery(bpid, map, b, recoveryid);
|
FsDatasetImpl.initReplicaRecovery(bpid, map, b, recoveryid,
|
||||||
|
DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT);
|
||||||
fail("InitReplicaRecovery should fail because replica's " +
|
fail("InitReplicaRecovery should fail because replica's " +
|
||||||
"gs is less than the block's gs");
|
"gs is less than the block's gs");
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
@ -321,7 +329,7 @@ public void testUpdateReplicaUnderRecovery() throws IOException {
|
|||||||
String bpid = cluster.getNamesystem().getBlockPoolId();
|
String bpid = cluster.getNamesystem().getBlockPoolId();
|
||||||
|
|
||||||
//create a file
|
//create a file
|
||||||
DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
|
DistributedFileSystem dfs = cluster.getFileSystem();
|
||||||
String filestr = "/foo";
|
String filestr = "/foo";
|
||||||
Path filepath = new Path(filestr);
|
Path filepath = new Path(filestr);
|
||||||
DFSTestUtil.createFile(dfs, filepath, 1024L, (short)3, 0L);
|
DFSTestUtil.createFile(dfs, filepath, 1024L, (short)3, 0L);
|
||||||
|
Loading…
Reference in New Issue
Block a user