HDFS-8863. The remaining space check in BlockPlacementPolicyDefault is flawed. (Kihwal Lee via yliu)

This commit is contained in:
yliu 2015-08-20 20:15:03 +08:00
parent 80a29906bc
commit 5e8fe89437
4 changed files with 84 additions and 29 deletions

View File

@ -1196,11 +1196,11 @@ Release 2.7.2 - UNRELEASED
IMPROVEMENTS IMPROVEMENTS
HDFS-8659. Block scanner INFO message is spamming logs. (Yongjun Zhang) HDFS-8659. Block scanner INFO message is spamming logs. (Yongjun Zhang)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-8722. Optimize datanode writes for small writes and flushes (kihwal) HDFS-8722. Optimize datanode writes for small writes and flushes (kihwal)
BUG FIXES BUG FIXES
@ -1215,6 +1215,9 @@ Release 2.7.2 - UNRELEASED
HDFS-8867. Enable optimized block reports. (Daryn Sharp via jing9) HDFS-8867. Enable optimized block reports. (Daryn Sharp via jing9)
HDFS-8863. The remaining space check in BlockPlacementPolicyDefault is
flawed. (Kihwal Lee via yliu)
Release 2.7.1 - 2015-07-06 Release 2.7.1 - 2015-07-06
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -868,7 +868,8 @@ private boolean isGoodTarget(DatanodeStorageInfo storage,
final long requiredSize = blockSize * HdfsServerConstants.MIN_BLOCKS_FOR_WRITE; final long requiredSize = blockSize * HdfsServerConstants.MIN_BLOCKS_FOR_WRITE;
final long scheduledSize = blockSize * node.getBlocksScheduled(storage.getStorageType()); final long scheduledSize = blockSize * node.getBlocksScheduled(storage.getStorageType());
final long remaining = node.getRemaining(storage.getStorageType()); final long remaining = node.getRemaining(storage.getStorageType(),
requiredSize);
if (requiredSize > remaining - scheduledSize) { if (requiredSize > remaining - scheduledSize) {
logNodeIsNotChosen(storage, "the node does not have enough " logNodeIsNotChosen(storage, "the node does not have enough "
+ storage.getStorageType() + " space" + storage.getStorageType() + " space"

View File

@ -42,6 +42,7 @@
import org.apache.hadoop.hdfs.server.namenode.CachedBlock; import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary; import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.hdfs.util.EnumCounters; import org.apache.hadoop.hdfs.util.EnumCounters;
@ -662,16 +663,26 @@ public Block[] getInvalidateBlocks(int maxblocks) {
} }
/** /**
* @return Approximate number of blocks currently scheduled to be written * Return the sum of remaining spaces of the specified type. If the remaining
* space of a storage is less than minSize, it won't be counted toward the
* sum.
*
* @param t The storage type. If null, the type is ignored.
* @param minSize The minimum free space required.
* @return the sum of remaining spaces that are bigger than minSize.
*/ */
public long getRemaining(StorageType t) { public long getRemaining(StorageType t, long minSize) {
long remaining = 0; long remaining = 0;
for(DatanodeStorageInfo s : getStorageInfos()) { for (DatanodeStorageInfo s : getStorageInfos()) {
if (s.getStorageType() == t) { if (s.getState() == State.NORMAL &&
remaining += s.getRemaining(); (t == null || s.getStorageType() == t)) {
long r = s.getRemaining();
if (r >= minSize) {
remaining += r;
}
} }
} }
return remaining; return remaining;
} }
/** /**

View File

@ -100,6 +100,16 @@ private static void updateHeartbeatWithUsage(DatanodeDescriptor dn,
dnCacheCapacity, dnCacheUsed, xceiverCount, volFailures, null); dnCacheCapacity, dnCacheUsed, xceiverCount, volFailures, null);
} }
private static void updateHeartbeatForExtraStorage(long capacity,
long dfsUsed, long remaining, long blockPoolUsed) {
DatanodeDescriptor dn = dataNodes[5];
dn.getStorageInfos()[1].setUtilizationForTesting(
capacity, dfsUsed, remaining, blockPoolUsed);
dn.updateHeartbeat(
BlockManagerTestUtil.getStorageReportsForDatanode(dn),
0L, 0L, 0, 0, null);
}
@BeforeClass @BeforeClass
public static void setupCluster() throws Exception { public static void setupCluster() throws Exception {
Configuration conf = new HdfsConfiguration(); Configuration conf = new HdfsConfiguration();
@ -113,6 +123,16 @@ public static void setupCluster() throws Exception {
storages = DFSTestUtil.createDatanodeStorageInfos(racks); storages = DFSTestUtil.createDatanodeStorageInfos(racks);
dataNodes = DFSTestUtil.toDatanodeDescriptor(storages); dataNodes = DFSTestUtil.toDatanodeDescriptor(storages);
// create an extra storage for dn5.
DatanodeStorage extraStorage = new DatanodeStorage(
storages[5].getStorageID() + "-extra", DatanodeStorage.State.NORMAL,
StorageType.DEFAULT);
/* DatanodeStorageInfo si = new DatanodeStorageInfo(
storages[5].getDatanodeDescriptor(), extraStorage);
*/
BlockManagerTestUtil.updateStorage(storages[5].getDatanodeDescriptor(),
extraStorage);
FileSystem.setDefaultUri(conf, "hdfs://localhost:0"); FileSystem.setDefaultUri(conf, "hdfs://localhost:0");
conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0"); conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
File baseDir = PathUtils.getTestDir(TestReplicationPolicy.class); File baseDir = PathUtils.getTestDir(TestReplicationPolicy.class);
@ -135,11 +155,17 @@ public static void setupCluster() throws Exception {
bm.getDatanodeManager().getHeartbeatManager().addDatanode( bm.getDatanodeManager().getHeartbeatManager().addDatanode(
dataNodes[i]); dataNodes[i]);
} }
resetHeartbeatForStorages();
}
private static void resetHeartbeatForStorages() {
for (int i=0; i < NUM_OF_DATANODES; i++) { for (int i=0; i < NUM_OF_DATANODES; i++) {
updateHeartbeatWithUsage(dataNodes[i], updateHeartbeatWithUsage(dataNodes[i],
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0); 2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
} }
// No available space in the extra storage of dn0
updateHeartbeatForExtraStorage(0L, 0L, 0L, 0L);
} }
private static boolean isOnSameRack(DatanodeStorageInfo left, DatanodeStorageInfo right) { private static boolean isOnSameRack(DatanodeStorageInfo left, DatanodeStorageInfo right) {
@ -149,6 +175,31 @@ private static boolean isOnSameRack(DatanodeStorageInfo left, DatanodeStorageInf
private static boolean isOnSameRack(DatanodeStorageInfo left, DatanodeDescriptor right) { private static boolean isOnSameRack(DatanodeStorageInfo left, DatanodeDescriptor right) {
return cluster.isOnSameRack(left.getDatanodeDescriptor(), right); return cluster.isOnSameRack(left.getDatanodeDescriptor(), right);
} }
/**
* Test whether the remaining space per storage is individually
* considered.
*/
@Test
public void testChooseNodeWithMultipleStorages() throws Exception {
updateHeartbeatWithUsage(dataNodes[5],
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
(2*HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE)/3, 0L,
0L, 0L, 0, 0);
updateHeartbeatForExtraStorage(
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
(2*HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE)/3, 0L);
DatanodeStorageInfo[] targets;
targets = chooseTarget (1, dataNodes[5],
new ArrayList<DatanodeStorageInfo>(), null);
assertEquals(1, targets.length);
assertEquals(storages[4], targets[0]);
resetHeartbeatForStorages();
}
/** /**
* In this testcase, client is dataNodes[0]. So the 1st replica should be * In this testcase, client is dataNodes[0]. So the 1st replica should be
* placed on dataNodes[0], the 2nd replica should be placed on * placed on dataNodes[0], the 2nd replica should be placed on
@ -190,10 +241,8 @@ public void testChooseTarget1() throws Exception {
assertTrue(isOnSameRack(targets[1], targets[2]) || assertTrue(isOnSameRack(targets[1], targets[2]) ||
isOnSameRack(targets[2], targets[3])); isOnSameRack(targets[2], targets[3]));
assertFalse(isOnSameRack(targets[0], targets[2])); assertFalse(isOnSameRack(targets[0], targets[2]));
updateHeartbeatWithUsage(dataNodes[0], resetHeartbeatForStorages();
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
} }
private static DatanodeStorageInfo[] chooseTarget(int numOfReplicas) { private static DatanodeStorageInfo[] chooseTarget(int numOfReplicas) {
@ -348,9 +397,7 @@ public void testChooseTarget3() throws Exception {
isOnSameRack(targets[2], targets[3])); isOnSameRack(targets[2], targets[3]));
assertFalse(isOnSameRack(targets[1], targets[3])); assertFalse(isOnSameRack(targets[1], targets[3]));
updateHeartbeatWithUsage(dataNodes[0], resetHeartbeatForStorages();
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
} }
/** /**
@ -391,12 +438,8 @@ public void testChoooseTarget4() throws Exception {
assertTrue(isOnSameRack(targets[0], targets[1]) || assertTrue(isOnSameRack(targets[0], targets[1]) ||
isOnSameRack(targets[1], targets[2])); isOnSameRack(targets[1], targets[2]));
assertFalse(isOnSameRack(targets[0], targets[2])); assertFalse(isOnSameRack(targets[0], targets[2]));
for(int i=0; i<2; i++) { resetHeartbeatForStorages();
updateHeartbeatWithUsage(dataNodes[i],
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
}
} }
/** /**
@ -474,6 +517,7 @@ public void testChooseTarget6() throws Exception {
} finally { } finally {
bm.getDatanodeManager().getNetworkTopology().remove(newDn); bm.getDatanodeManager().getNetworkTopology().remove(newDn);
} }
resetHeartbeatForStorages();
} }
@ -527,12 +571,8 @@ public void testChooseTargetWithMoreThanAvailableNodes() throws Exception {
// Suppose to place replicas on each node but two data nodes are not // Suppose to place replicas on each node but two data nodes are not
// available for placing replica, so here we expect a short of 2 // available for placing replica, so here we expect a short of 2
assertTrue(((String)lastLogEntry.getMessage()).contains("in need of 2")); assertTrue(((String)lastLogEntry.getMessage()).contains("in need of 2"));
for(int i=0; i<2; i++) { resetHeartbeatForStorages();
updateHeartbeatWithUsage(dataNodes[i],
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
}
} }
private boolean containsWithinRange(DatanodeStorageInfo target, private boolean containsWithinRange(DatanodeStorageInfo target,