HDFS-10343. BlockManager#createLocatedBlocks may return blocks on failed storages. Contributed by Kuhu Shukla.

This commit is contained in:
Kihwal Lee 2016-08-04 14:55:21 -05:00
parent 331ef25152
commit 57369a678c
2 changed files with 87 additions and 2 deletions

View File

@ -22,6 +22,7 @@
import java.io.IOException; import java.io.IOException;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet; import java.util.BitSet;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
@ -1045,14 +1046,15 @@ private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos)
final boolean isCorrupt = numCorruptReplicas != 0 && final boolean isCorrupt = numCorruptReplicas != 0 &&
numCorruptReplicas == numNodes; numCorruptReplicas == numNodes;
final int numMachines = isCorrupt ? numNodes: numNodes - numCorruptReplicas; final int numMachines = isCorrupt ? numNodes: numNodes - numCorruptReplicas;
final DatanodeStorageInfo[] machines = new DatanodeStorageInfo[numMachines]; DatanodeStorageInfo[] machines = new DatanodeStorageInfo[numMachines];
final byte[] blockIndices = blk.isStriped() ? new byte[numMachines] : null; final byte[] blockIndices = blk.isStriped() ? new byte[numMachines] : null;
int j = 0, i = 0; int j = 0, i = 0;
if (numMachines > 0) { if (numMachines > 0) {
for(DatanodeStorageInfo storage : blocksMap.getStorages(blk)) { for(DatanodeStorageInfo storage : blocksMap.getStorages(blk)) {
final DatanodeDescriptor d = storage.getDatanodeDescriptor(); final DatanodeDescriptor d = storage.getDatanodeDescriptor();
final boolean replicaCorrupt = corruptReplicas.isReplicaCorrupt(blk, d); final boolean replicaCorrupt = corruptReplicas.isReplicaCorrupt(blk, d);
if (isCorrupt || (!replicaCorrupt)) { if ((isCorrupt || (!replicaCorrupt)) &&
storage.getState() != State.FAILED) {
machines[j++] = storage; machines[j++] = storage;
// TODO this can be more efficient // TODO this can be more efficient
if (blockIndices != null) { if (blockIndices != null) {
@ -1063,6 +1065,11 @@ private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos)
} }
} }
} }
if(j < machines.length) {
machines = Arrays.copyOf(machines, j);
}
assert j == machines.length : assert j == machines.length :
"isCorrupt: " + isCorrupt + "isCorrupt: " + isCorrupt +
" numMachines: " + numMachines + " numMachines: " + numMachines +

View File

@ -72,8 +72,10 @@
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils; import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica; import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten; import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
@ -88,6 +90,7 @@
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.io.EnumSetWritable; import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
@ -1152,6 +1155,81 @@ public void run() {
} }
} }
@Test
public void testBlockManagerMachinesArray() throws Exception {
final Configuration conf = new HdfsConfiguration();
final MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(4).build();
cluster.waitActive();
BlockManager blockManager = cluster.getNamesystem().getBlockManager();
FileSystem fs = cluster.getFileSystem();
final Path filePath = new Path("/tmp.txt");
final long fileLen = 1L;
DFSTestUtil.createFile(fs, filePath, fileLen, (short) 3, 1L);
ArrayList<DataNode> datanodes = cluster.getDataNodes();
assertEquals(datanodes.size(), 4);
FSNamesystem ns = cluster.getNamesystem();
// get the block
final String bpid = cluster.getNamesystem().getBlockPoolId();
File storageDir = cluster.getInstanceStorageDir(0, 0);
File dataDir = MiniDFSCluster.getFinalizedDir(storageDir, bpid);
assertTrue("Data directory does not exist", dataDir.exists());
BlockInfo blockInfo = blockManager.blocksMap.getBlocks().iterator().next();
ExtendedBlock blk = new ExtendedBlock(bpid, blockInfo.getBlockId(),
blockInfo.getNumBytes(), blockInfo.getGenerationStamp());
DatanodeDescriptor failedStorageDataNode =
blockManager.getStoredBlock(blockInfo).getDatanode(0);
DatanodeDescriptor corruptStorageDataNode =
blockManager.getStoredBlock(blockInfo).getDatanode(1);
ArrayList<StorageReport> reports = new ArrayList<StorageReport>();
for(int i=0; i<failedStorageDataNode.getStorageInfos().length; i++) {
DatanodeStorageInfo storageInfo = failedStorageDataNode
.getStorageInfos()[i];
DatanodeStorage dns = new DatanodeStorage(
failedStorageDataNode.getStorageInfos()[i].getStorageID(),
DatanodeStorage.State.FAILED,
failedStorageDataNode.getStorageInfos()[i].getStorageType());
while(storageInfo.getBlockIterator().hasNext()) {
BlockInfo blockInfo1 = storageInfo.getBlockIterator().next();
if(blockInfo1.equals(blockInfo)) {
StorageReport report = new StorageReport(
dns, true, storageInfo.getCapacity(),
storageInfo.getDfsUsed(), storageInfo.getRemaining(),
storageInfo.getBlockPoolUsed());
reports.add(report);
break;
}
}
}
failedStorageDataNode.updateHeartbeat(reports.toArray(StorageReport
.EMPTY_ARRAY), 0L, 0L, 0, 0, null);
ns.writeLock();
DatanodeStorageInfo corruptStorageInfo= null;
for(int i=0; i<corruptStorageDataNode.getStorageInfos().length; i++) {
corruptStorageInfo = corruptStorageDataNode.getStorageInfos()[i];
while(corruptStorageInfo.getBlockIterator().hasNext()) {
BlockInfo blockInfo1 = corruptStorageInfo.getBlockIterator().next();
if (blockInfo1.equals(blockInfo)) {
break;
}
}
}
blockManager.findAndMarkBlockAsCorrupt(blk, corruptStorageDataNode,
corruptStorageInfo.getStorageID(),
CorruptReplicasMap.Reason.ANY.toString());
ns.writeUnlock();
BlockInfo[] blockInfos = new BlockInfo[] {blockInfo};
ns.readLock();
LocatedBlocks locatedBlocks =
blockManager.createLocatedBlocks(blockInfos, 3L, false, 0L, 3L,
false, false, null, null);
assertTrue("Located Blocks should exclude corrupt" +
"replicas and failed storages",
locatedBlocks.getLocatedBlocks().size() == 1);
ns.readUnlock();
}
@Test @Test
public void testMetaSaveCorruptBlocks() throws Exception { public void testMetaSaveCorruptBlocks() throws Exception {
List<DatanodeStorageInfo> origStorages = getStorages(0, 1); List<DatanodeStorageInfo> origStorages = getStorages(0, 1);