HDFS-11124. Report blockIds of internal blocks for EC files in Fsck. Contributed by Takanobu Asanuma.
This commit is contained in:
parent
18e1d68209
commit
b782bf2156
@ -244,14 +244,22 @@ final boolean hasNoStorage() {
|
||||
return true;
|
||||
}
|
||||
|
||||
static class StorageAndBlockIndex {
|
||||
final DatanodeStorageInfo storage;
|
||||
final byte blockIndex;
|
||||
public static class StorageAndBlockIndex {
|
||||
private final DatanodeStorageInfo storage;
|
||||
private final byte blockIndex;
|
||||
|
||||
StorageAndBlockIndex(DatanodeStorageInfo storage, byte blockIndex) {
|
||||
this.storage = storage;
|
||||
this.blockIndex = blockIndex;
|
||||
}
|
||||
|
||||
public DatanodeStorageInfo getStorage() {
|
||||
return storage;
|
||||
}
|
||||
|
||||
public byte getBlockIndex() {
|
||||
return blockIndex;
|
||||
}
|
||||
}
|
||||
|
||||
public Iterable<StorageAndBlockIndex> getStorageAndIndexInfos() {
|
||||
|
@ -3903,10 +3903,10 @@ private void countReplicasForStripedBlock(NumberReplicas counters,
|
||||
BitSet bitSet = new BitSet(block.getTotalBlockNum());
|
||||
for (StorageAndBlockIndex si : block.getStorageAndIndexInfos()) {
|
||||
StoredReplicaState state = checkReplicaOnStorage(counters, block,
|
||||
si.storage, nodesCorrupt, inStartupSafeMode);
|
||||
si.getStorage(), nodesCorrupt, inStartupSafeMode);
|
||||
if (state == StoredReplicaState.LIVE) {
|
||||
if (!bitSet.get(si.blockIndex)) {
|
||||
bitSet.set(si.blockIndex);
|
||||
if (!bitSet.get(si.getBlockIndex())) {
|
||||
bitSet.set(si.getBlockIndex());
|
||||
} else {
|
||||
counters.subtract(StoredReplicaState.LIVE, 1);
|
||||
counters.add(StoredReplicaState.REDUNDANT, 1);
|
||||
|
@ -28,6 +28,7 @@
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@ -67,6 +68,7 @@
|
||||
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped.StorageAndBlockIndex;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicies;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus;
|
||||
@ -591,9 +593,23 @@ private String getReplicaInfo(BlockInfo storedBlock) {
|
||||
blockManager.getStorages(storedBlock) :
|
||||
storedBlock.getUnderConstructionFeature().getExpectedStorageLocations();
|
||||
StringBuilder sb = new StringBuilder(" [");
|
||||
final boolean isStriped = storedBlock.isStriped();
|
||||
Map<DatanodeStorageInfo, Long> storage2Id = new HashMap<>();
|
||||
if (isStriped && isComplete) {
|
||||
long blockId = storedBlock.getBlockId();
|
||||
Iterable<StorageAndBlockIndex> sis =
|
||||
((BlockInfoStriped)storedBlock).getStorageAndIndexInfos();
|
||||
for (StorageAndBlockIndex si: sis){
|
||||
storage2Id.put(si.getStorage(), blockId + si.getBlockIndex());
|
||||
}
|
||||
}
|
||||
|
||||
for (int i = 0; i < storages.length; i++) {
|
||||
DatanodeStorageInfo storage = storages[i];
|
||||
if (isStriped && isComplete) {
|
||||
long index = storage2Id.get(storage);
|
||||
sb.append("blk_" + index + ":");
|
||||
}
|
||||
DatanodeDescriptor dnDesc = storage.getDatanodeDescriptor();
|
||||
if (showRacks) {
|
||||
sb.append(NodeBase.getPath(dnDesc));
|
||||
|
@ -677,16 +677,16 @@ public void testFsckOpenECFiles() throws Exception {
|
||||
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 10000L);
|
||||
ErasureCodingPolicy ecPolicy =
|
||||
ErasureCodingPolicyManager.getSystemDefaultPolicy();
|
||||
int numAllUnits = ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits();
|
||||
final int dataBlocks = ecPolicy.getNumDataUnits();
|
||||
final int cellSize = ecPolicy.getCellSize();
|
||||
final int numAllUnits = dataBlocks + ecPolicy.getNumParityUnits();
|
||||
int blockSize = 2 * cellSize;
|
||||
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
|
||||
numAllUnits + 1).build();
|
||||
FileSystem fs = null;
|
||||
String topDir = "/myDir";
|
||||
byte[] randomBytes = new byte[3000000];
|
||||
int seed = 42;
|
||||
new Random(seed).nextBytes(randomBytes);
|
||||
cluster.waitActive();
|
||||
fs = cluster.getFileSystem();
|
||||
DistributedFileSystem fs = cluster.getFileSystem();
|
||||
util.createFiles(fs, topDir);
|
||||
// set topDir to EC when it has replicated files
|
||||
cluster.getFileSystem().getClient().setErasureCodingPolicy(
|
||||
@ -697,11 +697,12 @@ public void testFsckOpenECFiles() throws Exception {
|
||||
// Open a EC file for writing and do not close for now
|
||||
Path openFile = new Path(topDir + "/openECFile");
|
||||
FSDataOutputStream out = fs.create(openFile);
|
||||
int writeCount = 0;
|
||||
while (writeCount != 300) {
|
||||
out.write(randomBytes);
|
||||
writeCount++;
|
||||
}
|
||||
int blockGroupSize = dataBlocks * blockSize;
|
||||
// data size is more than 1 block group and less than 2 block groups
|
||||
byte[] randomBytes = new byte[2 * blockGroupSize - cellSize];
|
||||
int seed = 42;
|
||||
new Random(seed).nextBytes(randomBytes);
|
||||
out.write(randomBytes);
|
||||
|
||||
// make sure the fsck can correctly handle mixed ec/replicated files
|
||||
runFsck(conf, 0, true, topDir, "-files", "-blocks", "-openforwrite");
|
||||
@ -723,6 +724,27 @@ public void testFsckOpenECFiles() throws Exception {
|
||||
assertTrue(outStr.contains("Expected_repl=" + numAllUnits));
|
||||
assertTrue(outStr.contains("Under Construction Block:"));
|
||||
|
||||
// check reported blockIDs of internal blocks
|
||||
LocatedStripedBlock lsb = (LocatedStripedBlock)fs.getClient()
|
||||
.getLocatedBlocks(openFile.toString(), 0, cellSize * dataBlocks).get(0);
|
||||
long groupId = lsb.getBlock().getBlockId();
|
||||
byte[] indices = lsb.getBlockIndices();
|
||||
DatanodeInfo[] locs = lsb.getLocations();
|
||||
long blockId;
|
||||
for (int i = 0; i < indices.length; i++) {
|
||||
blockId = groupId + indices[i];
|
||||
String str = "blk_" + blockId + ":" + locs[i];
|
||||
assertTrue(outStr.contains(str));
|
||||
}
|
||||
|
||||
// check the output of under-constructed blocks doesn't include the blockIDs
|
||||
String regex = ".*Expected_repl=" + numAllUnits + "(.*)\nStatus:.*";
|
||||
Pattern p = Pattern.compile(regex, Pattern.DOTALL);
|
||||
Matcher m = p.matcher(outStr);
|
||||
assertTrue(m.find());
|
||||
String ucBlockOutput = m.group(1);
|
||||
assertFalse(ucBlockOutput.contains("blk_"));
|
||||
|
||||
// Close the file
|
||||
out.close();
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user