HDFS-4885. Improve the verifyBlockPlacement() API in BlockPlacementPolicy. Contributed by Junping Du

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2832@1534417 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2013-10-21 23:48:08 +00:00
parent 62f95b003b
commit 12ed24af66
7 changed files with 182 additions and 21 deletions

View File

@ -36,3 +36,6 @@ IMPROVEMENTS:
HDFS-4988. Datanode must support all the volumes as individual storages. HDFS-4988. Datanode must support all the volumes as individual storages.
(Arpit Agarwal) (Arpit Agarwal)
HDFS-4885. Improve the verifyBlockPlacement() API in BlockPlacementPolicy.
(Junping Du via szetszwo)

View File

@ -100,18 +100,17 @@ DatanodeStorageInfo[] chooseTarget(String src,
} }
/** /**
* Verify that the block is replicated on at least minRacks different racks * Verify if the block's placement meets requirement of placement policy,
* if there is more than minRacks rack in the system. * i.e. replicas are placed on no less than minRacks racks in the system.
* *
* @param srcPath the full pathname of the file to be verified * @param srcPath the full pathname of the file to be verified
* @param lBlk block with locations * @param lBlk block with locations
* @param minRacks number of racks the block should be replicated to * @param numOfReplicas replica number of file to be verified
* @return the difference between the required and the actual number of racks * @return the result of verification
* the block is replicated to.
*/ */
abstract public int verifyBlockPlacement(String srcPath, abstract public BlockPlacementStatus verifyBlockPlacement(String srcPath,
LocatedBlock lBlk, LocatedBlock lBlk,
int minRacks); int numOfReplicas);
/** /**
* Decide whether deleting the specified replica of the block still makes * Decide whether deleting the specified replica of the block still makes
* the block conform to the configured block placement policy. * the block conform to the configured block placement policy.

View File

@ -698,22 +698,22 @@ private DatanodeStorageInfo[] getPipeline(Node writer,
} }
@Override @Override
public int verifyBlockPlacement(String srcPath, public BlockPlacementStatus verifyBlockPlacement(String srcPath,
LocatedBlock lBlk, LocatedBlock lBlk, int numberOfReplicas) {
int minRacks) {
DatanodeInfo[] locs = lBlk.getLocations(); DatanodeInfo[] locs = lBlk.getLocations();
if (locs == null) if (locs == null)
locs = DatanodeDescriptor.EMPTY_ARRAY; locs = DatanodeDescriptor.EMPTY_ARRAY;
int numRacks = clusterMap.getNumOfRacks(); int numRacks = clusterMap.getNumOfRacks();
if(numRacks <= 1) // only one rack if(numRacks <= 1) // only one rack
return 0; return new BlockPlacementStatusDefault(
minRacks = Math.min(minRacks, numRacks); Math.min(numRacks, numberOfReplicas), numRacks);
int minRacks = Math.min(2, numberOfReplicas);
// 1. Check that all locations are different. // 1. Check that all locations are different.
// 2. Count locations on different racks. // 2. Count locations on different racks.
Set<String> racks = new TreeSet<String>(); Set<String> racks = new TreeSet<String>();
for (DatanodeInfo dn : locs) for (DatanodeInfo dn : locs)
racks.add(dn.getNetworkLocation()); racks.add(dn.getNetworkLocation());
return minRacks - racks.size(); return new BlockPlacementStatusDefault(racks.size(), minRacks);
} }
@Override @Override

View File

@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@InterfaceAudience.Private
@InterfaceStability.Evolving
public interface BlockPlacementStatus {
/**
* Boolean value to identify if replicas of this block satisfy requirement of
* placement policy
* @return if replicas satisfy placement policy's requirement
*/
public boolean isPlacementPolicySatisfied();
/**
* Get description info for log or printed in case replicas are failed to meet
* requirement of placement policy
* @return description in case replicas are failed to meet requirement of
* placement policy
*/
public String getErrorDescription();
}

View File

@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
public class BlockPlacementStatusDefault implements BlockPlacementStatus {
private int requiredRacks = 0;
private int currentRacks = 0;
public BlockPlacementStatusDefault(int currentRacks, int requiredRacks){
this.requiredRacks = requiredRacks;
this.currentRacks = currentRacks;
}
@Override
public boolean isPlacementPolicySatisfied() {
return requiredRacks <= currentRacks;
}
@Override
public String getErrorDescription() {
if (isPlacementPolicySatisfied()) {
return null;
}
return "Block should be additionally replicated on " +
(requiredRacks - currentRacks) + " more rack(s).";
}
}

