HDFS-10801. [SPS]: Protocol buffer changes for sending storage movement commands from NN to DN. Contributed by Rakesh R

This commit is contained in:
Rakesh Radhakrishnan 2016-10-11 11:44:06 +05:30 committed by Uma Maheswara Rao Gangumalla
parent 1438da4944
commit e2a15d18bb
10 changed files with 276 additions and 59 deletions

View File

@ -54,6 +54,8 @@
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.SlowPeerReportProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.VolumeFailureSummaryProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockStorageMovementCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockStorageMovementProto;
import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.BlockECReconstructionInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
@ -98,6 +100,8 @@
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStripedBlock;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.StripedBlockWithLocations;
@ -469,6 +473,8 @@ public static DatanodeCommand convert(DatanodeCommandProto proto) {
return PBHelper.convert(proto.getBlkIdCmd());
case BlockECReconstructionCommand:
return PBHelper.convert(proto.getBlkECReconstructionCmd());
case BlockStorageMovementCommand:
return PBHelper.convert(proto.getBlkStorageMovementCmd());
default:
return null;
}
@ -603,6 +609,11 @@ public static DatanodeCommandProto convert(DatanodeCommand datanodeCommand) {
.setBlkECReconstructionCmd(
convert((BlockECReconstructionCommand) datanodeCommand));
break;
case DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT:
builder.setCmdType(DatanodeCommandProto.Type.BlockStorageMovementCommand)
.setBlkStorageMovementCmd(
convert((BlockStorageMovementCommand) datanodeCommand));
break;
case DatanodeProtocol.DNA_UNKNOWN: //Not expected
default:
builder.setCmdType(DatanodeCommandProto.Type.NullDatanodeCommand);
@ -1124,4 +1135,83 @@ public static KeyValueProto convert(FileRegion fileRegion) {
return new FileRegion(block, providedStorageLocation);
}
private static BlockStorageMovementCommandProto convert(
BlockStorageMovementCommand blkStorageMovementCmd) {
BlockStorageMovementCommandProto.Builder builder =
BlockStorageMovementCommandProto.newBuilder();
builder.setTrackID(blkStorageMovementCmd.getTrackID());
builder.setBlockPoolId(blkStorageMovementCmd.getBlockPoolId());
Collection<BlockMovingInfo> blockMovingInfos = blkStorageMovementCmd
.getBlockMovingTasks();
for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
builder.addBlockStorageMovement(
convertBlockMovingInfo(blkMovingInfo));
}
return builder.build();
}
private static BlockStorageMovementProto convertBlockMovingInfo(
BlockMovingInfo blkMovingInfo) {
BlockStorageMovementProto.Builder builder = BlockStorageMovementProto
.newBuilder();
builder.setBlock(PBHelperClient.convert(blkMovingInfo.getBlock()));
DatanodeInfo[] sourceDnInfos = blkMovingInfo.getSources();
builder.setSourceDnInfos(convertToDnInfosProto(sourceDnInfos));
DatanodeInfo[] targetDnInfos = blkMovingInfo.getTargets();
builder.setTargetDnInfos(convertToDnInfosProto(targetDnInfos));
StorageType[] sourceStorageTypes = blkMovingInfo.getSourceStorageTypes();
builder.setSourceStorageTypes(convertStorageTypesProto(sourceStorageTypes));
StorageType[] targetStorageTypes = blkMovingInfo.getTargetStorageTypes();
builder.setTargetStorageTypes(convertStorageTypesProto(targetStorageTypes));
return builder.build();
}
private static DatanodeCommand convert(
BlockStorageMovementCommandProto blkStorageMovementCmdProto) {
Collection<BlockMovingInfo> blockMovingInfos = new ArrayList<>();
List<BlockStorageMovementProto> blkSPSatisfyList =
blkStorageMovementCmdProto.getBlockStorageMovementList();
for (BlockStorageMovementProto blkSPSatisfy : blkSPSatisfyList) {
blockMovingInfos.add(convertBlockMovingInfo(blkSPSatisfy));
}
return new BlockStorageMovementCommand(
DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT,
blkStorageMovementCmdProto.getTrackID(),
blkStorageMovementCmdProto.getBlockPoolId(), blockMovingInfos);
}
private static BlockMovingInfo convertBlockMovingInfo(
BlockStorageMovementProto blockStoragePolicySatisfyProto) {
BlockProto blockProto = blockStoragePolicySatisfyProto.getBlock();
Block block = PBHelperClient.convert(blockProto);
DatanodeInfosProto sourceDnInfosProto = blockStoragePolicySatisfyProto
.getSourceDnInfos();
DatanodeInfo[] sourceDnInfos = PBHelperClient.convert(sourceDnInfosProto);
DatanodeInfosProto targetDnInfosProto = blockStoragePolicySatisfyProto
.getTargetDnInfos();
DatanodeInfo[] targetDnInfos = PBHelperClient.convert(targetDnInfosProto);
StorageTypesProto srcStorageTypesProto = blockStoragePolicySatisfyProto
.getSourceStorageTypes();
StorageType[] srcStorageTypes = PBHelperClient.convertStorageTypes(
srcStorageTypesProto.getStorageTypesList(),
srcStorageTypesProto.getStorageTypesList().size());
StorageTypesProto targetStorageTypesProto = blockStoragePolicySatisfyProto
.getTargetStorageTypes();
StorageType[] targetStorageTypes = PBHelperClient.convertStorageTypes(
targetStorageTypesProto.getStorageTypesList(),
targetStorageTypesProto.getStorageTypesList().size());
return new BlockMovingInfo(block, sourceDnInfos, targetDnInfos,
srcStorageTypes, targetStorageTypes);
}
}

