From 6ef42873a02bfcbff5521869f4d6f66539d1db41 Mon Sep 17 00:00:00 2001 From: Zhe Zhang Date: Tue, 12 Apr 2016 13:38:58 -0700 Subject: [PATCH] HDFS-9918. Erasure Coding: Sort located striped blocks based on decommissioned states. Contributed by Rakesh R. --- .../blockmanagement/DatanodeManager.java | 115 +++- .../hdfs/server/namenode/FSNamesystem.java | 22 +- .../hdfs/TestDecommissionWithStriped.java | 76 ++- .../TestSortLocatedStripedBlock.java | 557 ++++++++++++++++++ 4 files changed, 731 insertions(+), 39 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSortLocatedStripedBlock.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index cd1bdaba69..da02a9035b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.protocol.*; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList; import org.apache.hadoop.hdfs.server.namenode.CachedBlock; @@ -47,6 +48,7 @@ import org.apache.hadoop.ipc.Server; import org.apache.hadoop.net.*; import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.ReflectionUtils; import java.io.IOException; @@ -368,49 +370,110 @@ private boolean isInactive(DatanodeInfo datanode) { } - /** Sort the located blocks by the distance to the target host. */ - public void sortLocatedBlocks(final String targethost, - final List locatedblocks) { - //sort the blocks + /** + * Sort the non-striped located blocks by the distance to the target host. + * + * For striped blocks, it will only move decommissioned/stale nodes to the + * bottom. For example, assume we have storage list: + * d0, d1, d2, d3, d4, d5, d6, d7, d8, d9 + * mapping to block indices: + * 0, 1, 2, 3, 4, 5, 6, 7, 8, 2 + * + * Here the internal block b2 is duplicated, locating in d2 and d9. If d2 is + * a decommissioning node then should switch d2 and d9 in the storage list. + * After sorting locations, will update corresponding block indices + * and block tokens. + */ + public void sortLocatedBlocks(final String targetHost, + final List locatedBlocks) { + Comparator comparator = avoidStaleDataNodesForRead ? + new DFSUtil.DecomStaleComparator(staleInterval) : + DFSUtil.DECOM_COMPARATOR; + // sort located block + for (LocatedBlock lb : locatedBlocks) { + if (lb.isStriped()) { + sortLocatedStripedBlock(lb, comparator); + } else { + sortLocatedBlock(lb, targetHost, comparator); + } + } + } + + /** + * Move decommissioned/stale datanodes to the bottom. After sorting it will + * update block indices and block tokens respectively. + * + * @param lb located striped block + * @param comparator dn comparator + */ + private void sortLocatedStripedBlock(final LocatedBlock lb, + Comparator comparator) { + DatanodeInfo[] di = lb.getLocations(); + HashMap locToIndex = new HashMap<>(); + HashMap> locToToken = + new HashMap<>(); + LocatedStripedBlock lsb = (LocatedStripedBlock) lb; + for (int i = 0; i < di.length; i++) { + locToIndex.put(di[i], lsb.getBlockIndices()[i]); + locToToken.put(di[i], lsb.getBlockTokens()[i]); + } + // Move decommissioned/stale datanodes to the bottom + Arrays.sort(di, comparator); + + // must update cache since we modified locations array + lb.updateCachedStorageInfo(); + + // must update block indices and block tokens respectively + for (int i = 0; i < di.length; i++) { + lsb.getBlockIndices()[i] = locToIndex.get(di[i]); + lsb.getBlockTokens()[i] = locToToken.get(di[i]); + } + } + + /** + * Move decommissioned/stale datanodes to the bottom. Also, sort nodes by + * network distance. + * + * @param lb located block + * @param targetHost target host + * @param comparator dn comparator + */ + private void sortLocatedBlock(final LocatedBlock lb, String targetHost, + Comparator comparator) { // As it is possible for the separation of node manager and datanode, // here we should get node but not datanode only . - Node client = getDatanodeByHost(targethost); + Node client = getDatanodeByHost(targetHost); if (client == null) { List hosts = new ArrayList<> (1); - hosts.add(targethost); + hosts.add(targetHost); List resolvedHosts = dnsToSwitchMapping.resolve(hosts); if (resolvedHosts != null && !resolvedHosts.isEmpty()) { String rName = resolvedHosts.get(0); if (rName != null) { client = new NodeBase(rName + NodeBase.PATH_SEPARATOR_STR + - targethost); + targetHost); } } else { LOG.error("Node Resolution failed. Please make sure that rack " + "awareness scripts are functional."); } } - - Comparator comparator = avoidStaleDataNodesForRead ? - new DFSUtil.DecomStaleComparator(staleInterval) : - DFSUtil.DECOM_COMPARATOR; - - for (LocatedBlock b : locatedblocks) { - DatanodeInfo[] di = b.getLocations(); - // Move decommissioned/stale datanodes to the bottom - Arrays.sort(di, comparator); - - int lastActiveIndex = di.length - 1; - while (lastActiveIndex > 0 && isInactive(di[lastActiveIndex])) { - --lastActiveIndex; - } - int activeLen = lastActiveIndex + 1; - networktopology.sortByDistance(client, b.getLocations(), activeLen); - // must update cache since we modified locations array - b.updateCachedStorageInfo(); + + DatanodeInfo[] di = lb.getLocations(); + // Move decommissioned/stale datanodes to the bottom + Arrays.sort(di, comparator); + + // Sort nodes by network distance only for located blocks + int lastActiveIndex = di.length - 1; + while (lastActiveIndex > 0 && isInactive(di[lastActiveIndex])) { + --lastActiveIndex; } + int activeLen = lastActiveIndex + 1; + networktopology.sortByDistance(client, lb.getLocations(), activeLen); + + // must update cache since we modified locations array + lb.updateCachedStorageInfo(); } - /** @return the datanode descriptor for the host. */ public DatanodeDescriptor getDatanodeByHost(final String host) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 00e12414cb..39b3598063 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -184,7 +184,6 @@ import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; -import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException; import org.apache.hadoop.hdfs.protocol.RollingUpgradeException; import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; @@ -1779,25 +1778,28 @@ LocatedBlocks getBlockLocations(String clientMachine, String srcArg, } LocatedBlocks blocks = res.blocks; + sortLocatedBlocks(clientMachine, blocks); + return blocks; + } + + private void sortLocatedBlocks(String clientMachine, LocatedBlocks blocks) { if (blocks != null) { List blkList = blocks.getLocatedBlocks(); - if (blkList == null || blkList.size() == 0 || - blkList.get(0) instanceof LocatedStripedBlock) { - // no need to sort locations for striped blocks - return blocks; + if (blkList == null || blkList.size() == 0) { + // simply return, block list is empty + return; } - blockManager.getDatanodeManager().sortLocatedBlocks( - clientMachine, blkList); + blockManager.getDatanodeManager().sortLocatedBlocks(clientMachine, + blkList); // lastBlock is not part of getLocatedBlocks(), might need to sort it too LocatedBlock lastBlock = blocks.getLastLocatedBlock(); if (lastBlock != null) { ArrayList lastBlockList = Lists.newArrayList(lastBlock); - blockManager.getDatanodeManager().sortLocatedBlocks( - clientMachine, lastBlockList); + blockManager.getDatanodeManager().sortLocatedBlocks(clientMachine, + lastBlockList); } } - return blocks; } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java index bde2cebbd0..c0d8268aa5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java @@ -27,6 +27,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.concurrent.CountDownLatch; @@ -49,12 +50,15 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.test.PathUtils; import org.junit.After; import org.junit.Assert; @@ -159,6 +163,13 @@ public void testFileFullBlockGroup() throws Exception { testDecommission(blockSize * dataBlocks, 9, 1, "testFileFullBlockGroup"); } + @Test(timeout = 120000) + public void testFileMultipleBlockGroups() throws Exception { + LOG.info("Starting test testFileMultipleBlockGroups"); + int writeBytes = 2 * blockSize * dataBlocks; + testDecommission(writeBytes, 9, 1, "testFileMultipleBlockGroups"); + } + @Test(timeout = 120000) public void testFileSmallerThanOneCell() throws Exception { LOG.info("Starting test testFileSmallerThanOneCell"); @@ -274,7 +285,15 @@ private void testDecommission(int writeBytes, int storageCount, int deadDecomissioned = fsn.getNumDecomDeadDataNodes(); int liveDecomissioned = fsn.getNumDecomLiveDataNodes(); - ((HdfsDataInputStream) dfs.open(ecFile)).getAllBlocks(); + List lbs = ((HdfsDataInputStream) dfs.open(ecFile)) + .getAllBlocks(); + + // prepare expected block index and token list. + List> locToIndexList = new ArrayList<>(); + List>> locToTokenList = + new ArrayList<>(); + prepareBlockIndexAndTokenList(lbs, locToIndexList, locToTokenList); + // Decommission node. Verify that node is decommissioned. decommissionNode(0, decommisionNodes, AdminStates.DECOMMISSIONED); @@ -290,9 +309,55 @@ private void testDecommission(int writeBytes, int storageCount, assertNull(checkFile(dfs, ecFile, storageCount, decommisionNodes, numDNs)); StripedFileTestUtil.checkData(dfs, ecFile, writeBytes, decommisionNodes, null); + + assertBlockIndexAndTokenPosition(lbs, locToIndexList, locToTokenList); + cleanupFile(dfs, ecFile); } + private void prepareBlockIndexAndTokenList(List lbs, + List> locToIndexList, + List>> locToTokenList) { + for (LocatedBlock lb : lbs) { + HashMap locToIndex = new HashMap(); + locToIndexList.add(locToIndex); + + HashMap> locToToken = + new HashMap>(); + locToTokenList.add(locToToken); + + DatanodeInfo[] di = lb.getLocations(); + LocatedStripedBlock stripedBlk = (LocatedStripedBlock) lb; + for (int i = 0; i < di.length; i++) { + locToIndex.put(di[i], stripedBlk.getBlockIndices()[i]); + locToToken.put(di[i], stripedBlk.getBlockTokens()[i]); + } + } + } + + /** + * Verify block index and token values. Must update block indices and block + * tokens after sorting. + */ + private void assertBlockIndexAndTokenPosition(List lbs, + List> locToIndexList, + List>> locToTokenList) { + for (int i = 0; i < lbs.size(); i++) { + LocatedBlock lb = lbs.get(i); + LocatedStripedBlock stripedBlk = (LocatedStripedBlock) lb; + HashMap locToIndex = locToIndexList.get(i); + HashMap> locToToken = + locToTokenList.get(i); + DatanodeInfo[] di = lb.getLocations(); + for (int j = 0; j < di.length; j++) { + Assert.assertEquals("Block index value mismatches after sorting", + (byte) locToIndex.get(di[j]), stripedBlk.getBlockIndices()[j]); + Assert.assertEquals("Block token value mismatches after sorting", + locToToken.get(di[j]), stripedBlk.getBlockTokens()[j]); + } + } + } + private List getDecommissionDatanode(DistributedFileSystem dfs, Path ecFile, int writeBytes, int decomNodeCount) throws IOException { ArrayList decommissionedNodes = new ArrayList<>(); @@ -447,7 +512,12 @@ private static String checkFile(FileSystem fileSys, Path name, int repl, return "For block " + blk.getBlock() + " replica on " + nodes[j] + " is given as downnode, " + "but is not decommissioned"; } - // TODO: Add check to verify that the Decommissioned node (if any) + // Decommissioned node (if any) should only be last node in list. + if (j < repl) { + return "For block " + blk.getBlock() + " decommissioned node " + + nodes[j] + " was not last node in list: " + (j + 1) + " of " + + nodes.length; + } // should only be last node in list. LOG.info("Block " + blk.getBlock() + " replica on " + nodes[j] + " is decommissioned."); @@ -470,4 +540,4 @@ private static String checkFile(FileSystem fileSys, Path name, int repl, } return null; } -} \ No newline at end of file +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSortLocatedStripedBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSortLocatedStripedBlock.java new file mode 100644 index 0000000000..1dd067dd6e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSortLocatedStripedBlock.java @@ -0,0 +1,557 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.blockmanagement; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.StripedFileTestUtil; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; +import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.Time; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class tests the sorting of located striped blocks based on + * decommissioned states. + */ +public class TestSortLocatedStripedBlock { + static final Logger LOG = LoggerFactory + .getLogger(TestSortLocatedStripedBlock.class); + static final int BLK_GROUP_WIDTH = StripedFileTestUtil.NUM_DATA_BLOCKS + + StripedFileTestUtil.NUM_PARITY_BLOCKS; + static final int NUM_DATA_BLOCKS = StripedFileTestUtil.NUM_DATA_BLOCKS; + static final int NUM_PARITY_BLOCKS = StripedFileTestUtil.NUM_PARITY_BLOCKS; + static DatanodeManager dm; + static final long STALE_INTERVAL = 30 * 1000 * 60; + + @BeforeClass + public static void setup() throws IOException { + dm = mockDatanodeManager(); + } + + /** + * Test to verify sorting with multiple decommissioned datanodes exists in + * storage lists. + * + * We have storage list, marked decommissioned internal blocks with a ' + * d0, d1, d2, d3, d4, d5, d6, d7, d8, d9, d10, d11, d12 + * mapping to indices + * 0', 1', 2, 3, 4, 5, 6, 7', 8', 0, 1, 7, 8 + * + * Decommissioned node indices: 0, 1, 7, 8 + * + * So in the original list nodes d0, d1, d7, d8 are decommissioned state. + * + * After sorting the expected block indices list should be, + * 0, 1, 2, 3, 4, 5, 6, 7, 8, 0', 1', 7', 8' + * + * After sorting the expected storage list will be, + * d9, d10, d2, d3, d4, d5, d6, d11, d12, d0, d1, d7, d8. + * + * Note: after sorting block indices will not be in ascending order. + */ + @Test(timeout = 10000) + public void testWithMultipleDecommnDatanodes() { + LOG.info("Starting test testSortWithMultipleDecommnDatanodes"); + int lbsCount = 2; // two located block groups + List decommnNodeIndices = new ArrayList<>(); + decommnNodeIndices.add(0); + decommnNodeIndices.add(1); + decommnNodeIndices.add(7); + decommnNodeIndices.add(8); + List targetNodeIndices = new ArrayList<>(); + targetNodeIndices.addAll(decommnNodeIndices); + // map contains decommissioned node details in each located strip block + // which will be used for assertions + HashMap> decommissionedNodes = new HashMap<>( + lbsCount * decommnNodeIndices.size()); + List lbs = createLocatedStripedBlocks(lbsCount, + NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS, decommnNodeIndices, + targetNodeIndices, decommissionedNodes); + + // prepare expected block index and token list. + List> locToIndexList = new ArrayList<>(); + List>> locToTokenList = + new ArrayList<>(); + prepareBlockIndexAndTokenList(lbs, locToIndexList, locToTokenList); + + dm.sortLocatedBlocks(null, lbs); + + assertDecommnNodePosition(BLK_GROUP_WIDTH, decommissionedNodes, lbs); + assertBlockIndexAndTokenPosition(lbs, locToIndexList, locToTokenList); + } + + /** + * Test to verify sorting with two decommissioned datanodes exists in + * storage lists for the same block index. + * + * We have storage list, marked decommissioned internal blocks with a ' + * d0, d1, d2, d3, d4, d5, d6, d7, d8, d9, d10, d11, d12, d13 + * mapping to indices + * 0', 1', 2, 3, 4', 5', 6, 7, 8, 0, 1', 4, 5, 1 + * + * Decommissioned node indices: 0', 1', 4', 5', 1' + * + * Here decommissioned has done twice to the datanode block index 1. + * So in the original list nodes d0, d1, d4, d5, d10 are decommissioned state. + * + * After sorting the expected block indices list will be, + * 0, 1, 2, 3, 4, 5, 6, 7, 8, 0', 1', 1', 4', 5' + * + * After sorting the expected storage list will be, + * d9, d13, d2, d3, d11, d12, d6, d7, d8, d0, d1, d10, d4, d5. + * + * Note: after sorting block indices will not be in ascending order. + */ + @Test(timeout = 10000) + public void testTwoDatanodesWithSameBlockIndexAreDecommn() { + LOG.info("Starting test testTwoDatanodesWithSameBlockIndexAreDecommn"); + int lbsCount = 2; // two located block groups + List decommnNodeIndices = new ArrayList<>(); + decommnNodeIndices.add(0); + decommnNodeIndices.add(1); + decommnNodeIndices.add(4); + decommnNodeIndices.add(5); + // representing blockIndex 1, later this also decommissioned + decommnNodeIndices.add(1); + + List targetNodeIndices = new ArrayList<>(); + targetNodeIndices.addAll(decommnNodeIndices); + // map contains decommissioned node details in each located strip block + // which will be used for assertions + HashMap> decommissionedNodes = new HashMap<>( + lbsCount * decommnNodeIndices.size()); + List lbs = createLocatedStripedBlocks(lbsCount, + NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS, decommnNodeIndices, + targetNodeIndices, decommissionedNodes); + + // prepare expected block index and token list. + List> locToIndexList = new ArrayList<>(); + List>> locToTokenList = + new ArrayList<>(); + prepareBlockIndexAndTokenList(lbs, locToIndexList, locToTokenList); + + dm.sortLocatedBlocks(null, lbs); + assertDecommnNodePosition(BLK_GROUP_WIDTH, decommissionedNodes, lbs); + assertBlockIndexAndTokenPosition(lbs, locToIndexList, locToTokenList); + } + + /** + * Test to verify sorting with decommissioned datanodes exists in storage + * list which is smaller than stripe size. + * + * We have storage list, marked decommissioned internal blocks with a ' + * d0, d1, d2, d3, d6, d7, d8, d9, d10, d11 + * mapping to indices + * 0', 1, 2', 3, 6, 7, 8, 0, 2', 2 + * + * Decommissioned node indices: 0', 2', 2' + * + * Here decommissioned has done twice to the datanode block index 2. + * So in the original list nodes d0, d2, d10 are decommissioned state. + * + * After sorting the expected block indices list should be, + * 0, 1, 2, 3, 6, 7, 8, 0', 2', 2' + * + * After sorting the expected storage list will be, + * d9, d1, d11, d3, d6, d7, d8, d0, d2, d10. + * + * Note: after sorting block indices will not be in ascending order. + */ + @Test(timeout = 10000) + public void testSmallerThanOneStripeWithMultpleDecommnNodes() + throws Exception { + LOG.info("Starting test testSmallerThanOneStripeWithDecommn"); + int lbsCount = 2; // two located block groups + List decommnNodeIndices = new ArrayList<>(); + decommnNodeIndices.add(0); + decommnNodeIndices.add(2); + // representing blockIndex 1, later this also decommissioned + decommnNodeIndices.add(2); + + List targetNodeIndices = new ArrayList<>(); + targetNodeIndices.addAll(decommnNodeIndices); + // map contains decommissioned node details in each located strip block + // which will be used for assertions + HashMap> decommissionedNodes = new HashMap<>( + lbsCount * decommnNodeIndices.size()); + int dataBlksNum = NUM_DATA_BLOCKS - 2; + List lbs = createLocatedStripedBlocks(lbsCount, dataBlksNum, + NUM_PARITY_BLOCKS, decommnNodeIndices, targetNodeIndices, + decommissionedNodes); + + // prepare expected block index and token list. + List> locToIndexList = new ArrayList<>(); + List>> locToTokenList = + new ArrayList<>(); + prepareBlockIndexAndTokenList(lbs, locToIndexList, locToTokenList); + + dm.sortLocatedBlocks(null, lbs); + + // After this index all are decommissioned nodes. + int blkGrpWidth = dataBlksNum + NUM_PARITY_BLOCKS; + assertDecommnNodePosition(blkGrpWidth, decommissionedNodes, lbs); + assertBlockIndexAndTokenPosition(lbs, locToIndexList, locToTokenList); + } + + /** + * Test to verify sorting with decommissioned datanodes exists in storage + * list but the corresponding new target datanode doesn't exists. + * + * We have storage list, marked decommissioned internal blocks with a ' + * d0, d1, d2, d3, d4, d5, d6, d7, d8, d9, d10, d11 + * mapping to indices + * 0', 1', 2', 3, 4', 5', 6, 7, 8, 0, 2, 4 + * + * Decommissioned node indices: 0', 1', 2', 4', 5' + * + * 1 and 5 nodes doesn't exists in the target list. This can happen, the + * target node block corrupted or lost after the successful decommissioning. + * So in the original list nodes corresponding to the decommissioned block + * index 1 and 5 doesn't have any target entries. + * + * After sorting the expected block indices list should be, + * 0, 2, 3, 4, 6, 7, 8, 0', 1', 2', 4', 5' + * + * After sorting the expected storage list will be, + * d9, d10, d3, d11, d6, d7, d8, d0, d1, d2, d4, d5. + * + * Note: after sorting block indices will not be in ascending order. + */ + @Test(timeout = 10000) + public void testTargetDecommnDatanodeDoesntExists() { + LOG.info("Starting test testTargetDecommnDatanodeDoesntExists"); + int lbsCount = 2; // two located block groups + List decommnNodeIndices = new ArrayList<>(); + decommnNodeIndices.add(0); + decommnNodeIndices.add(1); + decommnNodeIndices.add(2); + decommnNodeIndices.add(4); + decommnNodeIndices.add(5); + + List targetNodeIndices = new ArrayList<>(); + targetNodeIndices.add(0); + targetNodeIndices.add(2); + targetNodeIndices.add(4); + // 1 and 5 nodes doesn't exists in the target list. One such case is, the + // target node block corrupted or lost after the successful decommissioning + + // map contains decommissioned node details in each located strip block + // which will be used for assertions + HashMap> decommissionedNodes = new HashMap<>( + lbsCount * decommnNodeIndices.size()); + List lbs = createLocatedStripedBlocks(lbsCount, + NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS, decommnNodeIndices, + targetNodeIndices, decommissionedNodes); + + // prepare expected block index and token list. + List> locToIndexList = new ArrayList<>(); + List>> locToTokenList = + new ArrayList<>(); + prepareBlockIndexAndTokenList(lbs, locToIndexList, locToTokenList); + + dm.sortLocatedBlocks(null, lbs); + + // After this index all are decommissioned nodes. Needs to reconstruct two + // more block indices. + int blkGrpWidth = NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS - 2; + assertDecommnNodePosition(blkGrpWidth, decommissionedNodes, lbs); + assertBlockIndexAndTokenPosition(lbs, locToIndexList, locToTokenList); + } + + /** + * Test to verify sorting with multiple in-service and decommissioned + * datanodes exists in storage lists. + * + * We have storage list, marked decommissioned internal blocks with a ' + * d0, d1, d2, d3, d4, d5, d6, d7, d8, d9, d10, d11, d12, d13 + * mapping to indices + * 0', 1', 2, 3, 4, 5, 6, 7', 8', 0, 1, 7, 8, 1 + * + * Decommissioned node indices: 0', 1', 7', 8' + * + * Additional In-Service node d13 at the end, block index: 1 + * + * So in the original list nodes d0, d1, d7, d8 are decommissioned state. + * + * After sorting the expected block indices list will be, + * 0, 1, 2, 3, 4, 5, 6, 7, 8, 1, 0', 1', 7', 8' + * + * After sorting the expected storage list will be, + * d9, d10, d2, d3, d4, d5, d6, d11, d12, d13, d0, d1, d7, d8. + * + * Note: after sorting block indices will not be in ascending order. + */ + @Test(timeout = 10000) + public void testWithMultipleInServiceAndDecommnDatanodes() { + LOG.info("Starting test testWithMultipleInServiceAndDecommnDatanodes"); + int lbsCount = 2; // two located block groups + List decommnNodeIndices = new ArrayList<>(); + decommnNodeIndices.add(0); + decommnNodeIndices.add(1); + decommnNodeIndices.add(7); + decommnNodeIndices.add(8); + List targetNodeIndices = new ArrayList<>(); + targetNodeIndices.addAll(decommnNodeIndices); + + // at the end add an additional In-Service node to blockIndex=1 + targetNodeIndices.add(1); + + // map contains decommissioned node details in each located strip block + // which will be used for assertions + HashMap> decommissionedNodes = new HashMap<>( + lbsCount * decommnNodeIndices.size()); + List lbs = createLocatedStripedBlocks(lbsCount, + NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS, decommnNodeIndices, + targetNodeIndices, decommissionedNodes); + List staleDns = new ArrayList<>(); + for (LocatedBlock lb : lbs) { + DatanodeInfo[] locations = lb.getLocations(); + DatanodeInfo staleDn = locations[locations.length - 1]; + staleDn + .setLastUpdateMonotonic(Time.monotonicNow() - (STALE_INTERVAL * 2)); + staleDns.add(staleDn); + } + + // prepare expected block index and token list. + List> locToIndexList = new ArrayList<>(); + List>> locToTokenList = + new ArrayList<>(); + prepareBlockIndexAndTokenList(lbs, locToIndexList, locToTokenList); + + dm.sortLocatedBlocks(null, lbs); + + assertDecommnNodePosition(BLK_GROUP_WIDTH + 1, decommissionedNodes, lbs); + assertBlockIndexAndTokenPosition(lbs, locToIndexList, locToTokenList); + + for (LocatedBlock lb : lbs) { + byte[] blockIndices = ((LocatedStripedBlock) lb).getBlockIndices(); + // after sorting stale block index will be placed after normal nodes. + Assert.assertEquals("Failed to move stale node to bottom!", 1, + blockIndices[9]); + DatanodeInfo[] locations = lb.getLocations(); + // After sorting stale node d13 will be placed after normal nodes + Assert.assertEquals("Failed to move stale dn after normal one!", + staleDns.remove(0), locations[9]); + } + } + + /** + * Verify that decommissioned/stale nodes must be positioned after normal + * nodes. + */ + private void assertDecommnNodePosition(int blkGrpWidth, + HashMap> decommissionedNodes, + List lbs) { + for (int i = 0; i < lbs.size(); i++) { // for each block + LocatedBlock blk = lbs.get(i); + DatanodeInfo[] nodes = blk.getLocations(); + List decommissionedNodeList = decommissionedNodes.get(i); + + for (int j = 0; j < nodes.length; j++) { // for each replica + DatanodeInfo dnInfo = nodes[j]; + LOG.info("Block Locations size={}, locs={}, j=", nodes.length, + dnInfo.toString(), j); + if (j < blkGrpWidth) { + Assert.assertEquals("Node shouldn't be decommissioned", + AdminStates.NORMAL, dnInfo.getAdminState()); + } else { + // check against decommissioned list + Assert.assertTrue( + "For block " + blk.getBlock() + " decommissioned node " + dnInfo + + " is not last node in list: " + j + "th index of " + + nodes.length, + decommissionedNodeList.contains(dnInfo.getXferAddr())); + Assert.assertEquals("Node should be decommissioned", + AdminStates.DECOMMISSIONED, dnInfo.getAdminState()); + } + } + } + } + + private List createLocatedStripedBlocks(int blkGrpCount, + int dataNumBlk, int numParityBlk, List decommnNodeIndices, + List targetNodeIndices, + HashMap> decommissionedNodes) { + + final List lbs = new ArrayList<>(blkGrpCount); + for (int i = 0; i < blkGrpCount; i++) { + ArrayList decommNodeInfo = new ArrayList(); + decommissionedNodes.put(new Integer(i), decommNodeInfo); + List dummyDecommnNodeIndices = new ArrayList<>(); + dummyDecommnNodeIndices.addAll(decommnNodeIndices); + + LocatedStripedBlock lsb = createEachLocatedBlock(dataNumBlk, numParityBlk, + dummyDecommnNodeIndices, targetNodeIndices, decommNodeInfo); + lbs.add(lsb); + } + return lbs; + } + + private LocatedStripedBlock createEachLocatedBlock(int numDataBlk, + int numParityBlk, List decommnNodeIndices, + List targetNodeIndices, ArrayList decommNodeInfo) { + final long blockGroupID = Long.MIN_VALUE; + int totalDns = numDataBlk + numParityBlk + targetNodeIndices.size(); + DatanodeInfo[] locs = new DatanodeInfo[totalDns]; + String[] storageIDs = new String[totalDns]; + StorageType[] storageTypes = new StorageType[totalDns]; + byte[] blkIndices = new byte[totalDns]; + + // Adding data blocks + int index = 0; + for (; index < numDataBlk; index++) { + blkIndices[index] = (byte) index; + // Location port always equal to logical index of a block, + // for easier verification + locs[index] = DFSTestUtil.getLocalDatanodeInfo(blkIndices[index]); + locs[index].setLastUpdateMonotonic(Time.monotonicNow()); + storageIDs[index] = locs[index].getDatanodeUuid(); + storageTypes[index] = StorageType.DISK; + // set decommissioned state + if (decommnNodeIndices.contains(index)) { + locs[index].setDecommissioned(); + decommNodeInfo.add(locs[index].toString()); + // Removing it from the list to ensure that all the given nodes are + // successfully marked as decomissioned. + decommnNodeIndices.remove(new Integer(index)); + } + } + // Adding parity blocks after data blocks + index = NUM_DATA_BLOCKS; + for (int j = numDataBlk; j < numDataBlk + numParityBlk; j++, index++) { + blkIndices[j] = (byte) index; + // Location port always equal to logical index of a block, + // for easier verification + locs[j] = DFSTestUtil.getLocalDatanodeInfo(blkIndices[j]); + locs[j].setLastUpdateMonotonic(Time.monotonicNow()); + storageIDs[j] = locs[j].getDatanodeUuid(); + storageTypes[j] = StorageType.DISK; + // set decommissioned state + if (decommnNodeIndices.contains(index)) { + locs[j].setDecommissioned(); + decommNodeInfo.add(locs[j].toString()); + // Removing it from the list to ensure that all the given nodes are + // successfully marked as decomissioned. + decommnNodeIndices.remove(new Integer(index)); + } + } + // Add extra target nodes to storage list after the parity blocks + int basePortValue = NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS; + index = numDataBlk + numParityBlk; + for (int i = 0; i < targetNodeIndices.size(); i++, index++) { + int blkIndexPos = targetNodeIndices.get(i); + blkIndices[index] = (byte) blkIndexPos; + // Location port always equal to logical index of a block, + // for easier verification + locs[index] = DFSTestUtil.getLocalDatanodeInfo(basePortValue++); + locs[index].setLastUpdateMonotonic(Time.monotonicNow()); + storageIDs[index] = locs[index].getDatanodeUuid(); + storageTypes[index] = StorageType.DISK; + // set decommissioned state. This can happen, the target node is again + // decommissioned by administrator + if (decommnNodeIndices.contains(blkIndexPos)) { + locs[index].setDecommissioned(); + decommNodeInfo.add(locs[index].toString()); + // Removing it from the list to ensure that all the given nodes are + // successfully marked as decomissioned. + decommnNodeIndices.remove(new Integer(blkIndexPos)); + } + } + return new LocatedStripedBlock( + new ExtendedBlock("pool", blockGroupID, + StripedFileTestUtil.BLOCK_STRIPED_CELL_SIZE, 1001), + locs, storageIDs, storageTypes, blkIndices, 0, false, null); + } + + private static DatanodeManager mockDatanodeManager() throws IOException { + Configuration conf = new Configuration(); + conf.setBoolean( + DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, true); + conf.setLong(DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY, + STALE_INTERVAL); + FSNamesystem fsn = Mockito.mock(FSNamesystem.class); + BlockManager bm = Mockito.mock(BlockManager.class); + BlockReportLeaseManager blm = new BlockReportLeaseManager(conf); + Mockito.when(bm.getBlockReportLeaseManager()).thenReturn(blm); + DatanodeManager dm = new DatanodeManager(bm, fsn, conf); + return dm; + } + + private void prepareBlockIndexAndTokenList(List lbs, + List> locToIndexList, + List>> locToTokenList) { + for (LocatedBlock lb : lbs) { + HashMap locToIndex = new HashMap(); + locToIndexList.add(locToIndex); + + HashMap> locToToken = + new HashMap>(); + locToTokenList.add(locToToken); + + DatanodeInfo[] di = lb.getLocations(); + LocatedStripedBlock stripedBlk = (LocatedStripedBlock) lb; + for (int i = 0; i < di.length; i++) { + locToIndex.put(di[i], stripedBlk.getBlockIndices()[i]); + locToToken.put(di[i], stripedBlk.getBlockTokens()[i]); + } + } + } + + /** + * Verify block index and token values. Must update block indices and block + * tokens after sorting. + */ + private void assertBlockIndexAndTokenPosition(List lbs, + List> locToIndexList, + List>> locToTokenList) { + for (int i = 0; i < lbs.size(); i++) { + LocatedBlock lb = lbs.get(i); + LocatedStripedBlock stripedBlk = (LocatedStripedBlock) lb; + HashMap locToIndex = locToIndexList.get(i); + HashMap> locToToken = + locToTokenList.get(i); + DatanodeInfo[] di = lb.getLocations(); + for (int j = 0; j < di.length; j++) { + Assert.assertEquals("Block index value mismatches after sorting", + (byte) locToIndex.get(di[j]), stripedBlk.getBlockIndices()[j]); + Assert.assertEquals("Block token value mismatches after sorting", + locToToken.get(di[j]), stripedBlk.getBlockTokens()[j]); + } + } + } +} \ No newline at end of file