View File

@ -50,6 +50,7 @@
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
@ -374,9 +375,10 @@ void check(String parent, HdfsFileStatus file, Result res) throws IOException {
locs.length + " replica(s)."); locs.length + " replica(s).");
} }
// verify block placement policy // verify block placement policy
int missingRacks = BlockPlacementPolicy.getInstance(conf, null, networktopology). BlockPlacementStatus blockPlacementStatus =
verifyBlockPlacement(path, lBlk, Math.min(2,targetFileReplication)); BlockPlacementPolicy.getInstance(conf, null, networktopology).
if (missingRacks > 0) { verifyBlockPlacement(path, lBlk, targetFileReplication);
if (!blockPlacementStatus.isPlacementPolicySatisfied()) {
res.numMisReplicatedBlocks++; res.numMisReplicatedBlocks++;
misReplicatedPerFile++; misReplicatedPerFile++;
if (!showFiles) { if (!showFiles) {
@ -385,9 +387,7 @@ void check(String parent, HdfsFileStatus file, Result res) throws IOException {
out.print(path + ": "); out.print(path + ": ");
} }
out.println(" Replica placement policy is violated for " + out.println(" Replica placement policy is violated for " +
block + block + ". " + blockPlacementStatus.getErrorDescription());
". Block should be additionally replicated on " +
missingRacks + " more rack(s).");
} }
report.append(i + ". " + blkName + " len=" + block.getNumBytes()); report.append(i + ". " + blkName + " len=" + block.getNumBytes());
if (locs.length == 0) { if (locs.length == 0) {

View File

@ -83,7 +83,6 @@
import org.junit.Test; import org.junit.Test;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import org.mockito.Mockito;
import static org.mockito.Mockito.*; import static org.mockito.Mockito.*;
/** /**
@ -892,6 +891,80 @@ public void testFsckMissingReplicas() throws IOException {
} }
} }
} }
/**
* Tests that the # of misreplaced replicas is correct
* @throws IOException
*/
@Test
public void testFsckMisPlacedReplicas() throws IOException {
// Desired replication factor
final short REPL_FACTOR = 2;
// Number of replicas to actually start
short NUM_DN = 2;
// Number of blocks to write
final short NUM_BLOCKS = 3;
// Set a small-ish blocksize
final long blockSize = 512;
String [] racks = {"/rack1", "/rack1"};
String [] hosts = {"host1", "host2"};
Configuration conf = new Configuration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
MiniDFSCluster cluster = null;
DistributedFileSystem dfs = null;
try {
// Startup a minicluster
cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DN).hosts(hosts)
.racks(racks).build();
assertNotNull("Failed Cluster Creation", cluster);
cluster.waitClusterUp();
dfs = (DistributedFileSystem) cluster.getFileSystem();
assertNotNull("Failed to get FileSystem", dfs);
// Create a file that will be intentionally under-replicated
final String pathString = new String("/testfile");
final Path path = new Path(pathString);
long fileLen = blockSize * NUM_BLOCKS;
DFSTestUtil.createFile(dfs, path, fileLen, REPL_FACTOR, 1);
// Create an under-replicated file
NameNode namenode = cluster.getNameNode();
NetworkTopology nettop = cluster.getNamesystem().getBlockManager()
.getDatanodeManager().getNetworkTopology();
// Add a new node on different rack, so previous blocks' replicas
// are considered to be misplaced
nettop.add(DFSTestUtil.getDatanodeDescriptor("/rack2", "/host3"));
NUM_DN++;
Map<String,String[]> pmap = new HashMap<String, String[]>();
Writer result = new StringWriter();
PrintWriter out = new PrintWriter(result, true);
InetAddress remoteAddress = InetAddress.getLocalHost();
NamenodeFsck fsck = new NamenodeFsck(conf, namenode, nettop, pmap, out,
NUM_DN, (short)REPL_FACTOR, remoteAddress);
// Run the fsck and check the Result
final HdfsFileStatus file =
namenode.getRpcServer().getFileInfo(pathString);
assertNotNull(file);
Result res = new Result(conf);
fsck.check(pathString, file, res);
// check misReplicatedBlock number.
assertEquals(res.numMisReplicatedBlocks, NUM_BLOCKS);
} finally {
if(dfs != null) {
dfs.close();
}
if(cluster != null) {
cluster.shutdown();
}
}
}
/** Test fsck with FileNotFound */ /** Test fsck with FileNotFound */
@Test @Test