From 19b5aee3e42cd1d6c77a58ab2eea185b5afd60b2 Mon Sep 17 00:00:00 2001
From: Uma Maheswara Rao G
Date: Fri, 11 Nov 2016 01:17:50 -0800
Subject: [PATCH] HDFS-11068: [SPS]: Provide unique trackID to track the block
 movement sends to coordinator. Contributed by Rakesh R

---
 .../blockmanagement/DatanodeDescriptor.java   | 31 ++++--
 .../blockmanagement/DatanodeManager.java      | 16 ++--
 .../BlockStorageMovementInfosBatch.java       | 61 ++++++++++++
 .../namenode/StoragePolicySatisfier.java      |  5 +-
 .../protocol/BlocksStorageMovementResult.java |  6 ++
 .../namenode/TestStoragePolicySatisfier.java  | 95 ++++++++++++++-----
 6 files changed, 174 insertions(+), 40 deletions(-)
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementInfosBatch.java
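A note on the shape of this change: it reduces to a small queue of per-file
batches keyed by a trackID. The sketch below is illustrative only (simplified
names, no HDFS types; the real classes carry protocol plumbing not shown): the
NameNode enqueues each file's pending block moves as one batch under a trackID
(the file's block collection ID), and each heartbeat drains one whole batch
into a command carrying that same trackID, so the coordinator datanode can
report completion per batch.

    import java.util.LinkedList;
    import java.util.List;
    import java.util.Queue;

    // Illustrative model of the per-trackID batching introduced below.
    class TrackedBatchQueue<T> {

      // One batch of pending moves, reported back under a single trackID.
      static final class Batch<T> {
        final long trackID;
        final List<T> moves;

        Batch(long trackID, List<T> moves) {
          this.trackID = trackID;
          this.moves = moves;
        }
      }

      private final Queue<Batch<T>> queue = new LinkedList<>();

      // NameNode side: queue all pending moves of one file under its trackID.
      synchronized void add(long trackID, List<T> moves) {
        queue.offer(new Batch<>(trackID, moves));
      }

      // Heartbeat side: drain one whole batch; null means nothing is pending.
      synchronized Batch<T> poll() {
        return queue.poll();
      }
    }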
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index 951837e4e4..dbf0f7e50d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -41,9 +41,9 @@
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.namenode.BlockStorageMovementInfosBatch;
 import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
 import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
-import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
@@ -208,8 +208,11 @@ public Type getType() {
   private final LightWeightHashSet<Block> invalidateBlocks =
       new LightWeightHashSet<>();
 
-  /** A queue of blocks for moving its storage placements by this datanode. */
-  private final Queue<List<BlockMovingInfo>> storageMovementBlocks =
+  /**
+   * A queue of blocks corresponding to trackID for moving its storage
+   * placements by this datanode.
+   */
+  private final Queue<BlockStorageMovementInfosBatch> storageMovementBlocks =
       new LinkedList<>();
 
   /* Variables for maintaining number of blocks scheduled to be written to
@@ -1075,18 +1078,30 @@ public boolean hasStorageType(StorageType type) {
   /**
    * Add the block infos which needs to move its storage locations.
    *
+   * @param trackID
+   *          - unique identifier which will be used for tracking the given set
+   *          of blocks movement completion.
    * @param storageMismatchedBlocks
    *          - storage mismatched block infos
    */
-  public void addBlocksToMoveStorage(
+  public void addBlocksToMoveStorage(long trackID,
       List<BlockMovingInfo> storageMismatchedBlocks) {
-    storageMovementBlocks.offer(storageMismatchedBlocks);
+    synchronized (storageMovementBlocks) {
+      storageMovementBlocks.offer(
+          new BlockStorageMovementInfosBatch(trackID, storageMismatchedBlocks));
+    }
   }
 
   /**
-   * @return block infos which needs to move its storage locations.
+   * @return block infos which need to move their storage locations. This
+   *         returns a list of blocks under one trackID.
    */
-  public List<BlockMovingInfo> getBlocksToMoveStorages() {
-    return storageMovementBlocks.poll();
+  public BlockStorageMovementInfosBatch getBlocksToMoveStorages() {
+    synchronized (storageMovementBlocks) {
+      // TODO: Presently returning the whole list of blocks under one trackID.
+      // Need to limit the list of items into small batches within a trackID
+      // if blocks are many (for example, a file containing many blocks).
+      return storageMovementBlocks.poll();
+    }
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 65c5d6e565..ba49c1378f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -40,6 +40,7 @@
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList;
 import org.apache.hadoop.hdfs.server.common.Util;
+import org.apache.hadoop.hdfs.server.namenode.BlockStorageMovementInfosBatch;
 import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
@@ -47,7 +48,6 @@
 import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStripedBlock;
-import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.*;
 import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;
@@ -1740,16 +1740,14 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
     }
 
     // check pending block storage movement tasks
-    List<BlockMovingInfo> pendingBlockMovementList = nodeinfo
+    BlockStorageMovementInfosBatch blkStorageMovementInfosBatch = nodeinfo
         .getBlocksToMoveStorages();
-    if (pendingBlockMovementList != null) {
-      // TODO: trackID is used to track the block movement sends to coordinator
-      // datanode. Need to implement tracking logic. Temporarily, using a
-      // constant value -1.
-      long trackID = -1;
+
+    if (blkStorageMovementInfosBatch != null) {
       cmds.add(new BlockStorageMovementCommand(
-          DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT, trackID, blockPoolId,
-          pendingBlockMovementList));
+          DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT,
+          blkStorageMovementInfosBatch.getTrackID(), blockPoolId,
+          blkStorageMovementInfosBatch.getBlockMovingInfo()));
     }
 
     if (!cmds.isEmpty()) {
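The TODO in DatanodeDescriptor#getBlocksToMoveStorages() above observes that a
single trackID currently carries every block of a file in one batch. One
plausible refinement, sketched here as an assumption rather than anything in
this patch, is to cap the batch size while letting every chunk keep the same
trackID (assumes java.util.ArrayList and java.util.List):

    // Hypothetical sub-batching helper (not part of this patch): split one
    // logical batch into chunks of at most maxBatchSize moves, all of which
    // would still be tracked under the same trackID.
    static <T> List<List<T>> toChunks(List<T> moves, int maxBatchSize) {
      List<List<T>> chunks = new ArrayList<>();
      for (int i = 0; i < moves.size(); i += maxBatchSize) {
        chunks.add(new ArrayList<>(
            moves.subList(i, Math.min(i + maxBatchSize, moves.size()))));
      }
      return chunks;
    }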
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementInfosBatch.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementInfosBatch.java
new file mode 100644
index 0000000000..a790c13368
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementInfosBatch.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.util.List;
+
+import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
+
+/**
+ * This class represents a batch of blocks under one trackID which need to
+ * move their storage locations to satisfy the storage policy.
+ */
+public class BlockStorageMovementInfosBatch {
+  private long trackID;
+  private List<BlockMovingInfo> blockMovingInfos;
+
+  /**
+   * Constructor to create the block storage movement infos batch.
+   *
+   * @param trackID
+   *          - unique identifier which will be used for tracking the given set
+   *          of blocks movement.
+   * @param blockMovingInfos
+   *          - list of block to storage infos.
+   */
+  public BlockStorageMovementInfosBatch(long trackID,
+      List<BlockMovingInfo> blockMovingInfos) {
+    this.trackID = trackID;
+    this.blockMovingInfos = blockMovingInfos;
+  }
+
+  public long getTrackID() {
+    return trackID;
+  }
+
+  public List<BlockMovingInfo> getBlockMovingInfo() {
+    return blockMovingInfos;
+  }
+
+  @Override
+  public String toString() {
+    return new StringBuilder().append("BlockStorageMovementInfosBatch(\n  ")
+        .append("TrackID: ").append(trackID).append(" BlockMovingInfos: ")
+        .append(blockMovingInfos).append(")").toString();
+  }
+}
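Given the class above, a batch is constructed with the file's inode (block
collection) ID as the trackID plus the list of pending moves. The values below
are illustrative only:

    // Illustrative usage; 16386L stands in for a real inode ID and the list
    // would hold the file's storage-mismatched block infos.
    List<BlockMovingInfo> moves = new ArrayList<>();
    BlockStorageMovementInfosBatch batch =
        new BlockStorageMovementInfosBatch(16386L, moves);
    assert batch.getTrackID() == 16386L;
    System.out.println(batch); // rendered by the toString() defined above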
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
index 6fa9302229..4967a89786 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
@@ -217,7 +217,10 @@ private void computeAndAssignStorageMismatchedBlocksToDNs(
       // chances, then we can just retry limited number of times and exit.
       return;
     }
-    coordinatorNode.addBlocksToMoveStorage(blockMovingInfos);
+
+    // 'BlockCollectionId' is used as the tracking ID. All the blocks under
+    // this blockCollectionID will be added to this datanode.
+    coordinatorNode.addBlocksToMoveStorage(blockCollectionID, blockMovingInfos);
   }
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksStorageMovementResult.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksStorageMovementResult.java
index 1afba34054..713b83b31e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksStorageMovementResult.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksStorageMovementResult.java
@@ -61,4 +61,10 @@ public Status getStatus() {
     return status;
   }
 
+  @Override
+  public String toString() {
+    return new StringBuilder().append("BlocksStorageMovementResult(\n  ")
+        .append("track id: ").append(trackId).append(" status: ")
+        .append(status).append(")").toString();
+  }
 }
\ No newline at end of file
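For context on how the trackID closes the loop: the satisfier above hands the
block collection ID to the coordinator datanode as the trackID, and the
datanode reports back a BlocksStorageMovementResult carrying the same ID. A
hypothetical sketch of NameNode-side accounting follows (the real bookkeeping
lives in the SPS attempted-items monitor, which this patch does not change; a
getTrackId() accessor and the imports java.util.Set and
java.util.concurrent.ConcurrentHashMap are assumed):

    // Hypothetical completion accounting keyed by trackID.
    private final Set<Long> pendingTrackIds = ConcurrentHashMap.newKeySet();

    void onBlocksStorageMovementResults(BlocksStorageMovementResult[] results) {
      for (BlocksStorageMovementResult result : results) {
        // The ID reported back matches the one sent with the command.
        pendingTrackIds.remove(result.getTrackId());
      }
    }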
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
index 6f5c71770e..e84052f568 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.conf.Configuration;
@@ -54,15 +56,15 @@ public class TestStoragePolicySatisfier {
   final private int storagesPerDatanode = 2;
   final private long capacity = 2 * 256 * 1024 * 1024;
   final private String file = "/testMoveWhenStoragePolicyNotSatisfying";
-  private DistributedFileSystem distributedFS = null;
+  private DistributedFileSystem dfs = null;
 
   @Before
   public void setUp() throws IOException {
     config.setLong("dfs.block.size", 1024);
     hdfsCluster = startCluster(config, allDiskTypes, numOfDatanodes,
         storagesPerDatanode, capacity);
-    distributedFS = hdfsCluster.getFileSystem();
-    writeContent(distributedFS, file);
+    dfs = hdfsCluster.getFileSystem();
+    writeContent(file);
   }
 
   @Test(timeout = 300000)
@@ -71,7 +73,7 @@ public void testWhenStoragePolicySetToCOLD()
       throws Exception {
     try {
       // Change policy to ALL_SSD
-      distributedFS.setStoragePolicy(new Path(file), "COLD");
+      dfs.setStoragePolicy(new Path(file), "COLD");
 
       FSNamesystem namesystem = hdfsCluster.getNamesystem();
       INode inode = namesystem.getFSDirectory().getINode(file);
@@ -86,8 +88,7 @@
       hdfsCluster.triggerHeartbeats();
 
       // Wait till namenode notified about the block location details
-      waitExpectedStorageType(file, StorageType.ARCHIVE, distributedFS, 3,
-          30000);
+      waitExpectedStorageType(file, StorageType.ARCHIVE, 3, 30000);
     } finally {
      hdfsCluster.shutdown();
     }
@@ -98,7 +99,7 @@ public void testWhenStoragePolicySetToALLSSD()
       throws Exception {
     try {
       // Change policy to ALL_SSD
-      distributedFS.setStoragePolicy(new Path(file), "ALL_SSD");
+      dfs.setStoragePolicy(new Path(file), "ALL_SSD");
 
       FSNamesystem namesystem = hdfsCluster.getNamesystem();
       INode inode = namesystem.getFSDirectory().getINode(file);
@@ -115,7 +116,7 @@ public void testWhenStoragePolicySetToALLSSD()
       hdfsCluster.triggerHeartbeats();
       // Wait till StorgePolicySatisfier Identified that block to move to SSD
       // areas
-      waitExpectedStorageType(file, StorageType.SSD, distributedFS, 3, 30000);
+      waitExpectedStorageType(file, StorageType.SSD, 3, 30000);
     } finally {
       hdfsCluster.shutdown();
     }
@@ -126,7 +127,7 @@ public void testWhenStoragePolicySetToONESSD()
       throws Exception {
     try {
       // Change policy to ONE_SSD
-      distributedFS.setStoragePolicy(new Path(file), "ONE_SSD");
+      dfs.setStoragePolicy(new Path(file), "ONE_SSD");
 
       FSNamesystem namesystem = hdfsCluster.getNamesystem();
       INode inode = namesystem.getFSDirectory().getINode(file);
@@ -141,8 +142,8 @@ public void testWhenStoragePolicySetToONESSD()
       hdfsCluster.triggerHeartbeats();
       // Wait till StorgePolicySatisfier Identified that block to move to SSD
       // areas
-      waitExpectedStorageType(file, StorageType.SSD, distributedFS, 1, 30000);
-      waitExpectedStorageType(file, StorageType.DISK, distributedFS, 2, 30000);
+      waitExpectedStorageType(file, StorageType.SSD, 1, 30000);
+      waitExpectedStorageType(file, StorageType.DISK, 2, 30000);
     } finally {
       hdfsCluster.shutdown();
     }
@@ -156,7 +157,7 @@ public void testWhenStoragePolicySetToONESSD()
   public void testPerTrackIdBlocksStorageMovementResults() throws Exception {
     try {
       // Change policy to ONE_SSD
-      distributedFS.setStoragePolicy(new Path(file), "ONE_SSD");
+      dfs.setStoragePolicy(new Path(file), "ONE_SSD");
 
       FSNamesystem namesystem = hdfsCluster.getNamesystem();
       INode inode = namesystem.getFSDirectory().getINode(file);
@@ -171,8 +172,8 @@ public void testPerTrackIdBlocksStorageMovementResults() throws Exception {
       hdfsCluster.triggerHeartbeats();
 
       // Wait till the block is moved to SSD areas
-      waitExpectedStorageType(file, StorageType.SSD, distributedFS, 1, 30000);
-      waitExpectedStorageType(file, StorageType.DISK, distributedFS, 2, 30000);
+      waitExpectedStorageType(file, StorageType.SSD, 1, 30000);
+      waitExpectedStorageType(file, StorageType.DISK, 2, 30000);
 
       waitForBlocksMovementResult(1, 30000);
     } finally {
@@ -180,7 +181,58 @@ public void testPerTrackIdBlocksStorageMovementResults() throws Exception {
     }
   }
 
-  private void waitForBlocksMovementResult(int expectedResultsCount,
+  /**
+   * Tests to verify that multiple files are given to satisfy the storage
+   * policy and that all of them work well together.
+   */
+  @Test(timeout = 300000)
+  public void testMultipleFilesForSatisfyStoragePolicy() throws Exception {
+    List<String> files = new ArrayList<>();
+    files.add(file);
+
+    // Creates 4 more files. Sends all of them for satisfying the storage
+    // policy together.
+    for (int i = 0; i < 4; i++) {
+      String file1 = "/testMoveWhenStoragePolicyNotSatisfying_" + i;
+      files.add(file1);
+      writeContent(file1);
+    }
+
+    try {
+      FSNamesystem namesystem = hdfsCluster.getNamesystem();
+      List<Long> blockCollectionIds = new ArrayList<>();
+      // Change policy to ONE_SSD
+      for (String fileName : files) {
+        dfs.setStoragePolicy(new Path(fileName), "ONE_SSD");
+        INode inode = namesystem.getFSDirectory().getINode(fileName);
+        blockCollectionIds.add(inode.getId());
+      }
+
+      StorageType[][] newtypes =
+          new StorageType[][]{{StorageType.SSD, StorageType.DISK}};
+
+      // Making sure SSD based nodes are added to the cluster. Adding SSD
+      // based datanodes.
+      startAdditionalDNs(config, 1, numOfDatanodes, newtypes,
+          storagesPerDatanode, capacity, hdfsCluster);
+      for (long inodeId : blockCollectionIds) {
+        namesystem.getBlockManager().satisfyStoragePolicy(inodeId);
+      }
+      hdfsCluster.triggerHeartbeats();
+
+      for (String fileName : files) {
+        // Wait till the block is moved to SSD areas
+        waitExpectedStorageType(fileName, StorageType.SSD, 1, 30000);
+        waitExpectedStorageType(fileName, StorageType.DISK, 2, 30000);
+      }
+
+      waitForBlocksMovementResult(blockCollectionIds.size(), 30000);
+    } finally {
+      hdfsCluster.shutdown();
+    }
+  }
+
+  private void waitForBlocksMovementResult(long expectedBlkMovResultsCount,
       int timeout) throws TimeoutException, InterruptedException {
     BlockManager blockManager = hdfsCluster.getNamesystem().getBlockManager();
     final StoragePolicySatisfier sps = blockManager.getStoragePolicySatisfier();
@@ -188,16 +240,15 @@ private void waitForBlocksMovementResult(int expectedResultsCount,
       @Override
       public Boolean get() {
         LOG.info("expectedResultsCount={} actualResultsCount={}",
-            expectedResultsCount,
+            expectedBlkMovResultsCount,
             sps.getAttemptedItemsMonitor().resultsCount());
-        return expectedResultsCount == sps.getAttemptedItemsMonitor()
-            .resultsCount();
+        return sps.getAttemptedItemsMonitor()
+            .resultsCount() == expectedBlkMovResultsCount;
       }
     }, 100, timeout);
   }
 
-  private void writeContent(final DistributedFileSystem dfs,
-      final String fileName) throws IOException {
+  private void writeContent(final String fileName) throws IOException {
     // write to DISK
     final FSDataOutputStream out = dfs.create(new Path(fileName));
     for (int i = 0; i < 1000; i++) {
@@ -243,8 +294,8 @@ private MiniDFSCluster startCluster(final Configuration conf,
   // Check whether the Block movement has been successfully completed to satisfy
   // the storage policy for the given file.
   private void waitExpectedStorageType(final String fileName,
-      final StorageType expectedStorageType, final DistributedFileSystem dfs,
-      int expectedStorageCount, int timeout) throws Exception {
+      final StorageType expectedStorageType, int expectedStorageCount,
+      int timeout) throws Exception {
     GenericTestUtils.waitFor(new Supplier<Boolean>() {
       @Override
       public Boolean get() {