View File

@ -1089,19 +1089,4 @@ public void addBlocksToMoveStorage(
public List<BlockMovingInfo> getBlocksToMoveStorages() {
return storageMovementBlocks.poll();
}
// TODO: we will remove this method once DN side handling integrated. We can
// convert the test to check real block movements instead of this ds.
@VisibleForTesting
public List<BlockMovingInfo> getStorageMovementPendingItems() {
List<BlockMovingInfo> flatList = new ArrayList<>();
Iterator<List<BlockMovingInfo>> iterator = storageMovementBlocks
.iterator();
while (iterator.hasNext()) {
List<BlockMovingInfo> next = iterator.next();
flatList.addAll(next);
}
return flatList;
}
}

View File

@ -1739,6 +1739,19 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
}
}
// check pending block storage movement tasks
List<BlockMovingInfo> pendingBlockMovementList = nodeinfo
.getBlocksToMoveStorages();
if (pendingBlockMovementList != null) {
// TODO: trackID is used to track the block movement sends to coordinator
// datanode. Need to implement tracking logic. Temporarily, using a
// constant value -1.
long trackID = -1;
cmds.add(new BlockStorageMovementCommand(
DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT, trackID, blockPoolId,
pendingBlockMovementList));
}
if (!cmds.isEmpty()) {
return cmds.toArray(new DatanodeCommand[cmds.size()]);
}

View File

