HDFS-11068: [SPS]: Provide unique trackID to track the block movement sends to coordinator. Contributed by Rakesh R

Uma Maheswara Rao G 2016-11-11 01:17:50 -08:00 committed by Uma Maheswara Rao Gangumalla
parent 047526b4c2
commit 19b5aee3e4
6 changed files with 174 additions and 40 deletions

DatanodeDescriptor.java

@@ -41,9 +41,9 @@
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.namenode.BlockStorageMovementInfosBatch;
 import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
 import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
-import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
@@ -208,8 +208,11 @@ public Type getType() {
   private final LightWeightHashSet<Block> invalidateBlocks =
       new LightWeightHashSet<>();
 
-  /** A queue of blocks for moving its storage placements by this datanode. */
-  private final Queue<List<BlockMovingInfo>> storageMovementBlocks =
+  /**
+   * A queue of blocks corresponding to trackID for moving its storage
+   * placements by this datanode.
+   */
+  private final Queue<BlockStorageMovementInfosBatch> storageMovementBlocks =
       new LinkedList<>();
 
   /* Variables for maintaining number of blocks scheduled to be written to
@@ -1075,18 +1078,30 @@ public boolean hasStorageType(StorageType type) {
   /**
    * Add the block infos which needs to move its storage locations.
    *
+   * @param trackID
+   *          - unique identifier which will be used for tracking the given set
+   *          of blocks movement completion.
    * @param storageMismatchedBlocks
    *          - storage mismatched block infos
    */
-  public void addBlocksToMoveStorage(
+  public void addBlocksToMoveStorage(long trackID,
       List<BlockMovingInfo> storageMismatchedBlocks) {
-    storageMovementBlocks.offer(storageMismatchedBlocks);
+    synchronized (storageMovementBlocks) {
+      storageMovementBlocks.offer(
+          new BlockStorageMovementInfosBatch(trackID, storageMismatchedBlocks));
+    }
   }
 
   /**
-   * @return block infos which needs to move its storage locations.
+   * @return block infos which needs to move its storage locations. This returns
+   *         list of blocks under one trackId.
    */
-  public List<BlockMovingInfo> getBlocksToMoveStorages() {
-    return storageMovementBlocks.poll();
+  public BlockStorageMovementInfosBatch getBlocksToMoveStorages() {
+    synchronized (storageMovementBlocks) {
+      // TODO: Presently returning the list of blocks under one trackId.
+      // Need to limit the list of items into small batches with in trackId
+      // itself if blocks are many(For example: a file contains many blocks).
+      return storageMovementBlocks.poll();
+    }
   }
 }
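The queue change above is the core of the patch: instead of a bare List<BlockMovingInfo>, the datanode descriptor now queues one batch per trackID and hands out at most one batch at a time. A minimal standalone sketch of that pattern (the Batch and MovesQueue names are illustrative stand-ins, not Hadoop classes):

// Standalone sketch of the per-trackID batching pattern introduced above.
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

class Batch {
  final long trackId;
  final List<String> blocks; // stands in for List<BlockMovingInfo>

  Batch(long trackId, List<String> blocks) {
    this.trackId = trackId;
    this.blocks = blocks;
  }
}

public class MovesQueue {
  private final Queue<Batch> pending = new LinkedList<>();

  // Producer side: the satisfier enqueues all blocks of one track as one batch.
  public void add(long trackId, List<String> blocks) {
    synchronized (pending) {
      pending.offer(new Batch(trackId, blocks));
    }
  }

  // Consumer side: heartbeat handling drains at most one batch per call,
  // so every outgoing command carries exactly one trackID.
  public Batch poll() {
    synchronized (pending) {
      return pending.poll();
    }
  }

  public static void main(String[] args) {
    MovesQueue q = new MovesQueue();
    q.add(16386L, Arrays.asList("blk_1", "blk_2"));
    Batch b = q.poll();
    System.out.println("trackId=" + b.trackId + " blocks=" + b.blocks);
  }
}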

DatanodeManager.java

@@ -40,6 +40,7 @@
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList;
 import org.apache.hadoop.hdfs.server.common.Util;
+import org.apache.hadoop.hdfs.server.namenode.BlockStorageMovementInfosBatch;
 import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
@@ -47,7 +48,6 @@
 import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStripedBlock;
-import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.*;
 import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;
@@ -1740,16 +1740,14 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
       }
 
       // check pending block storage movement tasks
-      List<BlockMovingInfo> pendingBlockMovementList = nodeinfo
+      BlockStorageMovementInfosBatch blkStorageMovementInfosBatch = nodeinfo
           .getBlocksToMoveStorages();
-      if (pendingBlockMovementList != null) {
-        // TODO: trackID is used to track the block movement sends to coordinator
-        // datanode. Need to implement tracking logic. Temporarily, using a
-        // constant value -1.
-        long trackID = -1;
+
+      if (blkStorageMovementInfosBatch != null) {
         cmds.add(new BlockStorageMovementCommand(
-            DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT, trackID, blockPoolId,
-            pendingBlockMovementList));
+            DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT,
+            blkStorageMovementInfosBatch.getTrackID(), blockPoolId,
+            blkStorageMovementInfosBatch.getBlockMovingInfo()));
       }
 
       if (!cmds.isEmpty()) {
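On the heartbeat path the temporary trackID of -1 is gone; the command is now built from whatever batch was polled, so the coordinator datanode can report completion against the same ID the namenode queued. A short self-contained sketch of that consumer side, again with stand-in types rather than Hadoop APIs:

// Minimal standalone illustration of the heartbeat-side change: the trackID
// comes from the polled batch rather than a constant placeholder.
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

public class HeartbeatSketch {
  // Stand-in for BlockStorageMovementInfosBatch: a trackID plus its blocks.
  static final class Batch {
    final long trackId;
    final List<String> blocks;

    Batch(long trackId, List<String> blocks) {
      this.trackId = trackId;
      this.blocks = blocks;
    }
  }

  static String nextCommand(Queue<Batch> pending, String blockPoolId) {
    Batch batch = pending.poll();     // at most one batch per heartbeat
    if (batch == null) {
      return null;                    // no pending movement work
    }
    // The command string carries the batch's own trackID end to end.
    return "BLOCK_STORAGE_MOVEMENT trackID=" + batch.trackId
        + " pool=" + blockPoolId + " blocks=" + batch.blocks;
  }

  public static void main(String[] args) {
    Queue<Batch> pending = new LinkedList<>();
    pending.offer(new Batch(16386L, Arrays.asList("blk_1", "blk_2")));
    System.out.println(nextCommand(pending, "BP-1"));
  }
}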

BlockStorageMovementInfosBatch.java (new file)

@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.util.List;
+
+import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
+
+/**
+ * This class represents a batch of blocks under one trackId which needs to move
+ * its storage locations to satisfy the storage policy.
+ */
+public class BlockStorageMovementInfosBatch {
+  private long trackID;
+  private List<BlockMovingInfo> blockMovingInfos;
+
+  /**
+   * Constructor to create the block storage movement infos batch.
+   *
+   * @param trackID
+   *          - unique identifier which will be used for tracking the given set
+   *          of blocks movement.
+   * @param blockMovingInfos
+   *          - list of block to storage infos.
+   */
+  public BlockStorageMovementInfosBatch(long trackID,
+      List<BlockMovingInfo> blockMovingInfos) {
+    this.trackID = trackID;
+    this.blockMovingInfos = blockMovingInfos;
+  }
+
+  public long getTrackID() {
+    return trackID;
+  }
+
+  public List<BlockMovingInfo> getBlockMovingInfo() {
+    return blockMovingInfos;
+  }
+
+  @Override
+  public String toString() {
+    return new StringBuilder().append("BlockStorageMovementInfosBatch(\n  ")
+        .append("TrackID: ").append(trackID).append(" BlockMovingInfos: ")
+        .append(blockMovingInfos).append(")").toString();
+  }
+}
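The new class is a plain value holder, so it can be exercised directly. A quick usage example of the constructor and getters exactly as declared above; it assumes the hadoop-hdfs artifact from this branch is on the classpath and uses an empty block list to stay independent of how BlockMovingInfo objects are built elsewhere in the namenode:

// Demo of the new batch class as declared in this commit.
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.hdfs.server.namenode.BlockStorageMovementInfosBatch;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;

public class BatchDemo {
  public static void main(String[] args) {
    // Empty list keeps the demo free of BlockMovingInfo construction details.
    List<BlockMovingInfo> blocks = Collections.emptyList();
    BlockStorageMovementInfosBatch batch =
        new BlockStorageMovementInfosBatch(16386L, blocks);

    System.out.println(batch.getTrackID());         // 16386
    System.out.println(batch.getBlockMovingInfo()); // []
    System.out.println(batch);                      // uses toString() above
  }
}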

StoragePolicySatisfier.java

@@ -217,7 +217,10 @@ private void computeAndAssignStorageMismatchedBlocksToDNs(
       // chances, then we can just retry limited number of times and exit.
       return;
     }
-    coordinatorNode.addBlocksToMoveStorage(blockMovingInfos);
+
+    // 'BlockCollectionId' is used as the tracking ID. All the blocks under this
+    // blockCollectionID will be added to this datanode.
+    coordinatorNode.addBlocksToMoveStorage(blockCollectionID, blockMovingInfos);
   }
 
   /**
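Here the file's block-collection (inode) ID is reused as the trackID, so every block move scheduled for one file reports back under a single key. A toy sketch of that bookkeeping choice (plain Java, not Hadoop code):

// Sketch: the inode id of the file doubles as the tracking key for all of
// its scheduled block moves.
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TrackIdSketch {
  public static void main(String[] args) {
    long blockCollectionId = 16386L;  // inode id of the file being satisfied
    List<String> blockMoves = Arrays.asList("blk_1 -> SSD", "blk_2 -> SSD");

    // trackID == blockCollectionId: one completion result per file.
    Map<Long, List<String>> pendingByTrackId = new HashMap<>();
    pendingByTrackId.put(blockCollectionId, blockMoves);

    System.out.println("trackID " + blockCollectionId + " covers "
        + pendingByTrackId.get(blockCollectionId).size() + " block moves");
  }
}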

BlocksStorageMovementResult.java

@@ -61,4 +61,10 @@ public Status getStatus() {
     return status;
   }
 
+  @Override
+  public String toString() {
+    return new StringBuilder().append("BlocksStorageMovementResult(\n  ")
+        .append("track id: ").append(trackId).append(" status: ")
+        .append(status).append(")").toString();
+  }
 }

TestStoragePolicySatisfier.java

@@ -18,6 +18,8 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.conf.Configuration;
@@ -54,15 +56,15 @@ public class TestStoragePolicySatisfier {
   final private int storagesPerDatanode = 2;
   final private long capacity = 2 * 256 * 1024 * 1024;
   final private String file = "/testMoveWhenStoragePolicyNotSatisfying";
-  private DistributedFileSystem distributedFS = null;
+  private DistributedFileSystem dfs = null;
 
   @Before
   public void setUp() throws IOException {
     config.setLong("dfs.block.size", 1024);
     hdfsCluster = startCluster(config, allDiskTypes, numOfDatanodes,
         storagesPerDatanode, capacity);
-    distributedFS = hdfsCluster.getFileSystem();
-    writeContent(distributedFS, file);
+    dfs = hdfsCluster.getFileSystem();
+    writeContent(file);
   }
 
   @Test(timeout = 300000)
@@ -71,7 +73,7 @@ public void testWhenStoragePolicySetToCOLD()
     try {
       // Change policy to ALL_SSD
-      distributedFS.setStoragePolicy(new Path(file), "COLD");
+      dfs.setStoragePolicy(new Path(file), "COLD");
 
       FSNamesystem namesystem = hdfsCluster.getNamesystem();
       INode inode = namesystem.getFSDirectory().getINode(file);
@@ -86,8 +88,7 @@ public void testWhenStoragePolicySetToCOLD()
       hdfsCluster.triggerHeartbeats();
       // Wait till namenode notified about the block location details
-      waitExpectedStorageType(file, StorageType.ARCHIVE, distributedFS, 3,
-          30000);
+      waitExpectedStorageType(file, StorageType.ARCHIVE, 3, 30000);
     } finally {
       hdfsCluster.shutdown();
     }
@@ -98,7 +99,7 @@ public void testWhenStoragePolicySetToALLSSD()
       throws Exception {
     try {
       // Change policy to ALL_SSD
-      distributedFS.setStoragePolicy(new Path(file), "ALL_SSD");
+      dfs.setStoragePolicy(new Path(file), "ALL_SSD");
 
       FSNamesystem namesystem = hdfsCluster.getNamesystem();
       INode inode = namesystem.getFSDirectory().getINode(file);
@@ -115,7 +116,7 @@ public void testWhenStoragePolicySetToALLSSD()
       hdfsCluster.triggerHeartbeats();
       // Wait till StorgePolicySatisfier Identified that block to move to SSD
      // areas
-      waitExpectedStorageType(file, StorageType.SSD, distributedFS, 3, 30000);
+      waitExpectedStorageType(file, StorageType.SSD, 3, 30000);
     } finally {
       hdfsCluster.shutdown();
     }
@@ -126,7 +127,7 @@ public void testWhenStoragePolicySetToONESSD()
       throws Exception {
     try {
       // Change policy to ONE_SSD
-      distributedFS.setStoragePolicy(new Path(file), "ONE_SSD");
+      dfs.setStoragePolicy(new Path(file), "ONE_SSD");
 
       FSNamesystem namesystem = hdfsCluster.getNamesystem();
       INode inode = namesystem.getFSDirectory().getINode(file);
@@ -141,8 +142,8 @@ public void testWhenStoragePolicySetToONESSD()
       hdfsCluster.triggerHeartbeats();
       // Wait till StorgePolicySatisfier Identified that block to move to SSD
      // areas
-      waitExpectedStorageType(file, StorageType.SSD, distributedFS, 1, 30000);
-      waitExpectedStorageType(file, StorageType.DISK, distributedFS, 2, 30000);
+      waitExpectedStorageType(file, StorageType.SSD, 1, 30000);
+      waitExpectedStorageType(file, StorageType.DISK, 2, 30000);
     } finally {
       hdfsCluster.shutdown();
     }
@@ -156,7 +157,7 @@ public void testWhenStoragePolicySetToONESSD()
   public void testPerTrackIdBlocksStorageMovementResults() throws Exception {
     try {
       // Change policy to ONE_SSD
-      distributedFS.setStoragePolicy(new Path(file), "ONE_SSD");
+      dfs.setStoragePolicy(new Path(file), "ONE_SSD");
 
       FSNamesystem namesystem = hdfsCluster.getNamesystem();
       INode inode = namesystem.getFSDirectory().getINode(file);
@@ -171,8 +172,8 @@ public void testPerTrackIdBlocksStorageMovementResults() throws Exception {
      hdfsCluster.triggerHeartbeats();
       // Wait till the block is moved to SSD areas
-      waitExpectedStorageType(file, StorageType.SSD, distributedFS, 1, 30000);
-      waitExpectedStorageType(file, StorageType.DISK, distributedFS, 2, 30000);
+      waitExpectedStorageType(file, StorageType.SSD, 1, 30000);
+      waitExpectedStorageType(file, StorageType.DISK, 2, 30000);
 
       waitForBlocksMovementResult(1, 30000);
     } finally {
@@ -180,7 +181,58 @@ public void testPerTrackIdBlocksStorageMovementResults() throws Exception {
     }
   }
 
-  private void waitForBlocksMovementResult(int expectedResultsCount,
+  /**
+   * Tests to verify that multiple files are giving to satisfy storage policy
+   * and should work well altogether.
+   */
+  @Test(timeout = 300000)
+  public void testMultipleFilesForSatisfyStoragePolicy() throws Exception {
+    List<String> files = new ArrayList<>();
+    files.add(file);
+
+    // Creates 4 more files. Send all of them for satisfying the storage policy
+    // together.
+    for (int i = 0; i < 4; i++) {
+      String file1 = "/testMoveWhenStoragePolicyNotSatisfying_" + i;
+      files.add(file1);
+      writeContent(file1);
+    }
+
+    try {
+      FSNamesystem namesystem = hdfsCluster.getNamesystem();
+      List<Long> blockCollectionIds = new ArrayList<>();
+
+      // Change policy to ONE_SSD
+      for (String fileName : files) {
+        dfs.setStoragePolicy(new Path(fileName), "ONE_SSD");
+        INode inode = namesystem.getFSDirectory().getINode(fileName);
+        blockCollectionIds.add(inode.getId());
+      }
+
+      StorageType[][] newtypes =
+          new StorageType[][]{{StorageType.SSD, StorageType.DISK}};
+
+      // Making sure SDD based nodes added to cluster. Adding SSD based
+      // datanodes.
+      startAdditionalDNs(config, 1, numOfDatanodes, newtypes,
+          storagesPerDatanode, capacity, hdfsCluster);
+
+      for (long inodeId : blockCollectionIds) {
+        namesystem.getBlockManager().satisfyStoragePolicy(inodeId);
+      }
+      hdfsCluster.triggerHeartbeats();
+
+      for (String fileName : files) {
+        // Wait till the block is moved to SSD areas
+        waitExpectedStorageType(fileName, StorageType.SSD, 1, 30000);
+        waitExpectedStorageType(fileName, StorageType.DISK, 2, 30000);
+      }
+
+      waitForBlocksMovementResult(blockCollectionIds.size(), 30000);
+    } finally {
+      hdfsCluster.shutdown();
+    }
+  }
+
+  private void waitForBlocksMovementResult(long expectedBlkMovResultsCount,
       int timeout) throws TimeoutException, InterruptedException {
     BlockManager blockManager = hdfsCluster.getNamesystem().getBlockManager();
     final StoragePolicySatisfier sps = blockManager.getStoragePolicySatisfier();
@@ -188,16 +240,15 @@ private void waitForBlocksMovementResult(int expectedResultsCount,
       @Override
       public Boolean get() {
         LOG.info("expectedResultsCount={} actualResultsCount={}",
-            expectedResultsCount,
+            expectedBlkMovResultsCount,
             sps.getAttemptedItemsMonitor().resultsCount());
-        return expectedResultsCount == sps.getAttemptedItemsMonitor()
-            .resultsCount();
+        return sps.getAttemptedItemsMonitor()
+            .resultsCount() == expectedBlkMovResultsCount;
       }
     }, 100, timeout);
   }
 
-  private void writeContent(final DistributedFileSystem dfs,
-      final String fileName) throws IOException {
+  private void writeContent(final String fileName) throws IOException {
     // write to DISK
     final FSDataOutputStream out = dfs.create(new Path(fileName));
     for (int i = 0; i < 1000; i++) {
@@ -243,8 +294,8 @@ private MiniDFSCluster startCluster(final Configuration conf,
   // Check whether the Block movement has been successfully completed to satisfy
   // the storage policy for the given file.
   private void waitExpectedStorageType(final String fileName,
-      final StorageType expectedStorageType, final DistributedFileSystem dfs,
-      int expectedStorageCount, int timeout) throws Exception {
+      final StorageType expectedStorageType, int expectedStorageCount,
+      int timeout) throws Exception {
     GenericTestUtils.waitFor(new Supplier<Boolean>() {
       @Override
       public Boolean get() {
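The test helpers above all follow the same poll-until-true pattern via GenericTestUtils.waitFor. A minimal plain-Java illustration of that pattern, without any Hadoop test utilities:

// Re-check a condition every interval until it holds or a timeout elapses;
// this mirrors what the helpers above do for storage types and results counts.
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

public class WaitForSketch {
  static void waitFor(Supplier<Boolean> check, long intervalMs, long timeoutMs)
      throws TimeoutException, InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (!check.get()) {
      if (System.currentTimeMillis() > deadline) {
        throw new TimeoutException("condition not met within " + timeoutMs + " ms");
      }
      Thread.sleep(intervalMs);
    }
  }

  public static void main(String[] args) throws Exception {
    final long start = System.currentTimeMillis();
    // Example condition: "enough time has passed", standing in for
    // "expected storage type count / movement results observed".
    waitFor(() -> System.currentTimeMillis() - start > 200, 50, 5000);
    System.out.println("condition satisfied");
  }
}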