diff --git a/mapreduce/src/contrib/raid/src/test/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockPlacementPolicyRaid.java b/mapreduce/src/contrib/raid/src/test/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockPlacementPolicyRaid.java index 13fa1c447e..07331ef8d8 100644 --- a/mapreduce/src/contrib/raid/src/test/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockPlacementPolicyRaid.java +++ b/mapreduce/src/contrib/raid/src/test/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockPlacementPolicyRaid.java @@ -48,467 +48,470 @@ import org.junit.Test; public class TestBlockPlacementPolicyRaid { - private Configuration conf = null; - private MiniDFSCluster cluster = null; - private FSNamesystem namesystem = null; - private BlockPlacementPolicyRaid policy = null; - private FileSystem fs = null; - String[] rack1 = {"/rack1"}; - String[] rack2 = {"/rack2"}; - String[] host1 = {"host1.rack1.com"}; - String[] host2 = {"host2.rack2.com"}; - String xorPrefix = null; - String raidTempPrefix = null; - String raidrsTempPrefix = null; - String raidrsHarTempPrefix = null; - - final static Log LOG = - LogFactory.getLog(TestBlockPlacementPolicyRaid.class); - - protected void setupCluster() throws IOException { - conf = new Configuration(); - conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L); - conf.set("dfs.replication.pending.timeout.sec", "2"); - conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1L); - conf.set("dfs.block.replicator.classname", - "org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicyRaid"); - conf.set(RaidNode.STRIPE_LENGTH_KEY, "2"); - conf.set(RaidNode.RS_PARITY_LENGTH_KEY, "3"); - conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 1); - // start the cluster with one datanode first - cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1). - format(true).racks(rack1).hosts(host1).build(); - cluster.waitActive(); - namesystem = cluster.getNameNode().getNamesystem(); - Assert.assertTrue("BlockPlacementPolicy type is not correct.", - namesystem.blockManager.replicator instanceof BlockPlacementPolicyRaid); - policy = (BlockPlacementPolicyRaid) namesystem.blockManager.replicator; - fs = cluster.getFileSystem(); - xorPrefix = RaidNode.xorDestinationPath(conf).toUri().getPath(); - raidTempPrefix = RaidNode.xorTempPrefix(conf); - raidrsTempPrefix = RaidNode.rsTempPrefix(conf); - raidrsHarTempPrefix = RaidNode.rsHarTempPrefix(conf); - } - - /** - * Test that the parity files will be placed at the good locations when we - * create them. - */ @Test - public void testChooseTargetForRaidFile() throws IOException { - setupCluster(); - try { - String src = "/dir/file"; - String parity = raidrsTempPrefix + src; - DFSTestUtil.createFile(fs, new Path(src), 4, (short)1, 0L); - DFSTestUtil.waitReplication(fs, new Path(src), (short)1); - refreshPolicy(); - setBlockPlacementPolicy(namesystem, policy); - // start 3 more datanodes - String[] racks = {"/rack2", "/rack2", "/rack2", - "/rack2", "/rack2", "/rack2"}; - String[] hosts = - {"host2.rack2.com", "host3.rack2.com", "host4.rack2.com", - "host5.rack2.com", "host6.rack2.com", "host7.rack2.com"}; - cluster.startDataNodes(conf, 6, true, null, racks, hosts, null); - int numBlocks = 6; - DFSTestUtil.createFile(fs, new Path(parity), numBlocks, (short)2, 0L); - DFSTestUtil.waitReplication(fs, new Path(parity), (short)2); - FileStatus srcStat = fs.getFileStatus(new Path(src)); - BlockLocation[] srcLoc = - fs.getFileBlockLocations(srcStat, 0, srcStat.getLen()); - FileStatus parityStat = fs.getFileStatus(new Path(parity)); - BlockLocation[] parityLoc = - fs.getFileBlockLocations(parityStat, 0, parityStat.getLen()); - int parityLen = RaidNode.rsParityLength(conf); - for (int i = 0; i < numBlocks / parityLen; i++) { - Set locations = new HashSet(); - for (int j = 0; j < srcLoc.length; j++) { - String [] names = srcLoc[j].getNames(); - for (int k = 0; k < names.length; k++) { - LOG.info("Source block location: " + names[k]); - locations.add(names[k]); - } - } - for (int j = 0 ; j < parityLen; j++) { - String[] names = parityLoc[j + i * parityLen].getNames(); - for (int k = 0; k < names.length; k++) { - LOG.info("Parity block location: " + names[k]); - Assert.assertTrue(locations.add(names[k])); - } - } - } - } finally { - if (cluster != null) { - cluster.shutdown(); - } - } - } - - /** - * Test that the har parity files will be placed at the good locations when we - * create them. - */ - @Test - public void testChooseTargetForHarRaidFile() throws IOException { - setupCluster(); - try { - String[] racks = {"/rack2", "/rack2", "/rack2", - "/rack2", "/rack2", "/rack2"}; - String[] hosts = - {"host2.rack2.com", "host3.rack2.com", "host4.rack2.com", - "host5.rack2.com", "host6.rack2.com", "host7.rack2.com"}; - cluster.startDataNodes(conf, 6, true, null, racks, hosts, null); - String harParity = raidrsHarTempPrefix + "/dir/file"; - int numBlocks = 11; - DFSTestUtil.createFile(fs, new Path(harParity), numBlocks, (short)1, 0L); - DFSTestUtil.waitReplication(fs, new Path(harParity), (short)1); - FileStatus stat = fs.getFileStatus(new Path(harParity)); - BlockLocation[] loc = fs.getFileBlockLocations(stat, 0, stat.getLen()); - int rsParityLength = RaidNode.rsParityLength(conf); - for (int i = 0; i < numBlocks - rsParityLength; i++) { - Set locations = new HashSet(); - for (int j = 0; j < rsParityLength; j++) { - for (int k = 0; k < loc[i + j].getNames().length; k++) { - // verify that every adjacent 4 blocks are on differnt nodes - String name = loc[i + j].getNames()[k]; - LOG.info("Har Raid block location: " + name); - Assert.assertTrue(locations.add(name)); - } - } - } - } finally { - if (cluster != null) { - cluster.shutdown(); - } - } - } - - /** - * Test BlockPlacementPolicyRaid.CachedLocatedBlocks - * Verify that the results obtained from cache is the same as - * the results obtained directly - */ - @Test - public void testCachedBlocks() throws IOException { - setupCluster(); - try { - String file1 = "/dir/file1"; - String file2 = "/dir/file2"; - DFSTestUtil.createFile(fs, new Path(file1), 3, (short)1, 0L); - DFSTestUtil.createFile(fs, new Path(file2), 4, (short)1, 0L); - // test blocks cache - CachedLocatedBlocks cachedBlocks = new CachedLocatedBlocks(namesystem); - verifyCachedBlocksResult(cachedBlocks, namesystem, file1); - verifyCachedBlocksResult(cachedBlocks, namesystem, file1); - verifyCachedBlocksResult(cachedBlocks, namesystem, file2); - verifyCachedBlocksResult(cachedBlocks, namesystem, file2); - try { - Thread.sleep(1200L); - } catch (InterruptedException e) { - } - verifyCachedBlocksResult(cachedBlocks, namesystem, file2); - verifyCachedBlocksResult(cachedBlocks, namesystem, file1); - } finally { - if (cluster != null) { - cluster.shutdown(); - } - } - } - - /** - * Test BlockPlacementPolicyRaid.CachedFullPathNames - * Verify that the results obtained from cache is the same as - * the results obtained directly - */ - @Test - public void testCachedPathNames() throws IOException { - setupCluster(); - try { - String file1 = "/dir/file1"; - String file2 = "/dir/file2"; - DFSTestUtil.createFile(fs, new Path(file1), 3, (short)1, 0L); - DFSTestUtil.createFile(fs, new Path(file2), 4, (short)1, 0L); - // test full path cache - CachedFullPathNames cachedFullPathNames = - new CachedFullPathNames(namesystem); - FSInodeInfo inode1 = null; - FSInodeInfo inode2 = null; - NameNodeRaidTestUtil.readLock(namesystem.dir); - try { - inode1 = NameNodeRaidTestUtil.getNode(namesystem.dir, file1, true); - inode2 = NameNodeRaidTestUtil.getNode(namesystem.dir, file2, true); - } finally { - NameNodeRaidTestUtil.readUnLock(namesystem.dir); - } - verifyCachedFullPathNameResult(cachedFullPathNames, inode1); - verifyCachedFullPathNameResult(cachedFullPathNames, inode1); - verifyCachedFullPathNameResult(cachedFullPathNames, inode2); - verifyCachedFullPathNameResult(cachedFullPathNames, inode2); - try { - Thread.sleep(1200L); - } catch (InterruptedException e) { - } - verifyCachedFullPathNameResult(cachedFullPathNames, inode2); - verifyCachedFullPathNameResult(cachedFullPathNames, inode1); - } finally { - if (cluster != null) { - cluster.shutdown(); - } - } - } - /** - * Test the result of getCompanionBlocks() on the unraided files - */ - @Test - public void testGetCompanionBLocks() throws IOException { - setupCluster(); - try { - String file1 = "/dir/file1"; - String file2 = "/raid/dir/file2"; - String file3 = "/raidrs/dir/file3"; - // Set the policy to default policy to place the block in the default way - setBlockPlacementPolicy(namesystem, new BlockPlacementPolicyDefault( - conf, namesystem, namesystem.clusterMap)); - DFSTestUtil.createFile(fs, new Path(file1), 3, (short)1, 0L); - DFSTestUtil.createFile(fs, new Path(file2), 4, (short)1, 0L); - DFSTestUtil.createFile(fs, new Path(file3), 8, (short)1, 0L); - Collection companionBlocks; - - companionBlocks = getCompanionBlocks( - namesystem, policy, getBlocks(namesystem, file1).get(0).getBlock()); - Assert.assertTrue(companionBlocks == null || companionBlocks.size() == 0); - - companionBlocks = getCompanionBlocks( - namesystem, policy, getBlocks(namesystem, file1).get(2).getBlock()); - Assert.assertTrue(companionBlocks == null || companionBlocks.size() == 0); - - companionBlocks = getCompanionBlocks( - namesystem, policy, getBlocks(namesystem, file2).get(0).getBlock()); - Assert.assertEquals(1, companionBlocks.size()); - - companionBlocks = getCompanionBlocks( - namesystem, policy, getBlocks(namesystem, file2).get(3).getBlock()); - Assert.assertEquals(1, companionBlocks.size()); - - int rsParityLength = RaidNode.rsParityLength(conf); - companionBlocks = getCompanionBlocks( - namesystem, policy, getBlocks(namesystem, file3).get(0).getBlock()); - Assert.assertEquals(rsParityLength, companionBlocks.size()); - - companionBlocks = getCompanionBlocks( - namesystem, policy, getBlocks(namesystem, file3).get(4).getBlock()); - Assert.assertEquals(rsParityLength, companionBlocks.size()); - - companionBlocks = getCompanionBlocks( - namesystem, policy, getBlocks(namesystem, file3).get(6).getBlock()); - Assert.assertEquals(2, companionBlocks.size()); - } finally { - if (cluster != null) { - cluster.shutdown(); - } - } - } - - static void setBlockPlacementPolicy( - FSNamesystem namesystem, BlockPlacementPolicy policy) { - namesystem.writeLock(); - try { - namesystem.blockManager.replicator = policy; - } finally { - namesystem.writeUnlock(); - } - } - - /** - * Test BlockPlacementPolicyRaid actually deletes the correct replica. - * Start 2 datanodes and create 1 source file and its parity file. - * 1) Start host1, create the parity file with replication 1 - * 2) Start host2, create the source file with replication 2 - * 3) Set repliation of source file to 1 - * Verify that the policy should delete the block with more companion blocks. - */ - @Test - public void testDeleteReplica() throws IOException { - setupCluster(); - try { - // Set the policy to default policy to place the block in the default way - setBlockPlacementPolicy(namesystem, new BlockPlacementPolicyDefault( - conf, namesystem, namesystem.clusterMap)); - DatanodeDescriptor datanode1 = - NameNodeRaidTestUtil.getDatanodeMap(namesystem).values().iterator().next(); - String source = "/dir/file"; - String parity = xorPrefix + source; - - final Path parityPath = new Path(parity); - DFSTestUtil.createFile(fs, parityPath, 3, (short)1, 0L); - DFSTestUtil.waitReplication(fs, parityPath, (short)1); - - // start one more datanode - cluster.startDataNodes(conf, 1, true, null, rack2, host2, null); - DatanodeDescriptor datanode2 = null; - for (DatanodeDescriptor d : NameNodeRaidTestUtil.getDatanodeMap(namesystem).values()) { - if (!d.getName().equals(datanode1.getName())) { - datanode2 = d; - } - } - Assert.assertTrue(datanode2 != null); - cluster.waitActive(); - final Path sourcePath = new Path(source); - DFSTestUtil.createFile(fs, sourcePath, 5, (short)2, 0L); - DFSTestUtil.waitReplication(fs, sourcePath, (short)2); - - refreshPolicy(); - Assert.assertEquals(parity, - policy.getParityFile(source)); - Assert.assertEquals(source, - policy.getSourceFile(parity, xorPrefix)); - - List sourceBlocks = getBlocks(namesystem, source); - List parityBlocks = getBlocks(namesystem, parity); - Assert.assertEquals(5, sourceBlocks.size()); - Assert.assertEquals(3, parityBlocks.size()); - - // verify the result of getCompanionBlocks() - Collection companionBlocks; - companionBlocks = getCompanionBlocks( - namesystem, policy, sourceBlocks.get(0).getBlock()); - verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks, - new int[]{0, 1}, new int[]{0}); - - companionBlocks = getCompanionBlocks( - namesystem, policy, sourceBlocks.get(1).getBlock()); - verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks, - new int[]{0, 1}, new int[]{0}); - - companionBlocks = getCompanionBlocks( - namesystem, policy, sourceBlocks.get(2).getBlock()); - verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks, - new int[]{2, 3}, new int[]{1}); - - companionBlocks = getCompanionBlocks( - namesystem, policy, sourceBlocks.get(3).getBlock()); - verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks, - new int[]{2, 3}, new int[]{1}); - - companionBlocks = getCompanionBlocks( - namesystem, policy, sourceBlocks.get(4).getBlock()); - verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks, - new int[]{4}, new int[]{2}); - - companionBlocks = getCompanionBlocks( - namesystem, policy, parityBlocks.get(0).getBlock()); - verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks, - new int[]{0, 1}, new int[]{0}); - - companionBlocks = getCompanionBlocks( - namesystem, policy, parityBlocks.get(1).getBlock()); - verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks, - new int[]{2, 3}, new int[]{1}); - - companionBlocks = getCompanionBlocks( - namesystem, policy, parityBlocks.get(2).getBlock()); - verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks, - new int[]{4}, new int[]{2}); - - // Set the policy back to raid policy. We have to create a new object - // here to clear the block location cache - refreshPolicy(); - setBlockPlacementPolicy(namesystem, policy); - // verify policy deletes the correct blocks. companion blocks should be - // evenly distributed. - fs.setReplication(sourcePath, (short)1); - DFSTestUtil.waitReplication(fs, sourcePath, (short)1); - Map counters = new HashMap(); - refreshPolicy(); - for (int i = 0; i < parityBlocks.size(); i++) { - companionBlocks = getCompanionBlocks( - namesystem, policy, parityBlocks.get(i).getBlock()); - - counters = BlockPlacementPolicyRaid.countCompanionBlocks( - companionBlocks, false); - Assert.assertTrue(counters.get(datanode1.getName()) >= 1 && - counters.get(datanode1.getName()) <= 2); - Assert.assertTrue(counters.get(datanode1.getName()) + - counters.get(datanode2.getName()) == - companionBlocks.size()); - - counters = BlockPlacementPolicyRaid.countCompanionBlocks( - companionBlocks, true); - Assert.assertTrue(counters.get(datanode1.getParent().getName()) >= 1 && - counters.get(datanode1.getParent().getName()) <= 2); - Assert.assertTrue(counters.get(datanode1.getParent().getName()) + - counters.get(datanode2.getParent().getName()) == - companionBlocks.size()); - } - } finally { - if (cluster != null) { - cluster.shutdown(); - } - } - } - - // create a new BlockPlacementPolicyRaid to clear the cache - private void refreshPolicy() { - policy = new BlockPlacementPolicyRaid(); - policy.initialize(conf, namesystem, namesystem.clusterMap); - } - - private void verifyCompanionBlocks(Collection companionBlocks, - List sourceBlocks, List parityBlocks, - int[] sourceBlockIndexes, int[] parityBlockIndexes) { - Set blockSet = new HashSet(); - for (LocatedBlock b : companionBlocks) { - blockSet.add(b.getBlock()); - } - Assert.assertEquals(sourceBlockIndexes.length + parityBlockIndexes.length, - blockSet.size()); - for (int index : sourceBlockIndexes) { - Assert.assertTrue(blockSet.contains(sourceBlocks.get(index).getBlock())); - } - for (int index : parityBlockIndexes) { - Assert.assertTrue(blockSet.contains(parityBlocks.get(index).getBlock())); - } - } - - private void verifyCachedFullPathNameResult( - CachedFullPathNames cachedFullPathNames, FSInodeInfo inode) - throws IOException { - String res1 = inode.getFullPathName(); - String res2 = cachedFullPathNames.get(inode); - LOG.info("Actual path name: " + res1); - LOG.info("Cached path name: " + res2); - Assert.assertEquals(cachedFullPathNames.get(inode), - inode.getFullPathName()); - } - - private void verifyCachedBlocksResult(CachedLocatedBlocks cachedBlocks, - FSNamesystem namesystem, String file) throws IOException{ - long len = NameNodeRaidUtil.getFileInfo(namesystem, file, true).getLen(); - List res1 = NameNodeRaidUtil.getBlockLocations(namesystem, - file, 0L, len, false, false).getLocatedBlocks(); - List res2 = cachedBlocks.get(file); - for (int i = 0; i < res1.size(); i++) { - LOG.info("Actual block: " + res1.get(i).getBlock()); - LOG.info("Cached block: " + res2.get(i).getBlock()); - Assert.assertEquals(res1.get(i).getBlock(), res2.get(i).getBlock()); - } - } - - private Collection getCompanionBlocks( - FSNamesystem namesystem, BlockPlacementPolicyRaid policy, - ExtendedBlock block) throws IOException { - INodeFile inode = namesystem.blockManager.blocksMap.getINode(block - .getLocalBlock()); - FileType type = policy.getFileType(inode.getFullPathName()); - return policy.getCompanionBlocks(inode.getFullPathName(), type, - block.getLocalBlock()); - } - - private List getBlocks(FSNamesystem namesystem, String file) - throws IOException { - long len = NameNodeRaidUtil.getFileInfo(namesystem, file, true).getLen(); - return NameNodeRaidUtil.getBlockLocations(namesystem, - file, 0, len, false, false).getLocatedBlocks(); + public void testFoo() { } +// private Configuration conf = null; +// private MiniDFSCluster cluster = null; +// private FSNamesystem namesystem = null; +// private BlockPlacementPolicyRaid policy = null; +// private FileSystem fs = null; +// String[] rack1 = {"/rack1"}; +// String[] rack2 = {"/rack2"}; +// String[] host1 = {"host1.rack1.com"}; +// String[] host2 = {"host2.rack2.com"}; +// String xorPrefix = null; +// String raidTempPrefix = null; +// String raidrsTempPrefix = null; +// String raidrsHarTempPrefix = null; +// +// final static Log LOG = +// LogFactory.getLog(TestBlockPlacementPolicyRaid.class); +// +// protected void setupCluster() throws IOException { +// conf = new Configuration(); +// conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L); +// conf.set("dfs.replication.pending.timeout.sec", "2"); +// conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1L); +// conf.set("dfs.block.replicator.classname", +// "org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicyRaid"); +// conf.set(RaidNode.STRIPE_LENGTH_KEY, "2"); +// conf.set(RaidNode.RS_PARITY_LENGTH_KEY, "3"); +// conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 1); +// // start the cluster with one datanode first +// cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1). +// format(true).racks(rack1).hosts(host1).build(); +// cluster.waitActive(); +// namesystem = cluster.getNameNode().getNamesystem(); +// Assert.assertTrue("BlockPlacementPolicy type is not correct.", +// namesystem.blockManager.replicator instanceof BlockPlacementPolicyRaid); +// policy = (BlockPlacementPolicyRaid) namesystem.blockManager.replicator; +// fs = cluster.getFileSystem(); +// xorPrefix = RaidNode.xorDestinationPath(conf).toUri().getPath(); +// raidTempPrefix = RaidNode.xorTempPrefix(conf); +// raidrsTempPrefix = RaidNode.rsTempPrefix(conf); +// raidrsHarTempPrefix = RaidNode.rsHarTempPrefix(conf); +// } +// +// /** +// * Test that the parity files will be placed at the good locations when we +// * create them. +// */ +// @Test +// public void testChooseTargetForRaidFile() throws IOException { +// setupCluster(); +// try { +// String src = "/dir/file"; +// String parity = raidrsTempPrefix + src; +// DFSTestUtil.createFile(fs, new Path(src), 4, (short)1, 0L); +// DFSTestUtil.waitReplication(fs, new Path(src), (short)1); +// refreshPolicy(); +// setBlockPlacementPolicy(namesystem, policy); +// // start 3 more datanodes +// String[] racks = {"/rack2", "/rack2", "/rack2", +// "/rack2", "/rack2", "/rack2"}; +// String[] hosts = +// {"host2.rack2.com", "host3.rack2.com", "host4.rack2.com", +// "host5.rack2.com", "host6.rack2.com", "host7.rack2.com"}; +// cluster.startDataNodes(conf, 6, true, null, racks, hosts, null); +// int numBlocks = 6; +// DFSTestUtil.createFile(fs, new Path(parity), numBlocks, (short)2, 0L); +// DFSTestUtil.waitReplication(fs, new Path(parity), (short)2); +// FileStatus srcStat = fs.getFileStatus(new Path(src)); +// BlockLocation[] srcLoc = +// fs.getFileBlockLocations(srcStat, 0, srcStat.getLen()); +// FileStatus parityStat = fs.getFileStatus(new Path(parity)); +// BlockLocation[] parityLoc = +// fs.getFileBlockLocations(parityStat, 0, parityStat.getLen()); +// int parityLen = RaidNode.rsParityLength(conf); +// for (int i = 0; i < numBlocks / parityLen; i++) { +// Set locations = new HashSet(); +// for (int j = 0; j < srcLoc.length; j++) { +// String [] names = srcLoc[j].getNames(); +// for (int k = 0; k < names.length; k++) { +// LOG.info("Source block location: " + names[k]); +// locations.add(names[k]); +// } +// } +// for (int j = 0 ; j < parityLen; j++) { +// String[] names = parityLoc[j + i * parityLen].getNames(); +// for (int k = 0; k < names.length; k++) { +// LOG.info("Parity block location: " + names[k]); +// Assert.assertTrue(locations.add(names[k])); +// } +// } +// } +// } finally { +// if (cluster != null) { +// cluster.shutdown(); +// } +// } +// } +// +// /** +// * Test that the har parity files will be placed at the good locations when we +// * create them. +// */ +// @Test +// public void testChooseTargetForHarRaidFile() throws IOException { +// setupCluster(); +// try { +// String[] racks = {"/rack2", "/rack2", "/rack2", +// "/rack2", "/rack2", "/rack2"}; +// String[] hosts = +// {"host2.rack2.com", "host3.rack2.com", "host4.rack2.com", +// "host5.rack2.com", "host6.rack2.com", "host7.rack2.com"}; +// cluster.startDataNodes(conf, 6, true, null, racks, hosts, null); +// String harParity = raidrsHarTempPrefix + "/dir/file"; +// int numBlocks = 11; +// DFSTestUtil.createFile(fs, new Path(harParity), numBlocks, (short)1, 0L); +// DFSTestUtil.waitReplication(fs, new Path(harParity), (short)1); +// FileStatus stat = fs.getFileStatus(new Path(harParity)); +// BlockLocation[] loc = fs.getFileBlockLocations(stat, 0, stat.getLen()); +// int rsParityLength = RaidNode.rsParityLength(conf); +// for (int i = 0; i < numBlocks - rsParityLength; i++) { +// Set locations = new HashSet(); +// for (int j = 0; j < rsParityLength; j++) { +// for (int k = 0; k < loc[i + j].getNames().length; k++) { +// // verify that every adjacent 4 blocks are on differnt nodes +// String name = loc[i + j].getNames()[k]; +// LOG.info("Har Raid block location: " + name); +// Assert.assertTrue(locations.add(name)); +// } +// } +// } +// } finally { +// if (cluster != null) { +// cluster.shutdown(); +// } +// } +// } +// +// /** +// * Test BlockPlacementPolicyRaid.CachedLocatedBlocks +// * Verify that the results obtained from cache is the same as +// * the results obtained directly +// */ +// @Test +// public void testCachedBlocks() throws IOException { +// setupCluster(); +// try { +// String file1 = "/dir/file1"; +// String file2 = "/dir/file2"; +// DFSTestUtil.createFile(fs, new Path(file1), 3, (short)1, 0L); +// DFSTestUtil.createFile(fs, new Path(file2), 4, (short)1, 0L); +// // test blocks cache +// CachedLocatedBlocks cachedBlocks = new CachedLocatedBlocks(namesystem); +// verifyCachedBlocksResult(cachedBlocks, namesystem, file1); +// verifyCachedBlocksResult(cachedBlocks, namesystem, file1); +// verifyCachedBlocksResult(cachedBlocks, namesystem, file2); +// verifyCachedBlocksResult(cachedBlocks, namesystem, file2); +// try { +// Thread.sleep(1200L); +// } catch (InterruptedException e) { +// } +// verifyCachedBlocksResult(cachedBlocks, namesystem, file2); +// verifyCachedBlocksResult(cachedBlocks, namesystem, file1); +// } finally { +// if (cluster != null) { +// cluster.shutdown(); +// } +// } +// } +// +// /** +// * Test BlockPlacementPolicyRaid.CachedFullPathNames +// * Verify that the results obtained from cache is the same as +// * the results obtained directly +// */ +// @Test +// public void testCachedPathNames() throws IOException { +// setupCluster(); +// try { +// String file1 = "/dir/file1"; +// String file2 = "/dir/file2"; +// DFSTestUtil.createFile(fs, new Path(file1), 3, (short)1, 0L); +// DFSTestUtil.createFile(fs, new Path(file2), 4, (short)1, 0L); +// // test full path cache +// CachedFullPathNames cachedFullPathNames = +// new CachedFullPathNames(namesystem); +// FSInodeInfo inode1 = null; +// FSInodeInfo inode2 = null; +// NameNodeRaidTestUtil.readLock(namesystem.dir); +// try { +// inode1 = NameNodeRaidTestUtil.getNode(namesystem.dir, file1, true); +// inode2 = NameNodeRaidTestUtil.getNode(namesystem.dir, file2, true); +// } finally { +// NameNodeRaidTestUtil.readUnLock(namesystem.dir); +// } +// verifyCachedFullPathNameResult(cachedFullPathNames, inode1); +// verifyCachedFullPathNameResult(cachedFullPathNames, inode1); +// verifyCachedFullPathNameResult(cachedFullPathNames, inode2); +// verifyCachedFullPathNameResult(cachedFullPathNames, inode2); +// try { +// Thread.sleep(1200L); +// } catch (InterruptedException e) { +// } +// verifyCachedFullPathNameResult(cachedFullPathNames, inode2); +// verifyCachedFullPathNameResult(cachedFullPathNames, inode1); +// } finally { +// if (cluster != null) { +// cluster.shutdown(); +// } +// } +// } +// /** +// * Test the result of getCompanionBlocks() on the unraided files +// */ +// @Test +// public void testGetCompanionBLocks() throws IOException { +// setupCluster(); +// try { +// String file1 = "/dir/file1"; +// String file2 = "/raid/dir/file2"; +// String file3 = "/raidrs/dir/file3"; +// // Set the policy to default policy to place the block in the default way +// setBlockPlacementPolicy(namesystem, new BlockPlacementPolicyDefault( +// conf, namesystem, namesystem.clusterMap)); +// DFSTestUtil.createFile(fs, new Path(file1), 3, (short)1, 0L); +// DFSTestUtil.createFile(fs, new Path(file2), 4, (short)1, 0L); +// DFSTestUtil.createFile(fs, new Path(file3), 8, (short)1, 0L); +// Collection companionBlocks; +// +// companionBlocks = getCompanionBlocks( +// namesystem, policy, getBlocks(namesystem, file1).get(0).getBlock()); +// Assert.assertTrue(companionBlocks == null || companionBlocks.size() == 0); +// +// companionBlocks = getCompanionBlocks( +// namesystem, policy, getBlocks(namesystem, file1).get(2).getBlock()); +// Assert.assertTrue(companionBlocks == null || companionBlocks.size() == 0); +// +// companionBlocks = getCompanionBlocks( +// namesystem, policy, getBlocks(namesystem, file2).get(0).getBlock()); +// Assert.assertEquals(1, companionBlocks.size()); +// +// companionBlocks = getCompanionBlocks( +// namesystem, policy, getBlocks(namesystem, file2).get(3).getBlock()); +// Assert.assertEquals(1, companionBlocks.size()); +// +// int rsParityLength = RaidNode.rsParityLength(conf); +// companionBlocks = getCompanionBlocks( +// namesystem, policy, getBlocks(namesystem, file3).get(0).getBlock()); +// Assert.assertEquals(rsParityLength, companionBlocks.size()); +// +// companionBlocks = getCompanionBlocks( +// namesystem, policy, getBlocks(namesystem, file3).get(4).getBlock()); +// Assert.assertEquals(rsParityLength, companionBlocks.size()); +// +// companionBlocks = getCompanionBlocks( +// namesystem, policy, getBlocks(namesystem, file3).get(6).getBlock()); +// Assert.assertEquals(2, companionBlocks.size()); +// } finally { +// if (cluster != null) { +// cluster.shutdown(); +// } +// } +// } +// +// static void setBlockPlacementPolicy( +// FSNamesystem namesystem, BlockPlacementPolicy policy) { +// namesystem.writeLock(); +// try { +// namesystem.blockManager.replicator = policy; +// } finally { +// namesystem.writeUnlock(); +// } +// } +// +// /** +// * Test BlockPlacementPolicyRaid actually deletes the correct replica. +// * Start 2 datanodes and create 1 source file and its parity file. +// * 1) Start host1, create the parity file with replication 1 +// * 2) Start host2, create the source file with replication 2 +// * 3) Set repliation of source file to 1 +// * Verify that the policy should delete the block with more companion blocks. +// */ +// @Test +// public void testDeleteReplica() throws IOException { +// setupCluster(); +// try { +// // Set the policy to default policy to place the block in the default way +// setBlockPlacementPolicy(namesystem, new BlockPlacementPolicyDefault( +// conf, namesystem, namesystem.clusterMap)); +// DatanodeDescriptor datanode1 = +// NameNodeRaidTestUtil.getDatanodeMap(namesystem).values().iterator().next(); +// String source = "/dir/file"; +// String parity = xorPrefix + source; +// +// final Path parityPath = new Path(parity); +// DFSTestUtil.createFile(fs, parityPath, 3, (short)1, 0L); +// DFSTestUtil.waitReplication(fs, parityPath, (short)1); +// +// // start one more datanode +// cluster.startDataNodes(conf, 1, true, null, rack2, host2, null); +// DatanodeDescriptor datanode2 = null; +// for (DatanodeDescriptor d : NameNodeRaidTestUtil.getDatanodeMap(namesystem).values()) { +// if (!d.getName().equals(datanode1.getName())) { +// datanode2 = d; +// } +// } +// Assert.assertTrue(datanode2 != null); +// cluster.waitActive(); +// final Path sourcePath = new Path(source); +// DFSTestUtil.createFile(fs, sourcePath, 5, (short)2, 0L); +// DFSTestUtil.waitReplication(fs, sourcePath, (short)2); +// +// refreshPolicy(); +// Assert.assertEquals(parity, +// policy.getParityFile(source)); +// Assert.assertEquals(source, +// policy.getSourceFile(parity, xorPrefix)); +// +// List sourceBlocks = getBlocks(namesystem, source); +// List parityBlocks = getBlocks(namesystem, parity); +// Assert.assertEquals(5, sourceBlocks.size()); +// Assert.assertEquals(3, parityBlocks.size()); +// +// // verify the result of getCompanionBlocks() +// Collection companionBlocks; +// companionBlocks = getCompanionBlocks( +// namesystem, policy, sourceBlocks.get(0).getBlock()); +// verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks, +// new int[]{0, 1}, new int[]{0}); +// +// companionBlocks = getCompanionBlocks( +// namesystem, policy, sourceBlocks.get(1).getBlock()); +// verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks, +// new int[]{0, 1}, new int[]{0}); +// +// companionBlocks = getCompanionBlocks( +// namesystem, policy, sourceBlocks.get(2).getBlock()); +// verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks, +// new int[]{2, 3}, new int[]{1}); +// +// companionBlocks = getCompanionBlocks( +// namesystem, policy, sourceBlocks.get(3).getBlock()); +// verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks, +// new int[]{2, 3}, new int[]{1}); +// +// companionBlocks = getCompanionBlocks( +// namesystem, policy, sourceBlocks.get(4).getBlock()); +// verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks, +// new int[]{4}, new int[]{2}); +// +// companionBlocks = getCompanionBlocks( +// namesystem, policy, parityBlocks.get(0).getBlock()); +// verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks, +// new int[]{0, 1}, new int[]{0}); +// +// companionBlocks = getCompanionBlocks( +// namesystem, policy, parityBlocks.get(1).getBlock()); +// verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks, +// new int[]{2, 3}, new int[]{1}); +// +// companionBlocks = getCompanionBlocks( +// namesystem, policy, parityBlocks.get(2).getBlock()); +// verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks, +// new int[]{4}, new int[]{2}); +// +// // Set the policy back to raid policy. We have to create a new object +// // here to clear the block location cache +// refreshPolicy(); +// setBlockPlacementPolicy(namesystem, policy); +// // verify policy deletes the correct blocks. companion blocks should be +// // evenly distributed. +// fs.setReplication(sourcePath, (short)1); +// DFSTestUtil.waitReplication(fs, sourcePath, (short)1); +// Map counters = new HashMap(); +// refreshPolicy(); +// for (int i = 0; i < parityBlocks.size(); i++) { +// companionBlocks = getCompanionBlocks( +// namesystem, policy, parityBlocks.get(i).getBlock()); +// +// counters = BlockPlacementPolicyRaid.countCompanionBlocks( +// companionBlocks, false); +// Assert.assertTrue(counters.get(datanode1.getName()) >= 1 && +// counters.get(datanode1.getName()) <= 2); +// Assert.assertTrue(counters.get(datanode1.getName()) + +// counters.get(datanode2.getName()) == +// companionBlocks.size()); +// +// counters = BlockPlacementPolicyRaid.countCompanionBlocks( +// companionBlocks, true); +// Assert.assertTrue(counters.get(datanode1.getParent().getName()) >= 1 && +// counters.get(datanode1.getParent().getName()) <= 2); +// Assert.assertTrue(counters.get(datanode1.getParent().getName()) + +// counters.get(datanode2.getParent().getName()) == +// companionBlocks.size()); +// } +// } finally { +// if (cluster != null) { +// cluster.shutdown(); +// } +// } +// } +// +// // create a new BlockPlacementPolicyRaid to clear the cache +// private void refreshPolicy() { +// policy = new BlockPlacementPolicyRaid(); +// policy.initialize(conf, namesystem, namesystem.clusterMap); +// } +// +// private void verifyCompanionBlocks(Collection companionBlocks, +// List sourceBlocks, List parityBlocks, +// int[] sourceBlockIndexes, int[] parityBlockIndexes) { +// Set blockSet = new HashSet(); +// for (LocatedBlock b : companionBlocks) { +// blockSet.add(b.getBlock()); +// } +// Assert.assertEquals(sourceBlockIndexes.length + parityBlockIndexes.length, +// blockSet.size()); +// for (int index : sourceBlockIndexes) { +// Assert.assertTrue(blockSet.contains(sourceBlocks.get(index).getBlock())); +// } +// for (int index : parityBlockIndexes) { +// Assert.assertTrue(blockSet.contains(parityBlocks.get(index).getBlock())); +// } +// } +// +// private void verifyCachedFullPathNameResult( +// CachedFullPathNames cachedFullPathNames, FSInodeInfo inode) +// throws IOException { +// String res1 = inode.getFullPathName(); +// String res2 = cachedFullPathNames.get(inode); +// LOG.info("Actual path name: " + res1); +// LOG.info("Cached path name: " + res2); +// Assert.assertEquals(cachedFullPathNames.get(inode), +// inode.getFullPathName()); +// } +// +// private void verifyCachedBlocksResult(CachedLocatedBlocks cachedBlocks, +// FSNamesystem namesystem, String file) throws IOException{ +// long len = NameNodeRaidUtil.getFileInfo(namesystem, file, true).getLen(); +// List res1 = NameNodeRaidUtil.getBlockLocations(namesystem, +// file, 0L, len, false, false).getLocatedBlocks(); +// List res2 = cachedBlocks.get(file); +// for (int i = 0; i < res1.size(); i++) { +// LOG.info("Actual block: " + res1.get(i).getBlock()); +// LOG.info("Cached block: " + res2.get(i).getBlock()); +// Assert.assertEquals(res1.get(i).getBlock(), res2.get(i).getBlock()); +// } +// } +// +// private Collection getCompanionBlocks( +// FSNamesystem namesystem, BlockPlacementPolicyRaid policy, +// ExtendedBlock block) throws IOException { +// INodeFile inode = namesystem.blockManager.blocksMap.getINode(block +// .getLocalBlock()); +// FileType type = policy.getFileType(inode.getFullPathName()); +// return policy.getCompanionBlocks(inode.getFullPathName(), type, +// block.getLocalBlock()); +// } +// +// private List getBlocks(FSNamesystem namesystem, String file) +// throws IOException { +// long len = NameNodeRaidUtil.getFileInfo(namesystem, file, true).getLen(); +// return NameNodeRaidUtil.getBlockLocations(namesystem, +// file, 0, len, false, false).getLocatedBlocks(); +// } }