@ -795,6 +795,13 @@ assert getBlockPoolId().equals(bp) :
((BlockECReconstructionCommand) cmd).getECTasks();
dn.getErasureCodingWorker().processErasureCodingTasks(ecTasks);
break;
case DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT:
LOG.info("DatanodeCommand action: DNA_BLOCK_STORAGE_MOVEMENT");
BlockStorageMovementCommand blkSPSCmd = (BlockStorageMovementCommand) cmd;
dn.getStoragePolicySatisfyWorker().processBlockMovingTasks(
blkSPSCmd.getTrackID(), blkSPSCmd.getBlockPoolId(),
blkSPSCmd.getBlockMovingTasks());
break;
default:
LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());
}
@ -825,6 +832,7 @@ private boolean processCommandFromStandby(DatanodeCommand cmd,
case DatanodeProtocol.DNA_CACHE:
case DatanodeProtocol.DNA_UNCACHE:
case DatanodeProtocol.DNA_ERASURE_CODING_RECONSTRUCTION:
case DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT:
LOG.warn("Got a command from standby NN - ignoring command:" + cmd.getAction());
break;
default:

View File

@ -386,6 +386,7 @@ public static InetSocketAddress createSocketAddr(String target) {
private String dnUserName = null;
private BlockRecoveryWorker blockRecoveryWorker;
private ErasureCodingWorker ecWorker;
private StoragePolicySatisfyWorker storagePolicySatisfyWorker;
private final Tracer tracer;
private final TracerConfigurationManager tracerConfigurationManager;
private static final int NUM_CORES = Runtime.getRuntime()
@ -1425,6 +1426,8 @@ void startDataNode(List<StorageLocation> dataDirectories,
ecWorker = new ErasureCodingWorker(getConf(), this);
blockRecoveryWorker = new BlockRecoveryWorker(this);
storagePolicySatisfyWorker =
new StoragePolicySatisfyWorker(getConf(), this);
blockPoolManager = new BlockPoolManager(this);
blockPoolManager.refreshNamenodes(getConf());
@ -3617,3 +3620,7 @@ private DiskBalancer getDiskBalancer() throws IOException {
return this.diskBalancer;
}
}
StoragePolicySatisfyWorker getStoragePolicySatisfyWorker() {
return storagePolicySatisfyWorker;
}}

View File

@ -28,6 +28,7 @@
import java.io.OutputStream;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.Callable;
@ -126,8 +127,25 @@ public void rejectedExecution(Runnable runnable,
return moverThreadPool;
}
/**
* Handles the given set of block movement tasks. This will iterate over the
* block movement list and submit each block movement task asynchronously in a
* separate thread. Each task will move the block replica to the target node &
* wait for the completion.
*
* TODO: Presently this function is a blocking call, this has to be refined by
* moving the tracking logic to another tracker thread. HDFS-10884 jira
* addresses the same.
*
* @param trackID
* unique tracking identifier
* @param blockPoolID
* block pool ID
* @param blockMovingInfos
* list of blocks to be moved
*/
public void processBlockMovingTasks(long trackID, String blockPoolID,
List<BlockMovingInfo> blockMovingInfos) {
Collection<BlockMovingInfo> blockMovingInfos) {
Future<Void> moveCallable = null;
for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
assert blkMovingInfo
@ -143,8 +161,6 @@ public void processBlockMovingTasks(long trackID, String blockPoolID,
}
}
// TODO: Presently this function act as a blocking call, this has to be
// refined by moving the tracking logic to another tracker thread.
for (int i = 0; i < moverTaskFutures.size(); i++) {
try {
moveCallable = moverExecutorCompletionService.take();

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.protocol;
import java.util.Arrays;
import java.util.Collection;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
@ -33,12 +34,60 @@
* {@link org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker}
* service. After the block movement this DataNode sends response back to the
* NameNode about the movement status.
*
* The coordinator datanode will use 'trackId' identifier to coordinate the block
* movement of the given set of blocks. TrackId is a unique identifier that
* represents a group of blocks. Namenode will generate this unique value and
* send it to the coordinator datanode along with the
* BlockStorageMovementCommand. Datanode will monitor the completion of the
* block movements that grouped under this trackId and notifies Namenode about
* the completion status.
*/
public class BlockStorageMovementCommand extends DatanodeCommand {
// TODO: constructor needs to be refined based on the block movement data
// structure.
BlockStorageMovementCommand(int action) {
private final long trackID;
private final String blockPoolId;
private final Collection<BlockMovingInfo> blockMovingTasks;
/**
* Block storage movement command constructor.
*
* @param action
* protocol specific action
* @param trackID
* unique identifier to monitor the given set of block movements
* @param blockPoolId
* block pool ID
* @param blockMovingInfos
* block to storage info that will be used for movement
*/
public BlockStorageMovementCommand(int action, long trackID,
String blockPoolId, Collection<BlockMovingInfo> blockMovingInfos) {
super(action);
this.trackID = trackID;
this.blockPoolId = blockPoolId;
this.blockMovingTasks = blockMovingInfos;
}
/**
* Returns trackID, which will be used to monitor the block movement assigned
* to this coordinator datanode.
*/
public long getTrackID() {
return trackID;
}
/**
* Returns block pool ID.
*/
public String getBlockPoolId() {
return blockPoolId;
}
/**
* Returns the list of blocks to be moved.
*/
public Collection<BlockMovingInfo> getBlockMovingTasks() {
return blockMovingTasks;
}
/**
@ -47,10 +96,24 @@ public class BlockStorageMovementCommand extends DatanodeCommand {
public static class BlockMovingInfo {
private Block blk;
private DatanodeInfo[] sourceNodes;
private StorageType[] sourceStorageTypes;
private DatanodeInfo[] targetNodes;
private StorageType[] sourceStorageTypes;
private StorageType[] targetStorageTypes;
/**
* Block to storage info constructor.
*
* @param block
* block
* @param sourceDnInfos
* node that can be the sources of a block move
* @param targetDnInfos
* target datanode info
* @param srcStorageTypes
* type of source storage media
* @param targetStorageTypes
* type of destin storage media
*/
public BlockMovingInfo(Block block,
DatanodeInfo[] sourceDnInfos, DatanodeInfo[] targetDnInfos,
StorageType[] srcStorageTypes, StorageType[] targetStorageTypes) {

View File

@ -79,6 +79,7 @@ public interface DatanodeProtocol {
final static int DNA_CACHE = 9; // cache blocks
final static int DNA_UNCACHE = 10; // uncache blocks
final static int DNA_ERASURE_CODING_RECONSTRUCTION = 11; // erasure coding reconstruction command
final static int DNA_BLOCK_STORAGE_MOVEMENT = 12; // block storage movement command
/**
* Register Datanode.

View File

@ -60,6 +60,7 @@ message DatanodeCommandProto {
NullDatanodeCommand = 7;
BlockIdCommand = 8;
BlockECReconstructionCommand = 9;
BlockStorageMovementCommand = 10;
}
required Type cmdType = 1; // Type of the command
@ -74,6 +75,7 @@ message DatanodeCommandProto {
optional RegisterCommandProto registerCmd = 7;
optional BlockIdCommandProto blkIdCmd = 8;
optional BlockECReconstructionCommandProto blkECReconstructionCmd = 9;
optional BlockStorageMovementCommandProto blkStorageMovementCmd = 10;
}
/**
@ -154,6 +156,26 @@ message BlockECReconstructionCommandProto {
repeated BlockECReconstructionInfoProto blockECReconstructioninfo = 1;
}
/**
* Block storage movement command
*/
message BlockStorageMovementCommandProto {
required uint64 trackID = 1;
required string blockPoolId = 2;
repeated BlockStorageMovementProto blockStorageMovement = 3;
}
/**
* Block storage movement information
*/
message BlockStorageMovementProto {
required BlockProto block = 1;
required DatanodeInfosProto sourceDnInfos = 2;
required DatanodeInfosProto targetDnInfos = 3;
required StorageTypesProto sourceStorageTypes = 4;
required StorageTypesProto targetStorageTypes = 5;
}
/**
* registration - Information of the datanode registering with the namenode
*/

View File

@ -18,9 +18,6 @@
package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
@ -29,8 +26,7 @@
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Before;
import org.junit.Test;
@ -74,9 +70,6 @@ public void testWhenStoragePolicySetToCOLD()
try {
// Change policy to ALL_SSD
distributedFS.setStoragePolicy(new Path(file), "COLD");
Set<DatanodeDescriptor> previousNodes =
hdfsCluster.getNameNode().getNamesystem().getBlockManager()
.getDatanodeManager().getDatanodes();
FSNamesystem namesystem = hdfsCluster.getNamesystem();
INode inode = namesystem.getFSDirectory().getINode(file);
@ -91,8 +84,8 @@ public void testWhenStoragePolicySetToCOLD()
hdfsCluster.triggerHeartbeats();
// Wait till namenode notified about the block location details
waitExpectedStorageType(StorageType.ARCHIVE, distributedFS, previousNodes,
6, 30000);
waitExpectedStorageType(file, StorageType.ARCHIVE, distributedFS, 3,
30000);
} finally {
hdfsCluster.shutdown();
}
@ -104,9 +97,6 @@ public void testWhenStoragePolicySetToALLSSD()
try {
// Change policy to ALL_SSD
distributedFS.setStoragePolicy(new Path(file), "ALL_SSD");
Set<DatanodeDescriptor> previousNodes =
hdfsCluster.getNameNode().getNamesystem().getBlockManager()
.getDatanodeManager().getDatanodes();
FSNamesystem namesystem = hdfsCluster.getNamesystem();
INode inode = namesystem.getFSDirectory().getINode(file);
@ -123,8 +113,34 @@ public void testWhenStoragePolicySetToALLSSD()
hdfsCluster.triggerHeartbeats();
// Wait till StorgePolicySatisfier Identified that block to move to SSD
// areas
waitExpectedStorageType(StorageType.SSD, distributedFS, previousNodes, 6,
30000);
waitExpectedStorageType(file, StorageType.SSD, distributedFS, 3, 30000);
} finally {
hdfsCluster.shutdown();
}
}
@Test(timeout = 300000)
public void testWhenStoragePolicySetToONESSD()
throws Exception {
try {
// Change policy to ONE_SSD
distributedFS.setStoragePolicy(new Path(file), "ONE_SSD");
FSNamesystem namesystem = hdfsCluster.getNamesystem();
INode inode = namesystem.getFSDirectory().getINode(file);
StorageType[][] newtypes =
new StorageType[][]{{StorageType.SSD, StorageType.DISK}};
// Making sure SDD based nodes added to cluster. Adding SSD based
// datanodes.
startAdditionalDNs(config, 1, numOfDatanodes, newtypes,
storagesPerDatanode, capacity, hdfsCluster);
namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
hdfsCluster.triggerHeartbeats();
// Wait till StorgePolicySatisfier Identified that block to move to SSD
// areas
waitExpectedStorageType(file, StorageType.SSD, distributedFS, 1, 30000);
waitExpectedStorageType(file, StorageType.DISK, distributedFS, 2, 30000);
} finally {
hdfsCluster.shutdown();
}
@ -174,35 +190,31 @@ private MiniDFSCluster startCluster(final Configuration conf,
return cluster;
}
// TODO: this assertion can be changed to end to end based assertion later
// when DN side processing work integrated to this work.
private void waitExpectedStorageType(final StorageType expectedStorageType,
final DistributedFileSystem dfs,
final Set<DatanodeDescriptor> previousNodes, int expectedArchiveCount,
int timeout) throws Exception {
// Check whether the Block movement has been successfully completed to satisfy
// the storage policy for the given file.
private void waitExpectedStorageType(final String fileName,
final StorageType expectedStorageType, final DistributedFileSystem dfs,
int expectedStorageCount, int timeout) throws Exception {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
Iterator<DatanodeDescriptor> iterator = previousNodes.iterator();
int archiveCount = 0;
while (iterator.hasNext()) {
DatanodeDescriptor dn = iterator.next();
List<BlockMovingInfo> pendingItemsToMove =
dn.getStorageMovementPendingItems();
for (BlockMovingInfo blkInfoToMoveStorage : pendingItemsToMove) {
StorageType[] targetStorageTypes =
blkInfoToMoveStorage.getTargetStorageTypes();
for (StorageType storageType : targetStorageTypes) {
if (storageType == expectedStorageType) {
archiveCount++;
}
LocatedBlock lb = null;
try {
lb = dfs.getClient().getLocatedBlocks(fileName, 0).get(0);
} catch (IOException e) {
LOG.error("Exception while getting located blocks", e);
return false;
}
int actualStorageCount = 0;
for (StorageType storageType : lb.getStorageTypes()) {
if (expectedStorageType == storageType) {
actualStorageCount++;
}
}
LOG.info(
expectedStorageType + " replica count, expected={} and actual={}",
expectedArchiveCount, archiveCount);
return expectedArchiveCount == archiveCount;
expectedStorageType, actualStorageCount);
return expectedStorageCount == actualStorageCount;
}
}, 100, timeout);
}