HDFS-12886. Ignore minReplication for block recovery. Contributed by Lukas Majercak.
This commit is contained in:
parent
154cfb2b62
commit
08ff1586d5
@ -991,6 +991,13 @@ public boolean commitOrCompleteLastBlock(BlockCollection bc,
|
|||||||
addExpectedReplicasToPending(lastBlock);
|
addExpectedReplicasToPending(lastBlock);
|
||||||
}
|
}
|
||||||
completeBlock(lastBlock, iip, false);
|
completeBlock(lastBlock, iip, false);
|
||||||
|
} else if (pendingRecoveryBlocks.isUnderRecovery(lastBlock)) {
|
||||||
|
// We've just finished recovery for this block, complete
|
||||||
|
// the block forcibly disregarding number of replicas.
|
||||||
|
// This is to ignore minReplication, the block will be closed
|
||||||
|
// and then replicated out.
|
||||||
|
completeBlock(lastBlock, iip, true);
|
||||||
|
updateNeededReconstructions(lastBlock, 1, 0);
|
||||||
}
|
}
|
||||||
return committed;
|
return committed;
|
||||||
}
|
}
|
||||||
|
@ -19,9 +19,14 @@
|
|||||||
package org.apache.hadoop.hdfs.server.datanode;
|
package org.apache.hadoop.hdfs.server.datanode;
|
||||||
|
|
||||||
import org.apache.hadoop.hdfs.AppendTestUtil;
|
import org.apache.hadoop.hdfs.AppendTestUtil;
|
||||||
|
import org.apache.hadoop.hdfs.DFSClient;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
|
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
import static org.mockito.Matchers.any;
|
import static org.mockito.Matchers.any;
|
||||||
@ -42,6 +47,7 @@
|
|||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@ -1142,4 +1148,81 @@ public Boolean get() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test that block will be recovered even if there are less than the
|
||||||
|
* specified minReplication datanodes involved in its recovery.
|
||||||
|
*
|
||||||
|
* Check that, after recovering, the block will be successfully replicated.
|
||||||
|
*/
|
||||||
|
@Test(timeout = 300000L)
|
||||||
|
public void testRecoveryWillIgnoreMinReplication() throws Exception {
|
||||||
|
tearDown(); // Stop the Mocked DN started in startup()
|
||||||
|
|
||||||
|
final int blockSize = 4096;
|
||||||
|
final int numReplicas = 3;
|
||||||
|
final String filename = "/testIgnoreMinReplication";
|
||||||
|
final Path filePath = new Path(filename);
|
||||||
|
Configuration configuration = new HdfsConfiguration();
|
||||||
|
configuration.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 2000);
|
||||||
|
configuration.setInt(DFS_NAMENODE_REPLICATION_MIN_KEY, 2);
|
||||||
|
configuration.setLong(DFS_BLOCK_SIZE_KEY, blockSize);
|
||||||
|
MiniDFSCluster cluster = null;
|
||||||
|
|
||||||
|
try {
|
||||||
|
cluster = new MiniDFSCluster.Builder(configuration).numDataNodes(5)
|
||||||
|
.build();
|
||||||
|
cluster.waitActive();
|
||||||
|
final DistributedFileSystem dfs = cluster.getFileSystem();
|
||||||
|
final FSNamesystem fsn = cluster.getNamesystem();
|
||||||
|
|
||||||
|
// Create a file and never close the output stream to trigger recovery
|
||||||
|
FSDataOutputStream out = dfs.create(filePath, (short) numReplicas);
|
||||||
|
out.write(AppendTestUtil.randomBytes(0, blockSize));
|
||||||
|
out.hsync();
|
||||||
|
|
||||||
|
DFSClient dfsClient = new DFSClient(new InetSocketAddress("localhost",
|
||||||
|
cluster.getNameNodePort()), configuration);
|
||||||
|
LocatedBlock blk = dfsClient.getNamenode().
|
||||||
|
getBlockLocations(filename, 0, blockSize).
|
||||||
|
getLastLocatedBlock();
|
||||||
|
|
||||||
|
// Kill 2 out of 3 datanodes so that only 1 alive, thus < minReplication
|
||||||
|
List<DatanodeInfo> dataNodes = Arrays.asList(blk.getLocations());
|
||||||
|
assertEquals(dataNodes.size(), numReplicas);
|
||||||
|
for (DatanodeInfo dataNode : dataNodes.subList(0, numReplicas - 1)) {
|
||||||
|
cluster.stopDataNode(dataNode.getName());
|
||||||
|
}
|
||||||
|
|
||||||
|
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||||
|
@Override
|
||||||
|
public Boolean get() {
|
||||||
|
return fsn.getNumDeadDataNodes() == 2;
|
||||||
|
}
|
||||||
|
}, 300, 300000);
|
||||||
|
|
||||||
|
// Make sure hard lease expires to trigger replica recovery
|
||||||
|
cluster.setLeasePeriod(100L, 100L);
|
||||||
|
|
||||||
|
// Wait for recovery to succeed
|
||||||
|
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||||
|
@Override
|
||||||
|
public Boolean get() {
|
||||||
|
try {
|
||||||
|
return dfs.isFileClosed(filePath);
|
||||||
|
} catch (IOException e) {}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}, 300, 300000);
|
||||||
|
|
||||||
|
// Wait for the block to be replicated
|
||||||
|
DFSTestUtil.waitForReplication(cluster, DFSTestUtil.getFirstBlock(
|
||||||
|
dfs, filePath), 1, numReplicas, 0);
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user