HDFS-5484. StorageType and State in DatanodeStorageInfo in NameNode is not accurate. (Contributed by Eric Sirianni)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2832@1547462 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arpit Agarwal 2013-12-03 16:30:29 +00:00
parent 18159be495
commit a1aa1836fb
11 changed files with 41 additions and 34 deletions

View File

@ -120,3 +120,6 @@ IMPROVEMENTS:
HDFS-5559. Fix TestDatanodeConfig in HDFS-2832. (Contributed by szetszwo) HDFS-5559. Fix TestDatanodeConfig in HDFS-2832. (Contributed by szetszwo)
HDFS-5484. StorageType and State in DatanodeStorageInfo in NameNode is
not accurate. (Eric Sirianni via Arpit Agarwal)

View File

@ -454,7 +454,7 @@ class BPServiceActor implements Runnable {
long brCreateStartTime = now(); long brCreateStartTime = now();
long totalBlockCount = 0; long totalBlockCount = 0;
Map<String, BlockListAsLongs> perVolumeBlockLists = Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists =
dn.getFSDataset().getBlockReports(bpos.getBlockPoolId()); dn.getFSDataset().getBlockReports(bpos.getBlockPoolId());
// Send block report // Send block report
@ -463,13 +463,11 @@ class BPServiceActor implements Runnable {
new StorageBlockReport[perVolumeBlockLists.size()]; new StorageBlockReport[perVolumeBlockLists.size()];
int i = 0; int i = 0;
for(Map.Entry<String, BlockListAsLongs> kvPair : perVolumeBlockLists.entrySet()) { for(Map.Entry<DatanodeStorage, BlockListAsLongs> kvPair : perVolumeBlockLists.entrySet()) {
String storageID = kvPair.getKey(); DatanodeStorage dnStorage = kvPair.getKey();
BlockListAsLongs blockList = kvPair.getValue(); BlockListAsLongs blockList = kvPair.getValue();
totalBlockCount += blockList.getNumberOfBlocks(); totalBlockCount += blockList.getNumberOfBlocks();
// Dummy DatanodeStorage object just for sending the block report.
DatanodeStorage dnStorage = new DatanodeStorage(storageID);
reports[i++] = reports[i++] =
new StorageBlockReport( new StorageBlockReport(
dnStorage, blockList.getBlockListAsLongs()); dnStorage, blockList.getBlockListAsLongs());

View File

@ -40,6 +40,7 @@ import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetFactory; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetFactory;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean; import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.util.DiskChecker.DiskErrorException; import org.apache.hadoop.util.DiskChecker.DiskErrorException;
@ -268,9 +269,9 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
/** /**
* Returns one block report per volume. * Returns one block report per volume.
* @param bpid Block Pool Id * @param bpid Block Pool Id
* @return - a map of StorageID to block report for the volume. * @return - a map of DatanodeStorage to block report for the volume.
*/ */
public Map<String, BlockListAsLongs> getBlockReports(String bpid); public Map<DatanodeStorage, BlockListAsLongs> getBlockReports(String bpid);
/** /**
* Returns the cache report - the full list of cached block IDs of a * Returns the cache report - the full list of cached block IDs of a

View File

@ -78,6 +78,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosing
import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy; import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean; import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.io.nativeio.NativeIO;
@ -1089,14 +1090,14 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
} }
@Override @Override
public Map<String, BlockListAsLongs> getBlockReports(String bpid) { public Map<DatanodeStorage, BlockListAsLongs> getBlockReports(String bpid) {
Map<String, BlockListAsLongs> blockReportMap = Map<DatanodeStorage, BlockListAsLongs> blockReportMap =
new HashMap<String, BlockListAsLongs>(); new HashMap<DatanodeStorage, BlockListAsLongs>();
for (FsVolumeImpl v : getVolumes()) { for (FsVolumeImpl v : getVolumes()) {
ReplicaMap rMap = perVolumeReplicaMap.get(v.getStorageID()); ReplicaMap rMap = perVolumeReplicaMap.get(v.getStorageID());
BlockListAsLongs blockList = getBlockReportWithReplicaMap(bpid, rMap); BlockListAsLongs blockList = getBlockReportWithReplicaMap(bpid, rMap);
blockReportMap.put(v.getStorageID(), blockList); blockReportMap.put(v.toDatanodeStorage(), blockList);
} }
return blockReportMap; return blockReportMap;

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.datanode.DataStorage; import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.util.DiskChecker.DiskErrorException; import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
@ -332,5 +333,10 @@ class FsVolumeImpl implements FsVolumeSpi {
public StorageType getStorageType() { public StorageType getStorageType() {
return storageType; return storageType;
} }
DatanodeStorage toDatanodeStorage() {
return new DatanodeStorage(storageID, DatanodeStorage.State.NORMAL, storageType);
}
} }

View File

@ -88,6 +88,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.tools.DFSAdmin; import org.apache.hadoop.hdfs.tools.DFSAdmin;
import org.apache.hadoop.hdfs.web.HftpFileSystem; import org.apache.hadoop.hdfs.web.HftpFileSystem;
@ -1970,7 +1971,7 @@ public class MiniDFSCluster {
* @param dataNodeIndex - data node whose block report is desired - the index is same as for getDataNodes() * @param dataNodeIndex - data node whose block report is desired - the index is same as for getDataNodes()
* @return the block report for the specified data node * @return the block report for the specified data node
*/ */
public Map<String, BlockListAsLongs> getBlockReport(String bpid, int dataNodeIndex) { public Map<DatanodeStorage, BlockListAsLongs> getBlockReport(String bpid, int dataNodeIndex) {
if (dataNodeIndex < 0 || dataNodeIndex > dataNodes.size()) { if (dataNodeIndex < 0 || dataNodeIndex > dataNodes.size()) {
throw new IndexOutOfBoundsException(); throw new IndexOutOfBoundsException();
} }
@ -1984,10 +1985,10 @@ public class MiniDFSCluster {
* @return block reports from all data nodes * @return block reports from all data nodes
* BlockListAsLongs is indexed in the same order as the list of datanodes returned by getDataNodes() * BlockListAsLongs is indexed in the same order as the list of datanodes returned by getDataNodes()
*/ */
public List<Map<String, BlockListAsLongs>> getAllBlockReports(String bpid) { public List<Map<DatanodeStorage, BlockListAsLongs>> getAllBlockReports(String bpid) {
int numDataNodes = dataNodes.size(); int numDataNodes = dataNodes.size();
final List<Map<String, BlockListAsLongs>> result final List<Map<DatanodeStorage, BlockListAsLongs>> result
= new ArrayList<Map<String, BlockListAsLongs>>(numDataNodes); = new ArrayList<Map<DatanodeStorage, BlockListAsLongs>>(numDataNodes);
for (int i = 0; i < numDataNodes; ++i) { for (int i = 0; i < numDataNodes; ++i) {
result.add(getBlockReport(bpid, i)); result.add(getBlockReport(bpid, i));
} }

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.tools.DFSAdmin; import org.apache.hadoop.hdfs.tools.DFSAdmin;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.SequenceFile;
@ -1394,11 +1395,11 @@ public class TestDFSShell {
List<File> files = new ArrayList<File>(); List<File> files = new ArrayList<File>();
List<DataNode> datanodes = cluster.getDataNodes(); List<DataNode> datanodes = cluster.getDataNodes();
String poolId = cluster.getNamesystem().getBlockPoolId(); String poolId = cluster.getNamesystem().getBlockPoolId();
List<Map<String, BlockListAsLongs>> blocks = cluster.getAllBlockReports(poolId); List<Map<DatanodeStorage, BlockListAsLongs>> blocks = cluster.getAllBlockReports(poolId);
for(int i = 0; i < blocks.size(); i++) { for(int i = 0; i < blocks.size(); i++) {
DataNode dn = datanodes.get(i); DataNode dn = datanodes.get(i);
Map<String, BlockListAsLongs> map = blocks.get(i); Map<DatanodeStorage, BlockListAsLongs> map = blocks.get(i);
for(Map.Entry<String, BlockListAsLongs> e : map.entrySet()) { for(Map.Entry<DatanodeStorage, BlockListAsLongs> e : map.entrySet()) {
for(Block b : e.getValue()) { for(Block b : e.getValue()) {
files.add(DataNodeTestUtils.getFile(dn, poolId, b.getBlockId())); files.add(DataNodeTestUtils.getFile(dn, poolId, b.getBlockId()));
} }

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import org.junit.Test; import org.junit.Test;
@ -136,7 +137,7 @@ public class TestInjectionForSimulatedStorage {
DFSTestUtil.createFile(cluster.getFileSystem(), testPath, filesize, DFSTestUtil.createFile(cluster.getFileSystem(), testPath, filesize,
filesize, blockSize, (short) numDataNodes, 0L); filesize, blockSize, (short) numDataNodes, 0L);
waitForBlockReplication(testFile, dfsClient.getNamenode(), numDataNodes, 20); waitForBlockReplication(testFile, dfsClient.getNamenode(), numDataNodes, 20);
List<Map<String, BlockListAsLongs>> blocksList = cluster.getAllBlockReports(bpid); List<Map<DatanodeStorage, BlockListAsLongs>> blocksList = cluster.getAllBlockReports(bpid);
cluster.shutdown(); cluster.shutdown();
cluster = null; cluster = null;
@ -157,7 +158,7 @@ public class TestInjectionForSimulatedStorage {
.build(); .build();
cluster.waitActive(); cluster.waitActive();
Set<Block> uniqueBlocks = new HashSet<Block>(); Set<Block> uniqueBlocks = new HashSet<Block>();
for(Map<String, BlockListAsLongs> map : blocksList) { for(Map<DatanodeStorage, BlockListAsLongs> map : blocksList) {
for(BlockListAsLongs blockList : map.values()) { for(BlockListAsLongs blockList : map.values()) {
for(Block b : blockList) { for(Block b : blockList) {
uniqueBlocks.add(new Block(b)); uniqueBlocks.add(new Block(b));

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
@ -484,12 +485,9 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
} }
@Override @Override
public synchronized Map<String, BlockListAsLongs> getBlockReports( public synchronized Map<DatanodeStorage, BlockListAsLongs> getBlockReports(
String bpid) { String bpid) {
Map<String, BlockListAsLongs> reports = return Collections.singletonMap(new DatanodeStorage(storage.storageUuid), getBlockReport(bpid));
new HashMap<String, BlockListAsLongs>();
reports.put(storage.storageUuid, getBlockReport(bpid));
return reports;
} }
@Override // FsDatasetSpi @Override // FsDatasetSpi

View File

@ -120,7 +120,7 @@ public class TestBlockReport {
private static StorageBlockReport[] getBlockReports( private static StorageBlockReport[] getBlockReports(
DataNode dn, String bpid, boolean corruptOneBlockGs, DataNode dn, String bpid, boolean corruptOneBlockGs,
boolean corruptOneBlockLen) { boolean corruptOneBlockLen) {
Map<String, BlockListAsLongs> perVolumeBlockLists = Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists =
dn.getFSDataset().getBlockReports(bpid); dn.getFSDataset().getBlockReports(bpid);
// Send block report // Send block report
@ -130,8 +130,8 @@ public class TestBlockReport {
boolean corruptedLen = false; boolean corruptedLen = false;
int reportIndex = 0; int reportIndex = 0;
for(Map.Entry<String, BlockListAsLongs> kvPair : perVolumeBlockLists.entrySet()) { for(Map.Entry<DatanodeStorage, BlockListAsLongs> kvPair : perVolumeBlockLists.entrySet()) {
String storageID = kvPair.getKey(); DatanodeStorage dnStorage = kvPair.getKey();
BlockListAsLongs blockList = kvPair.getValue(); BlockListAsLongs blockList = kvPair.getValue();
// Walk the list of blocks until we find one each to corrupt the // Walk the list of blocks until we find one each to corrupt the
@ -150,8 +150,6 @@ public class TestBlockReport {
} }
} }
// Dummy DatanodeStorage object just for sending the block report.
DatanodeStorage dnStorage = new DatanodeStorage(storageID);
reports[reportIndex++] = reports[reportIndex++] =
new StorageBlockReport(dnStorage, blockList.getBlockListAsLongs()); new StorageBlockReport(dnStorage, blockList.getBlockListAsLongs());
} }

View File

@ -154,7 +154,7 @@ public class TestDataNodeVolumeFailure {
String bpid = cluster.getNamesystem().getBlockPoolId(); String bpid = cluster.getNamesystem().getBlockPoolId();
DatanodeRegistration dnR = dn.getDNRegistrationForBP(bpid); DatanodeRegistration dnR = dn.getDNRegistrationForBP(bpid);
Map<String, BlockListAsLongs> perVolumeBlockLists = Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists =
dn.getFSDataset().getBlockReports(bpid); dn.getFSDataset().getBlockReports(bpid);
// Send block report // Send block report
@ -162,10 +162,9 @@ public class TestDataNodeVolumeFailure {
new StorageBlockReport[perVolumeBlockLists.size()]; new StorageBlockReport[perVolumeBlockLists.size()];
int reportIndex = 0; int reportIndex = 0;
for(Map.Entry<String, BlockListAsLongs> kvPair : perVolumeBlockLists.entrySet()) { for(Map.Entry<DatanodeStorage, BlockListAsLongs> kvPair : perVolumeBlockLists.entrySet()) {
String storageID = kvPair.getKey(); DatanodeStorage dnStorage = kvPair.getKey();
BlockListAsLongs blockList = kvPair.getValue(); BlockListAsLongs blockList = kvPair.getValue();
DatanodeStorage dnStorage = new DatanodeStorage(storageID);
reports[reportIndex++] = reports[reportIndex++] =
new StorageBlockReport(dnStorage, blockList.getBlockListAsLongs()); new StorageBlockReport(dnStorage, blockList.getBlockListAsLongs());
} }