HDFS-10810. setreplication removing block from underconstrcution temporarily. Contributed by Brahma Reddy Battula
This commit is contained in:
parent
853d65a157
commit
8078a5efd0
@ -4013,13 +4013,15 @@ private void updateNeededReconstructions(final BlockInfo block,
|
||||
return;
|
||||
}
|
||||
NumberReplicas repl = countNodes(block);
|
||||
int pendingNum = pendingReconstruction.getNumReplicas(block);
|
||||
int curExpectedReplicas = getRedundancy(block);
|
||||
if (isNeededReconstruction(block, repl.liveReplicas())) {
|
||||
neededReconstruction.update(block, repl.liveReplicas(),
|
||||
if (!hasEnoughEffectiveReplicas(block, repl, pendingNum,
|
||||
curExpectedReplicas)) {
|
||||
neededReconstruction.update(block, repl.liveReplicas() + pendingNum,
|
||||
repl.readOnlyReplicas(), repl.decommissionedAndDecommissioning(),
|
||||
curExpectedReplicas, curReplicasDelta, expectedReplicasDelta);
|
||||
} else {
|
||||
int oldReplicas = repl.liveReplicas()-curReplicasDelta;
|
||||
int oldReplicas = repl.liveReplicas() + pendingNum - curReplicasDelta;
|
||||
int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
|
||||
neededReconstruction.remove(block, oldReplicas, repl.readOnlyReplicas(),
|
||||
repl.decommissionedAndDecommissioning(), oldExpectedReplicas);
|
||||
|
@ -18,6 +18,7 @@
|
||||
|
||||
package org.apache.hadoop.hdfs;
|
||||
|
||||
import com.google.common.base.Supplier;
|
||||
import org.apache.hadoop.fs.StorageType;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
@ -38,6 +39,7 @@
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.ChecksumException;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
|
||||
@ -230,6 +232,65 @@ public void testCorruptionWithDiskFailure() throws Exception {
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSetReplicationWhenBatchIBR() throws Exception {
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 100);
|
||||
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INCREMENTAL_INTERVAL_MSEC_KEY,
|
||||
30000);
|
||||
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024);
|
||||
conf.setInt(DFSConfigKeys.DFS_NAMENODE_FILE_CLOSE_NUM_COMMITTED_ALLOWED_KEY,
|
||||
1);
|
||||
DistributedFileSystem dfs;
|
||||
try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
||||
.numDataNodes(3).build()) {
|
||||
final int bufferSize = 1024; // 1024 Bytes each time
|
||||
byte[] outBuffer = new byte[bufferSize];
|
||||
dfs = cluster.getFileSystem();
|
||||
String fileName = "/testSetRep1";
|
||||
Path filePath = new Path(fileName);
|
||||
FSDataOutputStream out = dfs.create(filePath);
|
||||
out.write(outBuffer, 0, bufferSize);
|
||||
out.close();
|
||||
//sending the FBR to Delay next IBR
|
||||
cluster.triggerBlockReports();
|
||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||
@Override
|
||||
public Boolean get() {
|
||||
try {
|
||||
cluster.triggerBlockReports();
|
||||
if (cluster.getNamesystem().getBlocksTotal() == 1) {
|
||||
return true;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// Ignore the exception
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}, 10, 3000);
|
||||
fileName = "/testSetRep2";
|
||||
filePath = new Path(fileName);
|
||||
out = dfs.create(filePath);
|
||||
out.write(outBuffer, 0, bufferSize);
|
||||
out.close();
|
||||
dfs.setReplication(filePath, (short) 10);
|
||||
// underreplicated Blocks should be one after setrep
|
||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||
@Override public Boolean get() {
|
||||
try {
|
||||
return cluster.getNamesystem().getBlockManager()
|
||||
.getUnderReplicatedBlocksCount() == 1;
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}, 10, 3000);
|
||||
assertEquals(0,
|
||||
cluster.getNamesystem().getBlockManager().getMissingBlocksCount());
|
||||
}
|
||||
}
|
||||
|
||||
private void markAllBlocksAsCorrupt(BlockManager bm,
|
||||
ExtendedBlock blk) throws IOException {
|
||||
for (DatanodeStorageInfo info : bm.getStorages(blk.getLocalBlock())) {
|
||||
|
Loading…
Reference in New Issue
Block a user