diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index c89a8dee74..f61df8df82 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -233,6 +233,9 @@ Release 0.23.1 - UNRELEASED HDFS-2590. Fix the missing links in the WebHDFS forrest doc. (szetszwo) HDFS-2596. TestDirectoryScanner doesn't test parallel scans. (eli) + + HDFS-1765. Block Replication should respect under-replication + block priority. (Uma Maheswara Rao G via eli) Release 0.23.0 - 2011-11-01 diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index c6675a2788..50619dff57 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -168,9 +168,6 @@ public long getExcessBlocksCount() { /** variable to enable check for enough racks */ final boolean shouldCheckForEnoughRacks; - /** Last block index used for replication work. */ - private int replIndex = 0; - /** for block replicas placement */ private BlockPlacementPolicy blockplacement; @@ -923,74 +920,16 @@ int computeInvalidateWork(int nodesToProcess) { * @return number of blocks scheduled for replication during this iteration. */ private int computeReplicationWork(int blocksToProcess) throws IOException { - // Choose the blocks to be replicated - List> blocksToReplicate = - chooseUnderReplicatedBlocks(blocksToProcess); - - // replicate blocks - return computeReplicationWorkForBlocks(blocksToReplicate); - } - - /** - * Get a list of block lists to be replicated The index of block lists - * represents the - * - * @param blocksToProcess - * @return Return a list of block lists to be replicated. The block list index - * represents its replication priority. - */ - private List> chooseUnderReplicatedBlocks(int blocksToProcess) { - // initialize data structure for the return value - List> blocksToReplicate = new ArrayList>( - UnderReplicatedBlocks.LEVEL); - for (int i = 0; i < UnderReplicatedBlocks.LEVEL; i++) { - blocksToReplicate.add(new ArrayList()); - } + List> blocksToReplicate = null; namesystem.writeLock(); try { - synchronized (neededReplications) { - if (neededReplications.size() == 0) { - return blocksToReplicate; - } - - // Go through all blocks that need replications. - UnderReplicatedBlocks.BlockIterator neededReplicationsIterator = - neededReplications.iterator(); - // skip to the first unprocessed block, which is at replIndex - for (int i = 0; i < replIndex && neededReplicationsIterator.hasNext(); i++) { - neededReplicationsIterator.next(); - } - // # of blocks to process equals either twice the number of live - // data-nodes or the number of under-replicated blocks whichever is less - blocksToProcess = Math.min(blocksToProcess, neededReplications.size()); - - for (int blkCnt = 0; blkCnt < blocksToProcess; blkCnt++, replIndex++) { - if (!neededReplicationsIterator.hasNext()) { - // start from the beginning - replIndex = 0; - blocksToProcess = Math.min(blocksToProcess, neededReplications - .size()); - if (blkCnt >= blocksToProcess) - break; - neededReplicationsIterator = neededReplications.iterator(); - assert neededReplicationsIterator.hasNext() : "neededReplications should not be empty."; - } - - Block block = neededReplicationsIterator.next(); - int priority = neededReplicationsIterator.getPriority(); - if (priority < 0 || priority >= blocksToReplicate.size()) { - LOG.warn("Unexpected replication priority: " - + priority + " " + block); - } else { - blocksToReplicate.get(priority).add(block); - } - } // end for - } // end synchronized neededReplication + // Choose the blocks to be replicated + blocksToReplicate = neededReplications + .chooseUnderReplicatedBlocks(blocksToProcess); } finally { namesystem.writeUnlock(); } - - return blocksToReplicate; + return computeReplicationWorkForBlocks(blocksToReplicate); } /** Replicate a set of blocks @@ -1019,7 +958,7 @@ int computeReplicationWorkForBlocks(List> blocksToReplicate) { // abandoned block or block reopened for append if(fileINode == null || fileINode.isUnderConstruction()) { neededReplications.remove(block, priority); // remove from neededReplications - replIndex--; + neededReplications.decrementReplicationIndex(priority); continue; } @@ -1043,7 +982,7 @@ int computeReplicationWorkForBlocks(List> blocksToReplicate) { if ( (pendingReplications.getNumReplicas(block) > 0) || (blockHasEnoughRacks(block)) ) { neededReplications.remove(block, priority); // remove from neededReplications - replIndex--; + neededReplications.decrementReplicationIndex(priority); NameNode.stateChangeLog.info("BLOCK* " + "Removing block " + block + " from neededReplications as it has enough replicas."); @@ -1104,7 +1043,7 @@ int computeReplicationWorkForBlocks(List> blocksToReplicate) { if(fileINode == null || fileINode.isUnderConstruction()) { neededReplications.remove(block, priority); // remove from neededReplications rw.targets = null; - replIndex--; + neededReplications.decrementReplicationIndex(priority); continue; } requiredReplication = fileINode.getReplication(); @@ -1118,7 +1057,7 @@ int computeReplicationWorkForBlocks(List> blocksToReplicate) { if ( (pendingReplications.getNumReplicas(block) > 0) || (blockHasEnoughRacks(block)) ) { neededReplications.remove(block, priority); // remove from neededReplications - replIndex--; + neededReplications.decrementReplicationIndex(priority); rw.targets = null; NameNode.stateChangeLog.info("BLOCK* " + "Removing block " + block @@ -1156,7 +1095,7 @@ int computeReplicationWorkForBlocks(List> blocksToReplicate) { // remove from neededReplications if(numEffectiveReplicas + targets.length >= requiredReplication) { neededReplications.remove(block, priority); // remove from neededReplications - replIndex--; + neededReplications.decrementReplicationIndex(priority); } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java index 787dd2adca..81422cef07 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java @@ -18,8 +18,11 @@ package org.apache.hadoop.hdfs.server.blockmanagement; import java.util.ArrayList; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; + import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.util.LightWeightLinkedSet; import org.apache.hadoop.hdfs.server.namenode.NameNode; @@ -81,10 +84,14 @@ class UnderReplicatedBlocks implements Iterable { private List> priorityQueues = new ArrayList>(); + /** Stores the replication index for each priority */ + private Map priorityToReplIdx = new HashMap(LEVEL); + /** Create an object. */ UnderReplicatedBlocks() { for (int i = 0; i < LEVEL; i++) { priorityQueues.add(new LightWeightLinkedSet()); + priorityToReplIdx.put(i, 0); } } @@ -300,6 +307,70 @@ synchronized void update(Block block, int curReplicas, } } } + + /** + * Get a list of block lists to be replicated. The index of block lists + * represents its replication priority. Replication index will be tracked for + * each priority list separately in priorityToReplIdx map. Iterates through + * all priority lists and find the elements after replication index. Once the + * last priority lists reaches to end, all replication indexes will be set to + * 0 and start from 1st priority list to fulfill the blockToProces count. + * + * @param blocksToProcess - number of blocks to fetch from underReplicated blocks. + * @return Return a list of block lists to be replicated. The block list index + * represents its replication priority. + */ + public synchronized List> chooseUnderReplicatedBlocks( + int blocksToProcess) { + // initialize data structure for the return value + List> blocksToReplicate = new ArrayList>(LEVEL); + for (int i = 0; i < LEVEL; i++) { + blocksToReplicate.add(new ArrayList()); + } + + if (size() == 0) { // There are no blocks to collect. + return blocksToReplicate; + } + + int blockCount = 0; + for (int priority = 0; priority < LEVEL; priority++) { + // Go through all blocks that need replications with current priority. + BlockIterator neededReplicationsIterator = iterator(priority); + Integer replIndex = priorityToReplIdx.get(priority); + + // skip to the first unprocessed block, which is at replIndex + for (int i = 0; i < replIndex && neededReplicationsIterator.hasNext(); i++) { + neededReplicationsIterator.next(); + } + + blocksToProcess = Math.min(blocksToProcess, size()); + + if (blockCount == blocksToProcess) { + break; // break if already expected blocks are obtained + } + + // Loop through all remaining blocks in the list. + while (blockCount < blocksToProcess + && neededReplicationsIterator.hasNext()) { + Block block = neededReplicationsIterator.next(); + blocksToReplicate.get(priority).add(block); + replIndex++; + blockCount++; + } + + if (!neededReplicationsIterator.hasNext() + && neededReplicationsIterator.getPriority() == LEVEL - 1) { + // reset all priorities replication index to 0 because there is no + // recently added blocks in any list. + for (int i = 0; i < LEVEL; i++) { + priorityToReplIdx.put(i, 0); + } + break; + } + priorityToReplIdx.put(priority, replIndex); + } + return blocksToReplicate; + } /** returns an iterator of all blocks in a given priority queue */ synchronized BlockIterator iterator(int level) { @@ -380,4 +451,14 @@ int getPriority() { return level; } } + + /** + * This method is to decrement the replication index for the given priority + * + * @param priority - int priority level + */ + public void decrementReplicationIndex(int priority) { + Integer replIdx = priorityToReplIdx.get(priority); + priorityToReplIdx.put(priority, --replIdx); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java index 794b23c652..f5926281ee 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java @@ -17,26 +17,32 @@ */ package org.apache.hadoop.hdfs.server.blockmanagement; +import static org.junit.Assert.*; + import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; - -import junit.framework.TestCase; +import java.util.Random; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.net.Node; +import org.junit.Test; -public class TestReplicationPolicy extends TestCase { +public class TestReplicationPolicy { + private Random random= DFSUtil.getRandom(); private static final int BLOCK_SIZE = 1024; private static final int NUM_OF_DATANODES = 6; private static final Configuration CONF = new HdfsConfiguration(); @@ -90,6 +96,7 @@ public class TestReplicationPolicy extends TestCase { * the 1st is on dataNodes[0] and the 2nd is on a different rack. * @throws Exception */ + @Test public void testChooseTarget1() throws Exception { dataNodes[0].updateHeartbeat( 2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, @@ -150,6 +157,7 @@ private static DatanodeDescriptor[] chooseTarget( * should be placed on a third rack. * @throws Exception */ + @Test public void testChooseTarget2() throws Exception { HashMap excludedNodes; DatanodeDescriptor[] targets; @@ -225,6 +233,7 @@ public void testChooseTarget2() throws Exception { * and the rest should be placed on the third rack. * @throws Exception */ + @Test public void testChooseTarget3() throws Exception { // make data node 0 to be not qualified to choose dataNodes[0].updateHeartbeat( @@ -278,6 +287,7 @@ public void testChooseTarget3() throws Exception { * the 3rd replica should be placed on the same rack as the 1st replica, * @throws Exception */ + @Test public void testChoooseTarget4() throws Exception { // make data node 0 & 1 to be not qualified to choose: not enough disk space for(int i=0; i<2; i++) { @@ -325,6 +335,7 @@ public void testChoooseTarget4() throws Exception { * the 3rd replica should be placed on the same rack as the 2nd replica, * @throws Exception */ + @Test public void testChooseTarget5() throws Exception { DatanodeDescriptor[] targets; targets = replicator.chooseTarget(filename, @@ -354,6 +365,7 @@ public void testChooseTarget5() throws Exception { * the 1st replica. The 3rd replica can be placed randomly. * @throws Exception */ + @Test public void testRereplicate1() throws Exception { List chosenNodes = new ArrayList(); chosenNodes.add(dataNodes[0]); @@ -388,6 +400,7 @@ public void testRereplicate1() throws Exception { * the rest replicas can be placed randomly, * @throws Exception */ + @Test public void testRereplicate2() throws Exception { List chosenNodes = new ArrayList(); chosenNodes.add(dataNodes[0]); @@ -417,6 +430,7 @@ public void testRereplicate2() throws Exception { * the rest replicas can be placed randomly, * @throws Exception */ + @Test public void testRereplicate3() throws Exception { List chosenNodes = new ArrayList(); chosenNodes.add(dataNodes[0]); @@ -450,4 +464,122 @@ public void testRereplicate3() throws Exception { assertTrue(cluster.isOnSameRack(dataNodes[2], targets[0])); } + /** + * Test for the high priority blocks are processed before the low priority + * blocks. + */ + @Test(timeout = 60000) + public void testReplicationWithPriority() throws Exception { + int DFS_NAMENODE_REPLICATION_INTERVAL = 1000; + int HIGH_PRIORITY = 0; + Configuration conf = new Configuration(); + conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1); + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2) + .format(true).build(); + try { + cluster.waitActive(); + final UnderReplicatedBlocks neededReplications = (UnderReplicatedBlocks) cluster + .getNameNode().getNamesystem().getBlockManager().neededReplications; + for (int i = 0; i < 100; i++) { + // Adding the blocks directly to normal priority + neededReplications.add(new Block(random.nextLong()), 2, 0, 3); + } + // Lets wait for the replication interval, to start process normal + // priority blocks + Thread.sleep(DFS_NAMENODE_REPLICATION_INTERVAL); + + // Adding the block directly to high priority list + neededReplications.add(new Block(random.nextLong()), 1, 0, 3); + + // Lets wait for the replication interval + Thread.sleep(DFS_NAMENODE_REPLICATION_INTERVAL); + + // Check replication completed successfully. Need not wait till it process + // all the 100 normal blocks. + assertFalse("Not able to clear the element from high priority list", + neededReplications.iterator(HIGH_PRIORITY).hasNext()); + } finally { + cluster.shutdown(); + } + } + + /** + * Test for the ChooseUnderReplicatedBlocks are processed based on priority + */ + @Test + public void testChooseUnderReplicatedBlocks() throws Exception { + UnderReplicatedBlocks underReplicatedBlocks = new UnderReplicatedBlocks(); + + for (int i = 0; i < 5; i++) { + // Adding QUEUE_HIGHEST_PRIORITY block + underReplicatedBlocks.add(new Block(random.nextLong()), 1, 0, 3); + + // Adding QUEUE_VERY_UNDER_REPLICATED block + underReplicatedBlocks.add(new Block(random.nextLong()), 2, 0, 7); + + // Adding QUEUE_UNDER_REPLICATED block + underReplicatedBlocks.add(new Block(random.nextLong()), 6, 0, 6); + + // Adding QUEUE_REPLICAS_BADLY_DISTRIBUTED block + underReplicatedBlocks.add(new Block(random.nextLong()), 5, 0, 6); + + // Adding QUEUE_WITH_CORRUPT_BLOCKS block + underReplicatedBlocks.add(new Block(random.nextLong()), 0, 0, 3); + } + + // Choose 6 blocks from UnderReplicatedBlocks. Then it should pick 5 blocks + // from + // QUEUE_HIGHEST_PRIORITY and 1 block from QUEUE_VERY_UNDER_REPLICATED. + List> chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(6); + assertTheChosenBlocks(chosenBlocks, 5, 1, 0, 0, 0); + + // Choose 10 blocks from UnderReplicatedBlocks. Then it should pick 4 blocks from + // QUEUE_VERY_UNDER_REPLICATED, 5 blocks from QUEUE_UNDER_REPLICATED and 1 + // block from QUEUE_REPLICAS_BADLY_DISTRIBUTED. + chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(10); + assertTheChosenBlocks(chosenBlocks, 0, 4, 5, 1, 0); + + // Adding QUEUE_HIGHEST_PRIORITY + underReplicatedBlocks.add(new Block(random.nextLong()), 1, 0, 3); + + // Choose 10 blocks from UnderReplicatedBlocks. Then it should pick 1 block from + // QUEUE_HIGHEST_PRIORITY, 4 blocks from QUEUE_REPLICAS_BADLY_DISTRIBUTED + // and 5 blocks from QUEUE_WITH_CORRUPT_BLOCKS. + chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(10); + assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 4, 5); + + // Since it is reached to end of all lists, + // should start picking the blocks from start. + // Choose 7 blocks from UnderReplicatedBlocks. Then it should pick 6 blocks from + // QUEUE_HIGHEST_PRIORITY, 1 block from QUEUE_VERY_UNDER_REPLICATED. + chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(7); + assertTheChosenBlocks(chosenBlocks, 6, 1, 0, 0, 0); + } + + /** asserts the chosen blocks with expected priority blocks */ + private void assertTheChosenBlocks( + List> chosenBlocks, int firstPrioritySize, + int secondPrioritySize, int thirdPrioritySize, int fourthPrioritySize, + int fifthPrioritySize) { + assertEquals( + "Not returned the expected number of QUEUE_HIGHEST_PRIORITY blocks", + firstPrioritySize, chosenBlocks.get( + UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY).size()); + assertEquals( + "Not returned the expected number of QUEUE_VERY_UNDER_REPLICATED blocks", + secondPrioritySize, chosenBlocks.get( + UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED).size()); + assertEquals( + "Not returned the expected number of QUEUE_UNDER_REPLICATED blocks", + thirdPrioritySize, chosenBlocks.get( + UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED).size()); + assertEquals( + "Not returned the expected number of QUEUE_REPLICAS_BADLY_DISTRIBUTED blocks", + fourthPrioritySize, chosenBlocks.get( + UnderReplicatedBlocks.QUEUE_REPLICAS_BADLY_DISTRIBUTED).size()); + assertEquals( + "Not returned the expected number of QUEUE_WITH_CORRUPT_BLOCKS blocks", + fifthPrioritySize, chosenBlocks.get( + UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS).size()); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java index 3426a5ad1e..c8c528d0bb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java @@ -145,9 +145,7 @@ public void testFileAdd() throws Exception { fs.delete(file, true); filesTotal--; // reduce the filecount for deleted file - // Wait for more than DATANODE_COUNT replication intervals to ensure all - // the blocks pending deletion are sent for deletion to the datanodes. - Thread.sleep(DFS_REPLICATION_INTERVAL * (DATANODE_COUNT + 1) * 1000); + waitForDeletion(); updateMetrics(); rb = getMetrics(NS_METRICS); assertGauge("FilesTotal", filesTotal, rb); @@ -176,7 +174,7 @@ public void testCorruptBlock() throws Exception { assertGauge("PendingReplicationBlocks", 1L, rb); assertGauge("ScheduledReplicationBlocks", 1L, rb); fs.delete(file, true); - updateMetrics(); + waitForDeletion(); rb = getMetrics(NS_METRICS); assertGauge("CorruptBlocks", 0L, rb); assertGauge("PendingReplicationBlocks", 0L, rb); @@ -212,9 +210,15 @@ public void testMissingBlock() throws Exception { assertGauge("UnderReplicatedBlocks", 1L, rb); assertGauge("MissingBlocks", 1L, rb); fs.delete(file, true); - updateMetrics(); + waitForDeletion(); assertGauge("UnderReplicatedBlocks", 0L, getMetrics(NS_METRICS)); } + + private void waitForDeletion() throws InterruptedException { + // Wait for more than DATANODE_COUNT replication intervals to ensure all + // the blocks pending deletion are sent for deletion to the datanodes. + Thread.sleep(DFS_REPLICATION_INTERVAL * (DATANODE_COUNT + 1) * 1000); + } public void testRenameMetrics() throws Exception { Path src = getTestPath("src");