HDFS-12570: [SPS]: Refactor Co-ordinator datanode logic to track the block storage movements. Contributed by Rakesh R.

Uma Maheswara Rao G 2017-10-12 17:17:51 -07:00 committed by Uma Maheswara Rao Gangumalla
parent bfd3f8bd8a
commit 00eceed233
37 changed files with 908 additions and 1135 deletions

View File

@@ -629,11 +629,15 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY =
"dfs.storage.policy.satisfier.recheck.timeout.millis";
public static final int DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_DEFAULT =
5 * 60 * 1000;
1 * 60 * 1000;
public static final String DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY =
"dfs.storage.policy.satisfier.self.retry.timeout.millis";
public static final int DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_DEFAULT =
20 * 60 * 1000;
5 * 60 * 1000;
public static final String DFS_STORAGE_POLICY_SATISFIER_SHARE_EQUAL_REPLICA_MAX_STREAMS_KEY =
"dfs.storage.policy.satisfier.low.max-streams.preference";
public static final boolean DFS_STORAGE_POLICY_SATISFIER_SHARE_EQUAL_REPLICA_MAX_STREAMS_DEFAULT =
false;
public static final String DFS_DATANODE_ADDRESS_KEY = "dfs.datanode.address";
public static final int DFS_DATANODE_DEFAULT_PORT = 9866;
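The hunk above lowers the SPS recheck default from 5 minutes to 1 minute and the self-retry default from 20 minutes to 5, and introduces a flag letting block moves share maxTransfers equally with replication/EC work (consumed by DatanodeManager below). A minimal reading sketch, assuming only the stock Configuration accessors; the class and method names here are hypothetical:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

class SpsConfSketch {
  static void load(Configuration conf) {
    // All three keys come straight from the hunk above; values in millis.
    int recheckMillis = conf.getInt(
        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_DEFAULT); // now 1 min
    int selfRetryMillis = conf.getInt(
        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY,
        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_DEFAULT); // now 5 min
    boolean shareEqually = conf.getBoolean(
        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SHARE_EQUAL_REPLICA_MAX_STREAMS_KEY,
        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SHARE_EQUAL_REPLICA_MAX_STREAMS_DEFAULT); // false
    System.out.println(recheckMillis + " " + selfRetryMillis + " " + shareEqually);
  }
}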

View File

@@ -48,7 +48,7 @@
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReceivedDeletedBlocksProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.VersionRequestProto;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -140,7 +140,8 @@ public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration,
boolean requestFullBlockReportLease,
@Nonnull SlowPeerReports slowPeers,
@Nonnull SlowDiskReports slowDisks,
BlocksStorageMovementResult[] blksMovementResults) throws IOException {
BlocksStorageMoveAttemptFinished storageMovementFinishedBlks)
throws IOException {
HeartbeatRequestProto.Builder builder = HeartbeatRequestProto.newBuilder()
.setRegistration(PBHelper.convert(registration))
.setXmitsInProgress(xmitsInProgress).setXceiverCount(xceiverCount)
@@ -165,8 +166,11 @@ public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration,
}
// Adding blocks movement results to the heart beat request.
builder.addAllBlksMovementResults(
PBHelper.convertBlksMovResults(blksMovementResults));
if (storageMovementFinishedBlks != null
&& storageMovementFinishedBlks.getBlocks() != null) {
builder.setStorageMoveAttemptFinishedBlks(
PBHelper.convertBlksMovReport(storageMovementFinishedBlks));
}
HeartbeatResponseProto resp;
try {

View File

@@ -123,8 +123,8 @@ public HeartbeatResponseProto sendHeartbeat(RpcController controller,
volumeFailureSummary, request.getRequestFullBlockReportLease(),
PBHelper.convertSlowPeerInfo(request.getSlowPeersList()),
PBHelper.convertSlowDiskInfo(request.getSlowDisksList()),
PBHelper.convertBlksMovResults(
request.getBlksMovementResultsList()));
PBHelper.convertBlksMovReport(
request.getStorageMoveAttemptFinishedBlks()));
} catch (IOException e) {
throw new ServiceException(e);
}
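Both translator changes replace the repeated per-trackID result list with a single optional report message on the heartbeat. The client side only sets the field when there is something to report, while the server side can read it unconditionally: with protobuf, an unset message field returns its default instance, whose repeated blocks list is empty. A hedged sketch of that property (the finishedBlocks helper and the has-accessor are assumptions, not part of the patch):

static Block[] finishedBlocks(HeartbeatRequestProto request) {
  // Empty default instance when the DN attached no report to the heartbeat.
  BlocksStorageMoveAttemptFinishedProto proto =
      request.getStorageMoveAttemptFinishedBlks();
  Block[] blocks = PBHelper.convertBlksMovReport(proto).getBlocks();
  // blocks.length == 0 unless request.hasStorageMoveAttemptFinishedBlks().
  return blocks;
}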

View File

@@ -42,6 +42,7 @@
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockECReconstructionCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockIdCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockMovingInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockRecoveryCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeRegistrationProto;
@@ -56,11 +57,11 @@
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.VolumeFailureSummaryProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockStorageMovementCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockStorageMovementProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlocksStorageMovementResultProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlocksStorageMoveAttemptFinishedProto;
import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.BlockECReconstructionInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ProvidedStorageLocationProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageUuidsProto;
@@ -104,8 +105,7 @@
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult.Status;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.StripedBlockWithLocations;
@@ -971,59 +971,27 @@ public static SlowDiskReports convertSlowDiskInfo(
return SlowDiskReports.create(slowDisksMap);
}
public static BlocksStorageMovementResult[] convertBlksMovResults(
List<BlocksStorageMovementResultProto> protos) {
BlocksStorageMovementResult[] results =
new BlocksStorageMovementResult[protos.size()];
for (int i = 0; i < protos.size(); i++) {
BlocksStorageMovementResultProto resultProto = protos.get(i);
BlocksStorageMovementResult.Status status;
switch (resultProto.getStatus()) {
case SUCCESS:
status = Status.SUCCESS;
break;
case FAILURE:
status = Status.FAILURE;
break;
case IN_PROGRESS:
status = Status.IN_PROGRESS;
break;
default:
throw new AssertionError("Unknown status: " + resultProto.getStatus());
}
results[i] = new BlocksStorageMovementResult(resultProto.getTrackID(),
status);
public static BlocksStorageMoveAttemptFinished convertBlksMovReport(
BlocksStorageMoveAttemptFinishedProto proto) {
List<BlockProto> blocksList = proto.getBlocksList();
Block[] blocks = new Block[blocksList.size()];
for (int i = 0; i < blocksList.size(); i++) {
BlockProto blkProto = blocksList.get(i);
blocks[i] = PBHelperClient.convert(blkProto);
}
return results;
return new BlocksStorageMoveAttemptFinished(blocks);
}
public static List<BlocksStorageMovementResultProto> convertBlksMovResults(
BlocksStorageMovementResult[] blocksMovementResults) {
List<BlocksStorageMovementResultProto> blocksMovementResultsProto =
new ArrayList<>();
BlocksStorageMovementResultProto.Builder builder =
BlocksStorageMovementResultProto.newBuilder();
for (int i = 0; i < blocksMovementResults.length; i++) {
BlocksStorageMovementResult report = blocksMovementResults[i];
builder.setTrackID(report.getTrackId());
BlocksStorageMovementResultProto.Status status;
switch (report.getStatus()) {
case SUCCESS:
status = BlocksStorageMovementResultProto.Status.SUCCESS;
break;
case FAILURE:
status = BlocksStorageMovementResultProto.Status.FAILURE;
break;
case IN_PROGRESS:
status = BlocksStorageMovementResultProto.Status.IN_PROGRESS;
break;
default:
throw new AssertionError("Unknown status: " + report.getStatus());
}
builder.setStatus(status);
blocksMovementResultsProto.add(builder.build());
public static BlocksStorageMoveAttemptFinishedProto convertBlksMovReport(
BlocksStorageMoveAttemptFinished blocksMoveAttemptFinished) {
BlocksStorageMoveAttemptFinishedProto.Builder builder =
BlocksStorageMoveAttemptFinishedProto.newBuilder();
Block[] blocks = blocksMoveAttemptFinished.getBlocks();
for (Block block : blocks) {
builder.addBlocks(PBHelperClient.convert(block));
}
return blocksMovementResultsProto;
return builder.build();
}
public static JournalInfo convert(JournalInfoProto info) {
@@ -1211,34 +1179,34 @@ private static BlockStorageMovementCommandProto convert(
BlockStorageMovementCommandProto.Builder builder =
BlockStorageMovementCommandProto.newBuilder();
builder.setTrackID(blkStorageMovementCmd.getTrackID());
builder.setBlockPoolId(blkStorageMovementCmd.getBlockPoolId());
Collection<BlockMovingInfo> blockMovingInfos = blkStorageMovementCmd
.getBlockMovingTasks();
for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
builder.addBlockStorageMovement(
convertBlockMovingInfo(blkMovingInfo));
builder.addBlockMovingInfo(convertBlockMovingInfo(blkMovingInfo));
}
return builder.build();
}
private static BlockStorageMovementProto convertBlockMovingInfo(
private static BlockMovingInfoProto convertBlockMovingInfo(
BlockMovingInfo blkMovingInfo) {
BlockStorageMovementProto.Builder builder = BlockStorageMovementProto
BlockMovingInfoProto.Builder builder = BlockMovingInfoProto
.newBuilder();
builder.setBlock(PBHelperClient.convert(blkMovingInfo.getBlock()));
DatanodeInfo[] sourceDnInfos = blkMovingInfo.getSources();
builder.setSourceDnInfos(convertToDnInfosProto(sourceDnInfos));
DatanodeInfo sourceDnInfo = blkMovingInfo.getSource();
builder.setSourceDnInfo(PBHelperClient.convert(sourceDnInfo));
DatanodeInfo[] targetDnInfos = blkMovingInfo.getTargets();
builder.setTargetDnInfos(convertToDnInfosProto(targetDnInfos));
DatanodeInfo targetDnInfo = blkMovingInfo.getTarget();
builder.setTargetDnInfo(PBHelperClient.convert(targetDnInfo));
StorageType[] sourceStorageTypes = blkMovingInfo.getSourceStorageTypes();
builder.setSourceStorageTypes(convertStorageTypesProto(sourceStorageTypes));
StorageType sourceStorageType = blkMovingInfo.getSourceStorageType();
builder.setSourceStorageType(
PBHelperClient.convertStorageType(sourceStorageType));
StorageType[] targetStorageTypes = blkMovingInfo.getTargetStorageTypes();
builder.setTargetStorageTypes(convertStorageTypesProto(targetStorageTypes));
StorageType targetStorageType = blkMovingInfo.getTargetStorageType();
builder.setTargetStorageType(
PBHelperClient.convertStorageType(targetStorageType));
return builder.build();
}
@@ -1246,42 +1214,38 @@ private static BlockStorageMovementProto convertBlockMovingInfo(
private static DatanodeCommand convert(
BlockStorageMovementCommandProto blkStorageMovementCmdProto) {
Collection<BlockMovingInfo> blockMovingInfos = new ArrayList<>();
List<BlockStorageMovementProto> blkSPSatisfyList =
blkStorageMovementCmdProto.getBlockStorageMovementList();
for (BlockStorageMovementProto blkSPSatisfy : blkSPSatisfyList) {
List<BlockMovingInfoProto> blkSPSatisfyList =
blkStorageMovementCmdProto.getBlockMovingInfoList();
for (BlockMovingInfoProto blkSPSatisfy : blkSPSatisfyList) {
blockMovingInfos.add(convertBlockMovingInfo(blkSPSatisfy));
}
return new BlockStorageMovementCommand(
DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT,
blkStorageMovementCmdProto.getTrackID(),
blkStorageMovementCmdProto.getBlockPoolId(), blockMovingInfos);
}
private static BlockMovingInfo convertBlockMovingInfo(
BlockStorageMovementProto blockStoragePolicySatisfyProto) {
BlockProto blockProto = blockStoragePolicySatisfyProto.getBlock();
BlockMovingInfoProto blockStorageMovingInfoProto) {
BlockProto blockProto = blockStorageMovingInfoProto.getBlock();
Block block = PBHelperClient.convert(blockProto);
DatanodeInfosProto sourceDnInfosProto = blockStoragePolicySatisfyProto
.getSourceDnInfos();
DatanodeInfo[] sourceDnInfos = PBHelperClient.convert(sourceDnInfosProto);
DatanodeInfoProto sourceDnInfoProto = blockStorageMovingInfoProto
.getSourceDnInfo();
DatanodeInfo sourceDnInfo = PBHelperClient.convert(sourceDnInfoProto);
DatanodeInfosProto targetDnInfosProto = blockStoragePolicySatisfyProto
.getTargetDnInfos();
DatanodeInfo[] targetDnInfos = PBHelperClient.convert(targetDnInfosProto);
DatanodeInfoProto targetDnInfoProto = blockStorageMovingInfoProto
.getTargetDnInfo();
DatanodeInfo targetDnInfo = PBHelperClient.convert(targetDnInfoProto);
StorageTypeProto srcStorageTypeProto = blockStorageMovingInfoProto
.getSourceStorageType();
StorageType srcStorageType = PBHelperClient
.convertStorageType(srcStorageTypeProto);
StorageTypesProto srcStorageTypesProto = blockStoragePolicySatisfyProto
.getSourceStorageTypes();
StorageType[] srcStorageTypes = PBHelperClient.convertStorageTypes(
srcStorageTypesProto.getStorageTypesList(),
srcStorageTypesProto.getStorageTypesList().size());
StorageTypesProto targetStorageTypesProto = blockStoragePolicySatisfyProto
.getTargetStorageTypes();
StorageType[] targetStorageTypes = PBHelperClient.convertStorageTypes(
targetStorageTypesProto.getStorageTypesList(),
targetStorageTypesProto.getStorageTypesList().size());
return new BlockMovingInfo(block, sourceDnInfos, targetDnInfos,
srcStorageTypes, targetStorageTypes);
StorageTypeProto targetStorageTypeProto = blockStorageMovingInfoProto
.getTargetStorageType();
StorageType targetStorageType = PBHelperClient
.convertStorageType(targetStorageTypeProto);
return new BlockMovingInfo(block, sourceDnInfo, targetDnInfo,
srcStorageType, targetStorageType);
}
}
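These PBHelper changes carry the core of the refactor: a BlockMovingInfo is now one flat (block, source, target, source storage type, target storage type) tuple instead of parallel arrays, and the datanode reports back a plain list of attempted blocks rather than per-trackID statuses. A hedged construction and round-trip sketch using the shapes visible in the converters; the two DatanodeInfo arguments are hypothetical:

static void sketch(DatanodeInfo srcDatanode, DatanodeInfo targetDatanode) {
  // One movement task per block replica.
  BlockMovingInfo task = new BlockMovingInfo(
      new Block(1000L, 100L, 1L),   // blockId, numBytes, genStamp
      srcDatanode, targetDatanode,
      StorageType.DISK, StorageType.ARCHIVE);

  // The finished report is just the set of attempted blocks; a proto
  // round trip through the new helpers preserves it.
  BlocksStorageMoveAttemptFinished report =
      new BlocksStorageMoveAttemptFinished(new Block[] {task.getBlock()});
  BlocksStorageMoveAttemptFinishedProto proto =
      PBHelper.convertBlksMovReport(report);
  Block[] back = PBHelper.convertBlksMovReport(proto).getBlocks(); // same block
}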

View File

@@ -41,7 +41,6 @@
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.BlockStorageMovementInfosBatch;
import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
@@ -212,7 +211,7 @@ public Type getType() {
* A queue of blocks corresponding to trackID for moving its storage
* placements by this datanode.
*/
private final Queue<BlockStorageMovementInfosBatch> storageMovementBlocks =
private final Queue<BlockMovingInfo> storageMovementBlocks =
new LinkedList<>();
private volatile boolean dropSPSWork = false;
@@ -1079,30 +1078,45 @@ public boolean hasStorageType(StorageType type) {
/**
* Add the block infos which needs to move its storage locations.
*
* @param trackID
* - unique identifier which will be used for tracking the given set
* of blocks movement completion.
* @param storageMismatchedBlocks
* - storage mismatched block infos
* @param blkMovingInfo
* - storage mismatched block info
*/
public void addBlocksToMoveStorage(long trackID,
List<BlockMovingInfo> storageMismatchedBlocks) {
public void addBlocksToMoveStorage(BlockMovingInfo blkMovingInfo) {
synchronized (storageMovementBlocks) {
storageMovementBlocks.offer(
new BlockStorageMovementInfosBatch(trackID, storageMismatchedBlocks));
storageMovementBlocks.offer(blkMovingInfo);
}
}
/**
* @return block infos which needs to move its storage locations. This returns
* list of blocks under one trackId.
* Return the number of blocks queued up for movement.
*/
public BlockStorageMovementInfosBatch getBlocksToMoveStorages() {
public int getNumberOfBlocksToMoveStorages() {
return storageMovementBlocks.size();
}
/**
* Get the blocks to move to satisfy the storage media type.
*
* @param numBlocksToMoveTasks
* total number of blocks which will be sent to this datanode for
* block movement.
*
* @return block infos which needs to move its storage locations.
*/
public BlockMovingInfo[] getBlocksToMoveStorages(int numBlocksToMoveTasks) {
synchronized (storageMovementBlocks) {
// TODO: Presently returning the list of blocks under one trackId.
// Need to limit the list of items into small batches with in trackId
// itself if blocks are many(For example: a file contains many blocks).
return storageMovementBlocks.poll();
List<BlockMovingInfo> blockMovingInfos = new ArrayList<>();
for (; !storageMovementBlocks.isEmpty()
&& numBlocksToMoveTasks > 0; numBlocksToMoveTasks--) {
blockMovingInfos.add(storageMovementBlocks.poll());
}
BlockMovingInfo[] blkMoveArray = new BlockMovingInfo[blockMovingInfos
.size()];
blkMoveArray = blockMovingInfos.toArray(blkMoveArray);
if (blkMoveArray.length > 0) {
return blkMoveArray;
}
return null;
}
}
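With BlockStorageMovementInfosBatch removed, the descriptor keeps a flat queue of per-block tasks and hands out at most the share the heartbeat allocator asks for. A hedged usage sketch of the new contract; dnDescriptor and blkMovingInfo are hypothetical values:

static void sketch(DatanodeDescriptor dnDescriptor,
    BlockMovingInfo blkMovingInfo) {
  // SPS side: enqueue individual per-block tasks as they are planned.
  dnDescriptor.addBlocksToMoveStorage(blkMovingInfo);

  // Heartbeat side: drain a bounded batch; null signals an empty queue.
  int share = 2; // e.g. the numBlocksToMoveTasks computed in handleHeartbeat
  BlockMovingInfo[] batch = dnDescriptor.getBlocksToMoveStorages(share);
  if (batch != null) {
    // handleHeartbeat wraps this array into a BlockStorageMovementCommand.
  }
}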

View File

@@ -40,7 +40,6 @@
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList;
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.namenode.BlockStorageMovementInfosBatch;
import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
@@ -48,6 +47,7 @@
import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStripedBlock;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.*;
import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;
@@ -208,6 +208,8 @@ public class DatanodeManager {
*/
private final long timeBetweenResendingCachingDirectivesMs;
private final boolean blocksToMoveShareEqualRatio;
DatanodeManager(final BlockManager blockManager, final Namesystem namesystem,
final Configuration conf) throws IOException {
this.namesystem = namesystem;
@@ -332,6 +334,12 @@ public class DatanodeManager {
this.blocksPerPostponedMisreplicatedBlocksRescan = conf.getLong(
DFSConfigKeys.DFS_NAMENODE_BLOCKS_PER_POSTPONEDBLOCKS_RESCAN_KEY,
DFSConfigKeys.DFS_NAMENODE_BLOCKS_PER_POSTPONEDBLOCKS_RESCAN_KEY_DEFAULT);
// SPS configuration to decide whether the blocks to move can share an
// equal ratio of max transfers with pending replication and erasure-coded
// reconstruction tasks
blocksToMoveShareEqualRatio = conf.getBoolean(
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SHARE_EQUAL_REPLICA_MAX_STREAMS_KEY,
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SHARE_EQUAL_REPLICA_MAX_STREAMS_DEFAULT);
}
private static long getStaleIntervalFromConf(Configuration conf,
@@ -1094,13 +1102,14 @@ nodes with its data cleared (or user can just remove the StorageID
// Sets dropSPSWork flag to true, to ensure that
// DNA_DROP_SPS_WORK_COMMAND will send to datanode via next heartbeat
// response immediately after the node registration. This is
// to avoid a situation, where multiple trackId responses coming from
// different co-odinator datanodes. After SPS monitor time out, it
// will retry the files which were scheduled to the disconnected(for
// long time more than heartbeat expiry) DN, by finding new
// co-ordinator datanode. Now, if the expired datanode reconnects back
// after SPS reschedules, it leads to get different movement results
// from reconnected and new DN co-ordinators.
// to avoid a situation where multiple block movement attempt finished
// responses come from different datanodes. After the SPS monitor times
// out, it retries the files that were scheduled to the disconnected DN
// (down for longer than the heartbeat expiry) by finding a new datanode.
// If the expired datanode then reconnects after SPS has rescheduled, the
// NameNode would receive movement attempt finished reports from both the
// reconnected DN and the newly assigned DN.
nodeS.setDropSPSWork(true);
// resolve network location
@@ -1680,19 +1689,47 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
final List<DatanodeCommand> cmds = new ArrayList<>();
// Allocate _approximately_ maxTransfers pending tasks to DataNode.
// NN chooses pending tasks based on the ratio between the lengths of
// replication and erasure-coded block queues.
// replication, erasure-coded block queues and block storage movement
// queues.
int totalReplicateBlocks = nodeinfo.getNumberOfReplicateBlocks();
int totalECBlocks = nodeinfo.getNumberOfBlocksToBeErasureCoded();
int totalBlocksToMove = nodeinfo.getNumberOfBlocksToMoveStorages();
int totalBlocks = totalReplicateBlocks + totalECBlocks;
if (totalBlocks > 0) {
int numReplicationTasks = (int) Math.ceil(
(double) (totalReplicateBlocks * maxTransfers) / totalBlocks);
int numECTasks = (int) Math.ceil(
(double) (totalECBlocks * maxTransfers) / totalBlocks);
if (totalBlocks > 0 || totalBlocksToMove > 0) {
int numReplicationTasks = 0;
int numECTasks = 0;
int numBlocksToMoveTasks = 0;
// Check whether blocksToMoveShareEqualRatio is enabled. If true, share
// maxTransfers equally across replication, EC and block-move tasks.
// Otherwise, give priority to the pending replication/erasure-coded tasks
// and use only the leftover streams for block-move tasks.
if (blocksToMoveShareEqualRatio) {
// add blocksToMove count to total blocks so that it gets an equal share
totalBlocks = totalBlocks + totalBlocksToMove;
numReplicationTasks = (int) Math
.ceil((double) (totalReplicateBlocks * maxTransfers) / totalBlocks);
numECTasks = (int) Math
.ceil((double) (totalECBlocks * maxTransfers) / totalBlocks);
numBlocksToMoveTasks = (int) Math
.ceil((double) (totalBlocksToMove * maxTransfers) / totalBlocks);
} else {
// Calculate the replication and EC tasks first, then pick blocksToMove
// tasks only if any streams are left over.
numReplicationTasks = (int) Math
.ceil((double) (totalReplicateBlocks * maxTransfers) / totalBlocks);
numECTasks = (int) Math
.ceil((double) (totalECBlocks * maxTransfers) / totalBlocks);
int numTasks = numReplicationTasks + numECTasks;
if (numTasks < maxTransfers) {
int remainingMaxTransfers = maxTransfers - numTasks;
numBlocksToMoveTasks = Math.min(totalBlocksToMove,
remainingMaxTransfers);
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Pending replication tasks: " + numReplicationTasks
+ " erasure-coded tasks: " + numECTasks);
+ " erasure-coded tasks: " + numECTasks + " blocks to move tasks: "
+ numBlocksToMoveTasks);
}
// check pending replication tasks
List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
@@ -1708,6 +1745,23 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
cmds.add(new BlockECReconstructionCommand(
DNA_ERASURE_CODING_RECONSTRUCTION, pendingECList));
}
// check pending block storage movement tasks
if (nodeinfo.shouldDropSPSWork()) {
cmds.add(DropSPSWorkCommand.DNA_DROP_SPS_WORK_COMMAND);
// Set back to false to indicate that the new value has been sent to the
// datanode.
nodeinfo.setDropSPSWork(false);
} else {
// Get pending block storage movement tasks
BlockMovingInfo[] blkStorageMovementInfos = nodeinfo
.getBlocksToMoveStorages(numBlocksToMoveTasks);
if (blkStorageMovementInfos != null) {
cmds.add(new BlockStorageMovementCommand(
DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT, blockPoolId,
Arrays.asList(blkStorageMovementInfos)));
}
}
}
// check block invalidation
@@ -1751,24 +1805,6 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
}
}
if (nodeinfo.shouldDropSPSWork()) {
cmds.add(DropSPSWorkCommand.DNA_DROP_SPS_WORK_COMMAND);
// Set back to false to indicate that the new value has been sent to the
// datanode.
nodeinfo.setDropSPSWork(false);
}
// check pending block storage movement tasks
BlockStorageMovementInfosBatch blkStorageMovementInfosBatch = nodeinfo
.getBlocksToMoveStorages();
if (blkStorageMovementInfosBatch != null) {
cmds.add(new BlockStorageMovementCommand(
DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT,
blkStorageMovementInfosBatch.getTrackID(), blockPoolId,
blkStorageMovementInfosBatch.getBlockMovingInfo()));
}
if (!cmds.isEmpty()) {
return cmds.toArray(new DatanodeCommand[cmds.size()]);
}
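The allocator arithmetic is easiest to check with numbers. A self-contained sketch mirroring the two modes above, with made-up counts (60 replication, 20 EC, 40 move tasks, maxTransfers = 100):

// share(60, 20, 40, 100, true)  -> {50, 17, 34}: equal ratio; ceil() may
//     overshoot maxTransfers slightly, hence "_approximately_" above.
// share(60, 20, 40, 100, false) -> {75, 25, 0}: replication/EC first, and
//     here no streams are left over for moves.
static int[] share(int repl, int ec, int move, int maxTransfers,
    boolean equalRatio) {
  int total = repl + ec;
  if (equalRatio) {
    total += move;
    return new int[] {
        (int) Math.ceil((double) repl * maxTransfers / total),
        (int) Math.ceil((double) ec * maxTransfers / total),
        (int) Math.ceil((double) move * maxTransfers / total)};
  }
  int r = (int) Math.ceil((double) repl * maxTransfers / total);
  int e = (int) Math.ceil((double) ec * maxTransfers / total);
  int m = Math.min(move, Math.max(0, maxTransfers - (r + e)));
  return new int[] {r, e, m};
}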

View File

@@ -799,8 +799,7 @@ assert getBlockPoolId().equals(bp) :
LOG.info("DatanodeCommand action: DNA_BLOCK_STORAGE_MOVEMENT");
BlockStorageMovementCommand blkSPSCmd = (BlockStorageMovementCommand) cmd;
dn.getStoragePolicySatisfyWorker().processBlockMovingTasks(
blkSPSCmd.getTrackID(), blkSPSCmd.getBlockPoolId(),
blkSPSCmd.getBlockMovingTasks());
blkSPSCmd.getBlockPoolId(), blkSPSCmd.getBlockMovingTasks());
break;
case DatanodeProtocol.DNA_DROP_SPS_WORK_COMMAND:
LOG.info("DatanodeCommand action: DNA_DROP_SPS_WORK_COMMAND");

View File

@@ -39,6 +39,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.hdfs.client.BlockReportOptions;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -50,7 +51,7 @@
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@@ -513,8 +514,11 @@ HeartbeatResponse sendHeartBeat(boolean requestBlockReportLease)
SlowDiskReports.create(dn.getDiskMetrics().getDiskOutliersStats()) :
SlowDiskReports.EMPTY_REPORT;
BlocksStorageMovementResult[] blksMovementResults =
getBlocksMovementResults();
// Get the storage movement attempt finished blocks
List<Block> results = dn.getStoragePolicySatisfyWorker()
.getBlocksMovementsStatusHandler().getMoveAttemptFinishedBlocks();
BlocksStorageMoveAttemptFinished storageMoveAttemptFinishedBlks =
getStorageMoveAttemptFinishedBlocks(results);
HeartbeatResponse response = bpNamenode.sendHeartbeat(bpRegistration,
reports,
@@ -527,7 +531,7 @@ HeartbeatResponse sendHeartBeat(boolean requestBlockReportLease)
requestBlockReportLease,
slowPeers,
slowDisks,
blksMovementResults);
storageMoveAttemptFinishedBlks);
if (outliersReportDue) {
// If the report was due and successfully sent, schedule the next one.
@@ -537,20 +541,23 @@ HeartbeatResponse sendHeartBeat(boolean requestBlockReportLease)
// Remove the blocks movement results after successfully transferring
// to namenode.
dn.getStoragePolicySatisfyWorker().getBlocksMovementsStatusHandler()
.remove(blksMovementResults);
.remove(results);
return response;
}
private BlocksStorageMovementResult[] getBlocksMovementResults() {
List<BlocksStorageMovementResult> trackIdVsMovementStatus = dn
.getStoragePolicySatisfyWorker().getBlocksMovementsStatusHandler()
.getBlksMovementResults();
BlocksStorageMovementResult[] blksMovementResult =
new BlocksStorageMovementResult[trackIdVsMovementStatus.size()];
trackIdVsMovementStatus.toArray(blksMovementResult);
private BlocksStorageMoveAttemptFinished getStorageMoveAttemptFinishedBlocks(
List<Block> finishedBlks) {
return blksMovementResult;
if (finishedBlks.isEmpty()) {
return null;
}
// Create BlocksStorageMoveAttemptFinished with currently finished
// blocks
Block[] blockList = new Block[finishedBlks.size()];
finishedBlks.toArray(blockList);
return new BlocksStorageMoveAttemptFinished(blockList);
}
@VisibleForTesting
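The ordering in sendHeartBeat gives at-least-once reporting: finished blocks are read from the handler, sent, and removed only after the RPC returns, so a failed heartbeat simply re-reports them next time. A condensed hedged sketch of that sequence; sendReportViaHeartbeat is a hypothetical stand-in for the real sendHeartbeat call:

static void reportAndClear(
    StoragePolicySatisfyWorker.BlocksMovementsStatusHandler handler)
    throws IOException {
  List<Block> finished = handler.getMoveAttemptFinishedBlocks();
  BlocksStorageMoveAttemptFinished report = finished.isEmpty() ? null
      : new BlocksStorageMoveAttemptFinished(
          finished.toArray(new Block[finished.size()]));
  sendReportViaHeartbeat(report); // hypothetical; throws IOException on failure
  handler.remove(finished);       // reached only after a successful send
}

static void sendReportViaHeartbeat(BlocksStorageMoveAttemptFinished report)
    throws IOException {
  // stand-in for bpNamenode.sendHeartbeat(...) with the new last argument
}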

View File

@@ -21,14 +21,14 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlockMovementResult;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlockMovementAttemptFinished;
import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlocksMovementsStatusHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,12 +41,12 @@
public class BlockStorageMovementTracker implements Runnable {
private static final Logger LOG = LoggerFactory
.getLogger(BlockStorageMovementTracker.class);
private final CompletionService<BlockMovementResult> moverCompletionService;
private final CompletionService<BlockMovementAttemptFinished> moverCompletionService;
private final BlocksMovementsStatusHandler blksMovementsStatusHandler;
// Keeps the information - trackID vs its list of blocks
private final Map<Long, List<Future<BlockMovementResult>>> moverTaskFutures;
private final Map<Long, List<BlockMovementResult>> movementResults;
// Keeps the information - block vs its list of future move tasks
private final Map<Block, List<Future<BlockMovementAttemptFinished>>> moverTaskFutures;
private final Map<Block, List<BlockMovementAttemptFinished>> movementResults;
private volatile boolean running = true;
@@ -59,7 +59,7 @@ public class BlockStorageMovementTracker implements Runnable {
* blocks movements status handler
*/
public BlockStorageMovementTracker(
CompletionService<BlockMovementResult> moverCompletionService,
CompletionService<BlockMovementAttemptFinished> moverCompletionService,
BlocksMovementsStatusHandler handler) {
this.moverCompletionService = moverCompletionService;
this.moverTaskFutures = new HashMap<>();
@@ -82,32 +82,33 @@ public void run() {
}
}
try {
Future<BlockMovementResult> future = moverCompletionService.take();
Future<BlockMovementAttemptFinished> future =
moverCompletionService.take();
if (future != null) {
BlockMovementResult result = future.get();
BlockMovementAttemptFinished result = future.get();
LOG.debug("Completed block movement. {}", result);
long trackId = result.getTrackId();
List<Future<BlockMovementResult>> blocksMoving = moverTaskFutures
.get(trackId);
Block block = result.getBlock();
List<Future<BlockMovementAttemptFinished>> blocksMoving =
moverTaskFutures.get(block);
if (blocksMoving == null) {
LOG.warn("Future task doesn't exist for trackId " + trackId);
LOG.warn("Future task doesn't exist for block : {} ", block);
continue;
}
blocksMoving.remove(future);
List<BlockMovementResult> resultPerTrackIdList =
addMovementResultToTrackIdList(result);
List<BlockMovementAttemptFinished> resultPerTrackIdList =
addMovementResultToBlockIdList(result);
// Completed all the scheduled blocks movement under this 'trackId'.
if (blocksMoving.isEmpty() || moverTaskFutures.get(trackId) == null) {
if (blocksMoving.isEmpty() || moverTaskFutures.get(block) == null) {
synchronized (moverTaskFutures) {
moverTaskFutures.remove(trackId);
moverTaskFutures.remove(block);
}
if (running) {
// handle completed or inprogress blocks movements per trackId.
blksMovementsStatusHandler.handle(resultPerTrackIdList);
}
movementResults.remove(trackId);
movementResults.remove(block);
}
}
} catch (InterruptedException e) {
@@ -123,38 +124,39 @@ public void run() {
}
}
private List<BlockMovementResult> addMovementResultToTrackIdList(
BlockMovementResult result) {
long trackId = result.getTrackId();
List<BlockMovementResult> perTrackIdList;
private List<BlockMovementAttemptFinished> addMovementResultToBlockIdList(
BlockMovementAttemptFinished result) {
Block block = result.getBlock();
List<BlockMovementAttemptFinished> perBlockIdList;
synchronized (movementResults) {
perTrackIdList = movementResults.get(trackId);
if (perTrackIdList == null) {
perTrackIdList = new ArrayList<>();
movementResults.put(trackId, perTrackIdList);
perBlockIdList = movementResults.get(block);
if (perBlockIdList == null) {
perBlockIdList = new ArrayList<>();
movementResults.put(block, perBlockIdList);
}
perTrackIdList.add(result);
perBlockIdList.add(result);
}
return perTrackIdList;
return perBlockIdList;
}
/**
* Add future task to the tracking list to check the completion status of the
* block movement.
*
* @param trackID
* tracking Id
* @param block
* block identifier
* @param futureTask
* future task used for moving the respective block
*/
void addBlock(long trackID, Future<BlockMovementResult> futureTask) {
void addBlock(Block block,
Future<BlockMovementAttemptFinished> futureTask) {
synchronized (moverTaskFutures) {
List<Future<BlockMovementResult>> futures = moverTaskFutures
.get(Long.valueOf(trackID));
List<Future<BlockMovementAttemptFinished>> futures =
moverTaskFutures.get(block);
// null for the first task
if (futures == null) {
futures = new ArrayList<>();
moverTaskFutures.put(trackID, futures);
moverTaskFutures.put(block, futures);
}
futures.add(futureTask);
// Notify waiting tracker thread about the newly added tasks.
@@ -174,16 +176,6 @@ void removeAll() {
}
}
/**
* @return the list of trackIds which are still waiting to complete all the
* scheduled blocks movements.
*/
Set<Long> getInProgressTrackIds() {
synchronized (moverTaskFutures) {
return moverTaskFutures.keySet();
}
}
/**
* Sets running flag to false and clear the pending movement result queues.
*/
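The tracker is an instance of the CompletionService pattern: submit one Callable per block move, key the outstanding futures by block instead of by trackId, and let a single thread take() completions as they arrive. A self-contained toy version using only the JDK (a Long stands in for Block):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class TrackerToy {
  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    CompletionService<String> cs = new ExecutorCompletionService<>(pool);
    Map<Long, List<Future<String>>> pending = new HashMap<>();
    long blockId = 1024L; // stand-in for a Block key
    Future<String> f = cs.submit(() -> "moved blk_" + blockId);
    pending.computeIfAbsent(blockId, k -> new ArrayList<>()).add(f);
    Future<String> done = cs.take();   // blocks until some move finishes
    pending.get(blockId).remove(done); // mirrors blocksMoving.remove(future)
    System.out.println(done.get());
    pool.shutdown();
  }
}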

View File

@@ -18,7 +18,6 @@
package org.apache.hadoop.hdfs.server.datanode;
import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed;
import static org.apache.hadoop.util.Time.monotonicNow;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
@@ -32,9 +31,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
@@ -62,7 +59,6 @@
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
@@ -89,14 +85,11 @@ public class StoragePolicySatisfyWorker {
private final int moverThreads;
private final ExecutorService moveExecutor;
private final CompletionService<BlockMovementResult> moverCompletionService;
private final CompletionService<BlockMovementAttemptFinished> moverCompletionService;
private final BlocksMovementsStatusHandler handler;
private final BlockStorageMovementTracker movementTracker;
private Daemon movementTrackerThread;
private long inprogressTrackIdsCheckInterval = 30 * 1000; // 30seconds.
private long nextInprogressRecheckTime;
public StoragePolicySatisfyWorker(Configuration conf, DataNode datanode) {
this.datanode = datanode;
this.ioFileBufferSize = DFSUtilClient.getIoFileBufferSize(conf);
@@ -111,16 +104,6 @@ public StoragePolicySatisfyWorker(Configuration conf, DataNode datanode) {
movementTrackerThread = new Daemon(movementTracker);
movementTrackerThread.setName("BlockStorageMovementTracker");
// Interval to check that the inprogress trackIds. The time interval is
// proportional o the heart beat interval time period.
final long heartbeatIntervalSeconds = conf.getTimeDuration(
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT, TimeUnit.SECONDS);
inprogressTrackIdsCheckInterval = 5 * heartbeatIntervalSeconds;
// update first inprogress recheck time to a future time stamp.
nextInprogressRecheckTime = monotonicNow()
+ inprogressTrackIdsCheckInterval;
// TODO: Needs to manage the number of concurrent moves per DataNode.
}
@@ -186,30 +169,26 @@ public void rejectedExecution(Runnable runnable,
* separate thread. Each task will move the block replica to the target node &
* wait for the completion.
*
* @param trackID
* unique tracking identifier
* @param blockPoolID
* block pool ID
* @param blockPoolID block pool identifier
*
* @param blockMovingInfos
* list of blocks to be moved
*/
public void processBlockMovingTasks(long trackID, String blockPoolID,
Collection<BlockMovingInfo> blockMovingInfos) {
public void processBlockMovingTasks(final String blockPoolID,
final Collection<BlockMovingInfo> blockMovingInfos) {
LOG.debug("Received BlockMovingTasks {}", blockMovingInfos);
for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
assert blkMovingInfo.getSources().length == blkMovingInfo
.getTargets().length;
for (int i = 0; i < blkMovingInfo.getSources().length; i++) {
DatanodeInfo target = blkMovingInfo.getTargets()[i];
BlockMovingTask blockMovingTask = new BlockMovingTask(
trackID, blockPoolID, blkMovingInfo.getBlock(),
blkMovingInfo.getSources()[i], target,
blkMovingInfo.getSourceStorageTypes()[i],
blkMovingInfo.getTargetStorageTypes()[i]);
Future<BlockMovementResult> moveCallable = moverCompletionService
.submit(blockMovingTask);
movementTracker.addBlock(trackID, moveCallable);
}
StorageType sourceStorageType = blkMovingInfo.getSourceStorageType();
StorageType targetStorageType = blkMovingInfo.getTargetStorageType();
assert sourceStorageType != targetStorageType
: "Source and Target storage type shouldn't be same!";
BlockMovingTask blockMovingTask = new BlockMovingTask(blockPoolID,
blkMovingInfo.getBlock(), blkMovingInfo.getSource(),
blkMovingInfo.getTarget(), sourceStorageType, targetStorageType);
Future<BlockMovementAttemptFinished> moveCallable = moverCompletionService
.submit(blockMovingTask);
movementTracker.addBlock(blkMovingInfo.getBlock(),
moveCallable);
}
}
@@ -217,8 +196,7 @@ public void processBlockMovingTasks(long trackID, String blockPoolID,
* This class encapsulates the process of moving the block replica to the
* given target and wait for the response.
*/
private class BlockMovingTask implements Callable<BlockMovementResult> {
private final long trackID;
private class BlockMovingTask implements Callable<BlockMovementAttemptFinished> {
private final String blockPoolID;
private final Block block;
private final DatanodeInfo source;
@@ -226,10 +204,9 @@ private class BlockMovingTask implements Callable<BlockMovementResult> {
private final StorageType srcStorageType;
private final StorageType targetStorageType;
BlockMovingTask(long trackID, String blockPoolID, Block block,
BlockMovingTask(String blockPoolID, Block block,
DatanodeInfo source, DatanodeInfo target,
StorageType srcStorageType, StorageType targetStorageType) {
this.trackID = trackID;
this.blockPoolID = blockPoolID;
this.block = block;
this.source = source;
@@ -239,23 +216,26 @@ private class BlockMovingTask implements Callable<BlockMovementResult> {
}
@Override
public BlockMovementResult call() {
public BlockMovementAttemptFinished call() {
BlockMovementStatus status = moveBlock();
return new BlockMovementResult(trackID, block.getBlockId(), target,
status);
return new BlockMovementAttemptFinished(block, source, target, status);
}
private BlockMovementStatus moveBlock() {
LOG.info("Start moving block:{} from src:{} to destin:{} to satisfy "
+ "storageType, sourceStoragetype:{} and destinStoragetype:{}",
+ "storageType, sourceStoragetype:{} and destinStoragetype:{}",
block, source, target, srcStorageType, targetStorageType);
Socket sock = null;
DataOutputStream out = null;
DataInputStream in = null;
try {
datanode.incrementXmitsInProgress();
ExtendedBlock extendedBlock = new ExtendedBlock(blockPoolID, block);
DNConf dnConf = datanode.getDnConf();
String dnAddr = target.getXferAddr(dnConf.getConnectToDnViaHostname());
String dnAddr = datanode.getDatanodeId()
.getXferAddr(dnConf.getConnectToDnViaHostname());
sock = datanode.newSocket();
NetUtils.connect(sock, NetUtils.createSocketAddr(dnAddr),
dnConf.getSocketTimeout());
@@ -297,9 +277,10 @@ private BlockMovementStatus moveBlock() {
LOG.warn(
"Failed to move block:{} from src:{} to destin:{} to satisfy "
+ "storageType:{}",
block, source, target, targetStorageType, e);
block, source, target, targetStorageType, e);
return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE;
} finally {
datanode.decrementXmitsInProgress();
IOUtils.closeStream(out);
IOUtils.closeStream(in);
IOUtils.closeSocket(sock);
@@ -357,29 +338,25 @@ int getStatusCode() {
}
/**
* This class represents result from a block movement task. This will have the
* This class represents the status of a block movement task. This will have the
* information of the task which was successful or failed due to errors.
*/
static class BlockMovementResult {
private final long trackId;
private final long blockId;
static class BlockMovementAttemptFinished {
private final Block block;
private final DatanodeInfo src;
private final DatanodeInfo target;
private final BlockMovementStatus status;
BlockMovementResult(long trackId, long blockId,
BlockMovementAttemptFinished(Block block, DatanodeInfo src,
DatanodeInfo target, BlockMovementStatus status) {
this.trackId = trackId;
this.blockId = blockId;
this.block = block;
this.src = src;
this.target = target;
this.status = status;
}
long getTrackId() {
return trackId;
}
long getBlockId() {
return blockId;
Block getBlock() {
return block;
}
BlockMovementStatus getStatus() {
@@ -388,99 +365,79 @@ BlockMovementStatus getStatus() {
@Override
public String toString() {
return new StringBuilder().append("Block movement result(\n ")
.append("track id: ").append(trackId).append(" block id: ")
.append(blockId).append(" target node: ").append(target)
return new StringBuilder().append("Block movement attempt finished(\n ")
.append(" block : ")
.append(block).append(" src node: ").append(src)
.append(" target node: ").append(target)
.append(" movement status: ").append(status).append(")").toString();
}
}
/**
* Blocks movements status handler, which is used to collect details of the
* completed or inprogress list of block movements and this status(success or
* failure or inprogress) will be send to the namenode via heartbeat.
* completed block movements; these attempt finished (success or failure)
* blocks are then sent to the namenode via heartbeat.
*/
class BlocksMovementsStatusHandler {
private final List<BlocksStorageMovementResult> trackIdVsMovementStatus =
public static class BlocksMovementsStatusHandler {
private final List<Block> blockIdVsMovementStatus =
new ArrayList<>();
/**
* Collect all the block movement results. Later this will be send to
* namenode via heart beat.
* Collect all the storage movement attempt finished blocks. Later this will
* be sent to namenode via heart beat.
*
* @param results
* result of all the block movements per trackId
* @param moveAttemptFinishedBlks
* set of storage movement attempt finished blocks
*/
void handle(List<BlockMovementResult> resultsPerTrackId) {
BlocksStorageMovementResult.Status status =
BlocksStorageMovementResult.Status.SUCCESS;
long trackId = -1;
for (BlockMovementResult blockMovementResult : resultsPerTrackId) {
trackId = blockMovementResult.getTrackId();
if (blockMovementResult.status ==
BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE) {
status = BlocksStorageMovementResult.Status.FAILURE;
// If any of the block movement is failed, then mark as failure so
// that namenode can take a decision to retry the blocks associated to
// the given trackId.
break;
}
}
void handle(List<BlockMovementAttemptFinished> moveAttemptFinishedBlks) {
List<Block> blocks = new ArrayList<>();
// Adding to the tracking results list. Later this will be send to
for (BlockMovementAttemptFinished item : moveAttemptFinishedBlks) {
blocks.add(item.getBlock());
}
// Adding to the tracking report list. Later this will be sent to
// namenode via datanode heartbeat.
synchronized (trackIdVsMovementStatus) {
trackIdVsMovementStatus.add(
new BlocksStorageMovementResult(trackId, status));
synchronized (blockIdVsMovementStatus) {
blockIdVsMovementStatus.addAll(blocks);
}
}
/**
* @return unmodifiable list of blocks storage movement results.
* @return unmodifiable list of storage movement attempt finished blocks.
*/
List<BlocksStorageMovementResult> getBlksMovementResults() {
List<BlocksStorageMovementResult> movementResults = new ArrayList<>();
// 1. Adding all the completed trackids.
synchronized (trackIdVsMovementStatus) {
if (trackIdVsMovementStatus.size() > 0) {
movementResults = Collections
.unmodifiableList(trackIdVsMovementStatus);
List<Block> getMoveAttemptFinishedBlocks() {
List<Block> moveAttemptFinishedBlks = new ArrayList<>();
// 1. Adding all the completed block ids.
synchronized (blockIdVsMovementStatus) {
if (blockIdVsMovementStatus.size() > 0) {
moveAttemptFinishedBlks = Collections
.unmodifiableList(blockIdVsMovementStatus);
}
}
// 2. Adding the in progress track ids after those which are completed.
Set<Long> inProgressTrackIds = getInProgressTrackIds();
for (Long trackId : inProgressTrackIds) {
movementResults.add(new BlocksStorageMovementResult(trackId,
BlocksStorageMovementResult.Status.IN_PROGRESS));
}
return movementResults;
return moveAttemptFinishedBlks;
}
/**
* Remove the blocks storage movement results.
* Remove the storage movement attempt finished blocks from the tracking
* list.
*
* @param results
* set of blocks storage movement results
* @param moveAttemptFinishedBlks
* set of storage movement attempt finished blocks
*/
void remove(BlocksStorageMovementResult[] results) {
if (results != null) {
synchronized (trackIdVsMovementStatus) {
for (BlocksStorageMovementResult blocksMovementResult : results) {
trackIdVsMovementStatus.remove(blocksMovementResult);
}
}
void remove(List<Block> moveAttemptFinishedBlks) {
if (moveAttemptFinishedBlks != null) {
blockIdVsMovementStatus.removeAll(moveAttemptFinishedBlks);
}
}
/**
* Clear the trackID vs movement status tracking map.
* Clear the blockID vs movement status tracking list.
*/
void removeAll() {
synchronized (trackIdVsMovementStatus) {
trackIdVsMovementStatus.clear();
synchronized (blockIdVsMovementStatus) {
blockIdVsMovementStatus.clear();
}
}
}
@VisibleForTesting
@@ -498,23 +455,4 @@ public void dropSPSWork() {
movementTracker.removeAll();
handler.removeAll();
}
/**
* Gets list of trackids which are inprogress. Will do collection periodically
* on 'dfs.datanode.storage.policy.satisfier.worker.inprogress.recheck.time.
* millis' interval.
*
* @return collection of trackids which are inprogress
*/
private Set<Long> getInProgressTrackIds() {
Set<Long> trackIds = new HashSet<>();
long now = monotonicNow();
if (nextInprogressRecheckTime >= now) {
trackIds = movementTracker.getInProgressTrackIds();
// schedule next re-check interval
nextInprogressRecheckTime = now + inprogressTrackIdsCheckInterval;
}
return trackIds;
}
}
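End to end, the reworked handler is a small producer/consumer buffer: the tracker thread feeds it finished attempts, and the heartbeat thread drains plain Block entries, removing them only after the NameNode has acknowledged them. A hedged usage sketch; it assumes the same package as StoragePolicySatisfyWorker and that the success constant is named DN_BLK_STORAGE_MOVEMENT_SUCCESS (only the FAILURE constant appears in this diff):

static void demo(StoragePolicySatisfyWorker.BlocksMovementsStatusHandler handler,
    Block block, DatanodeInfo srcDn, DatanodeInfo targetDn) {
  // Tracker thread: record one finished attempt (success and failure alike).
  handler.handle(java.util.Arrays.asList(
      new StoragePolicySatisfyWorker.BlockMovementAttemptFinished(
          block, srcDn, targetDn,
          StoragePolicySatisfyWorker.BlockMovementStatus
              .DN_BLK_STORAGE_MOVEMENT_SUCCESS))); // assumed constant name
  // Heartbeat thread: read, report to the NN, then clear what was reported.
  List<Block> toReport = handler.getMoveAttemptFinishedBlocks();
  handler.remove(toReport); // only after the heartbeat RPC succeeded
}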

View File

@@ -22,15 +22,12 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier.AttemptedItemInfo;
import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier.ItemInfo;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult.Status;
import org.apache.hadoop.util.Daemon;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,14 +35,12 @@
import com.google.common.annotations.VisibleForTesting;
/**
* A monitor class for checking whether block storage movements finished or not.
* If block storage movement results from datanode indicates about the movement
* success, then it will just remove the entries from tracking. If it reports
* failure, then it will add back to needed block storage movements list. If it
* reports in_progress, that means the blocks movement is in progress and the
* coordinator is still tracking the movement. If no DN reports about movement
* for longer time, then such items will be retries automatically after timeout.
* The default timeout would be 30mins.
* A monitor class for checking whether the block storage movement attempt
* has completed or not. If it receives a block storage movement attempt
* status (either success or failure) from the DN, it simply removes the
* entries from tracking. If no DN reports a movement attempt as finished
* for a longer time period, such items are retried automatically after a
* timeout. The default timeout is 5 minutes.
*/
public class BlockStorageMovementAttemptedItems {
private static final Logger LOG =
@@ -55,37 +50,34 @@ public class BlockStorageMovementAttemptedItems {
* A map holds the items which are already taken for blocks movements
* processing and sent to DNs.
*/
private final Map<Long, AttemptedItemInfo> storageMovementAttemptedItems;
private final List<BlocksStorageMovementResult> storageMovementAttemptedResults;
private final List<AttemptedItemInfo> storageMovementAttemptedItems;
private final List<Block> movementFinishedBlocks;
private volatile boolean monitorRunning = true;
private Daemon timerThread = null;
private final StoragePolicySatisfier sps;
//
// It might take anywhere between 20 to 60 minutes before
// a request is timed out.
//
private long selfRetryTimeout = 20 * 60 * 1000;
//
// It might take anywhere between 5 to 10 minutes before
// a request is timed out.
//
private long minCheckTimeout = 5 * 60 * 1000; // minimum value
private long selfRetryTimeout = 5 * 60 * 1000;
//
// It might take anywhere between 1 to 2 minutes before
// a request is timed out.
//
private long minCheckTimeout = 1 * 60 * 1000; // minimum value
private BlockStorageMovementNeeded blockStorageMovementNeeded;
public BlockStorageMovementAttemptedItems(long recheckTimeout,
long selfRetryTimeout,
BlockStorageMovementNeeded unsatisfiedStorageMovementFiles,
StoragePolicySatisfier sps) {
BlockStorageMovementNeeded unsatisfiedStorageMovementFiles) {
if (recheckTimeout > 0) {
this.minCheckTimeout = Math.min(minCheckTimeout, recheckTimeout);
}
this.selfRetryTimeout = selfRetryTimeout;
this.blockStorageMovementNeeded = unsatisfiedStorageMovementFiles;
storageMovementAttemptedItems = new HashMap<>();
storageMovementAttemptedResults = new ArrayList<>();
this.sps = sps;
storageMovementAttemptedItems = new ArrayList<>();
movementFinishedBlocks = new ArrayList<>();
}
/**
@@ -94,33 +86,26 @@ public BlockStorageMovementAttemptedItems(long recheckTimeout,
*
* @param itemInfo
* - tracking info
* @param allBlockLocsAttemptedToSatisfy
* - failed to find matching target nodes to satisfy storage type
* for all the block locations of the given blockCollectionID
*/
public void add(ItemInfo itemInfo, boolean allBlockLocsAttemptedToSatisfy) {
public void add(AttemptedItemInfo itemInfo) {
synchronized (storageMovementAttemptedItems) {
AttemptedItemInfo attemptedItemInfo = new AttemptedItemInfo(
itemInfo.getStartId(), itemInfo.getTrackId(), monotonicNow(),
allBlockLocsAttemptedToSatisfy);
storageMovementAttemptedItems.put(itemInfo.getTrackId(),
attemptedItemInfo);
storageMovementAttemptedItems.add(itemInfo);
}
}
/**
* Add the trackIDBlocksStorageMovementResults to
* storageMovementAttemptedResults.
* Add the storage movement attempt finished blocks to
* movementFinishedBlocks.
*
* @param blksMovementResults
* @param moveAttemptFinishedBlks
* storage movement attempt finished blocks
*/
public void addResults(BlocksStorageMovementResult[] blksMovementResults) {
if (blksMovementResults.length == 0) {
public void addReportedMovedBlocks(Block[] moveAttemptFinishedBlks) {
if (moveAttemptFinishedBlks.length == 0) {
return;
}
synchronized (storageMovementAttemptedResults) {
storageMovementAttemptedResults
.addAll(Arrays.asList(blksMovementResults));
synchronized (movementFinishedBlocks) {
movementFinishedBlocks.addAll(Arrays.asList(moveAttemptFinishedBlks));
}
}
@@ -129,8 +114,8 @@ public void addResults(BlocksStorageMovementResult[] blksMovementResults) {
*/
public synchronized void start() {
monitorRunning = true;
timerThread = new Daemon(new BlocksStorageMovementAttemptResultMonitor());
timerThread.setName("BlocksStorageMovementAttemptResultMonitor");
timerThread = new Daemon(new BlocksStorageMovementAttemptMonitor());
timerThread.setName("BlocksStorageMovementAttemptMonitor");
timerThread.start();
}
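A hedged wiring sketch for the reworked monitor, using this patch's new defaults (1-minute recheck, 5-minute self-retry); movementNeeded is a hypothetical BlockStorageMovementNeeded queue:

static void wire(BlockStorageMovementNeeded movementNeeded) {
  BlockStorageMovementAttemptedItems attemptedItems =
      new BlockStorageMovementAttemptedItems(
          1 * 60 * 1000L, 5 * 60 * 1000L, movementNeeded);
  attemptedItems.start(); // spawns the BlocksStorageMovementAttemptMonitor daemon
}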
@@ -163,82 +148,22 @@ synchronized void stopGracefully() {
}
/**
* This class contains information of an attempted trackID. Information such
* as, (a)last attempted or reported time stamp, (b)whether all the blocks in
* the trackID were attempted and blocks movement has been scheduled to
* satisfy storage policy. This is used by
* {@link BlockStorageMovementAttemptedItems#storageMovementAttemptedItems}.
* A monitor class for checking block storage movement attempt status and long
* waiting items periodically.
*/
private final static class AttemptedItemInfo extends ItemInfo {
private long lastAttemptedOrReportedTime;
private final boolean allBlockLocsAttemptedToSatisfy;
/**
* AttemptedItemInfo constructor.
*
* @param rootId
* rootId for trackId
* @param trackId
* trackId for file.
* @param lastAttemptedOrReportedTime
* last attempted or reported time
* @param allBlockLocsAttemptedToSatisfy
* whether all the blocks in the trackID were attempted and blocks
* movement has been scheduled to satisfy storage policy
*/
private AttemptedItemInfo(long rootId, long trackId,
long lastAttemptedOrReportedTime,
boolean allBlockLocsAttemptedToSatisfy) {
super(rootId, trackId);
this.lastAttemptedOrReportedTime = lastAttemptedOrReportedTime;
this.allBlockLocsAttemptedToSatisfy = allBlockLocsAttemptedToSatisfy;
}
/**
* @return last attempted or reported time stamp.
*/
private long getLastAttemptedOrReportedTime() {
return lastAttemptedOrReportedTime;
}
/**
* @return true/false. True value represents that, all the block locations
* under the trackID has found matching target nodes to satisfy
* storage policy. False value represents that, trackID needed
* retries to satisfy the storage policy for some of the block
* locations.
*/
private boolean isAllBlockLocsAttemptedToSatisfy() {
return allBlockLocsAttemptedToSatisfy;
}
/**
* Update lastAttemptedOrReportedTime, so that the expiration time will be
* postponed to future.
*/
private void touchLastReportedTimeStamp() {
this.lastAttemptedOrReportedTime = monotonicNow();
}
}
/**
* A monitor class for checking block storage movement result and long waiting
* items periodically.
*/
private class BlocksStorageMovementAttemptResultMonitor implements Runnable {
private class BlocksStorageMovementAttemptMonitor implements Runnable {
@Override
public void run() {
while (monitorRunning) {
try {
blockStorageMovementResultCheck();
blockStorageMovementReportedItemsCheck();
blocksStorageMovementUnReportedItemsCheck();
Thread.sleep(minCheckTimeout);
} catch (InterruptedException ie) {
LOG.info("BlocksStorageMovementAttemptResultMonitor thread "
LOG.info("BlocksStorageMovementAttemptMonitor thread "
+ "is interrupted.", ie);
} catch (IOException ie) {
LOG.warn("BlocksStorageMovementAttemptResultMonitor thread "
LOG.warn("BlocksStorageMovementAttemptMonitor thread "
+ "received exception and exiting.", ie);
}
}
@@ -248,29 +173,21 @@ public void run() {
@VisibleForTesting
void blocksStorageMovementUnReportedItemsCheck() {
synchronized (storageMovementAttemptedItems) {
Iterator<Entry<Long, AttemptedItemInfo>> iter =
storageMovementAttemptedItems.entrySet().iterator();
Iterator<AttemptedItemInfo> iter = storageMovementAttemptedItems
.iterator();
long now = monotonicNow();
while (iter.hasNext()) {
Entry<Long, AttemptedItemInfo> entry = iter.next();
AttemptedItemInfo itemInfo = entry.getValue();
AttemptedItemInfo itemInfo = iter.next();
if (now > itemInfo.getLastAttemptedOrReportedTime()
+ selfRetryTimeout) {
Long blockCollectionID = entry.getKey();
synchronized (storageMovementAttemptedResults) {
if (!isExistInResult(blockCollectionID)) {
ItemInfo candidate = new ItemInfo(
itemInfo.getStartId(), blockCollectionID);
blockStorageMovementNeeded.add(candidate);
iter.remove();
LOG.info("TrackID: {} becomes timed out and moved to needed "
+ "retries queue for next iteration.", blockCollectionID);
} else {
LOG.info("Blocks storage movement results for the"
+ " tracking id : " + blockCollectionID
+ " is reported from one of the co-ordinating datanode."
+ " So, the result will be processed soon.");
}
Long blockCollectionID = itemInfo.getTrackId();
synchronized (movementFinishedBlocks) {
ItemInfo candidate = new ItemInfo(itemInfo.getStartId(),
blockCollectionID);
blockStorageMovementNeeded.add(candidate);
iter.remove();
LOG.info("TrackID: {} becomes timed out and moved to needed "
+ "retries queue for next iteration.", blockCollectionID);
}
}
}
@ -278,118 +195,38 @@ void blocksStorageMovementUnReportedItemsCheck() {
}
}
private boolean isExistInResult(Long blockCollectionID) {
Iterator<BlocksStorageMovementResult> iter = storageMovementAttemptedResults
.iterator();
while (iter.hasNext()) {
BlocksStorageMovementResult storageMovementAttemptedResult = iter.next();
if (storageMovementAttemptedResult.getTrackId() == blockCollectionID) {
return true;
}
}
return false;
}
@VisibleForTesting
void blockStorageMovementResultCheck() throws IOException {
synchronized (storageMovementAttemptedResults) {
Iterator<BlocksStorageMovementResult> resultsIter =
storageMovementAttemptedResults.iterator();
while (resultsIter.hasNext()) {
boolean isInprogress = false;
// TrackID need to be retried in the following cases:
// 1) All or few scheduled block(s) movement has been failed.
// 2) All the scheduled block(s) movement has been succeeded but there
// are unscheduled block(s) movement in this trackID. Say, some of
// the blocks in the trackID couldn't finding any matching target node
// for scheduling block movement in previous SPS iteration.
BlocksStorageMovementResult storageMovementAttemptedResult = resultsIter
.next();
void blockStorageMovementReportedItemsCheck() throws IOException {
synchronized (movementFinishedBlocks) {
Iterator<Block> finishedBlksIter = movementFinishedBlocks.iterator();
while (finishedBlksIter.hasNext()) {
Block blk = finishedBlksIter.next();
synchronized (storageMovementAttemptedItems) {
Status status = storageMovementAttemptedResult.getStatus();
long trackId = storageMovementAttemptedResult.getTrackId();
AttemptedItemInfo attemptedItemInfo = storageMovementAttemptedItems
.get(trackId);
// itemInfo is null means no root for trackId; using trackId only as
// root and handling it in
// blockStorageMovementNeeded#removeItemTrackInfo() for cleaning
// the xAttr
ItemInfo itemInfo = new ItemInfo((attemptedItemInfo != null)
? attemptedItemInfo.getStartId() : trackId, trackId);
switch (status) {
case FAILURE:
if (attemptedItemInfo != null) {
blockStorageMovementNeeded.add(itemInfo);
LOG.warn("Blocks storage movement results for the tracking id:"
+ "{} is reported from co-ordinating datanode, but result"
+ " status is FAILURE. So, added for retry", trackId);
} else {
LOG.info("Blocks storage movement is FAILURE for the track"
+ " id {}. But the trackID doesn't exists in"
+ " storageMovementAttemptedItems list.", trackId);
Iterator<AttemptedItemInfo> iterator = storageMovementAttemptedItems
.iterator();
while (iterator.hasNext()) {
AttemptedItemInfo attemptedItemInfo = iterator.next();
attemptedItemInfo.getBlocks().remove(blk);
if (attemptedItemInfo.getBlocks().isEmpty()) {
// TODO: try add this at front of the Queue, so that this element
// gets the chance first and can be cleaned from queue quickly as
// all movements already done.
blockStorageMovementNeeded
.removeItemTrackInfo(itemInfo);
.add(new ItemInfo(attemptedItemInfo.getStartId(),
attemptedItemInfo.getTrackId()));
iterator.remove();
}
break;
case SUCCESS:
// ItemInfo could be null. One case is, before the blocks movements
// result arrives the attempted trackID became timed out and then
// removed the trackID from the storageMovementAttemptedItems list.
// TODO: Need to ensure that trackID is added to the
// 'blockStorageMovementNeeded' queue for retries to handle the
// following condition. If all the block locations under the trackID
// are attempted and failed to find matching target nodes to satisfy
// storage policy in previous SPS iteration.
String msg = "Blocks storage movement is SUCCESS for the track id: "
+ trackId + " reported from co-ordinating datanode.";
if (attemptedItemInfo != null) {
if (!attemptedItemInfo.isAllBlockLocsAttemptedToSatisfy()) {
blockStorageMovementNeeded
.add(new ItemInfo(attemptedItemInfo.getStartId(), trackId));
LOG.warn("{} But adding trackID back to retry queue as some of"
+ " the blocks couldn't find matching target nodes in"
+ " previous SPS iteration.", msg);
} else {
LOG.info(msg);
blockStorageMovementNeeded
.removeItemTrackInfo(itemInfo);
}
} else {
LOG.info("{} But the trackID doesn't exists in "
+ "storageMovementAttemptedItems list", msg);
blockStorageMovementNeeded
.removeItemTrackInfo(itemInfo);
}
break;
case IN_PROGRESS:
isInprogress = true;
attemptedItemInfo = storageMovementAttemptedItems
.get(storageMovementAttemptedResult.getTrackId());
if(attemptedItemInfo != null){
// update the attempted expiration time to next cycle.
attemptedItemInfo.touchLastReportedTimeStamp();
}
break;
default:
LOG.error("Unknown status: {}", status);
break;
}
// Remove trackID from the attempted list if the attempt has been
// completed(success or failure), if any.
if (!isInprogress) {
storageMovementAttemptedItems
.remove(storageMovementAttemptedResult.getTrackId());
}
}
// Remove trackID from results as processed above.
resultsIter.remove();
// Remove attempted blocks from movementFinishedBlocks list.
finishedBlksIter.remove();
}
}
}
@VisibleForTesting
public int resultsCount() {
return storageMovementAttemptedResults.size();
public int getMovementFinishedBlocksCount() {
return movementFinishedBlocks.size();
}
@VisibleForTesting
@ -398,7 +235,7 @@ public int getAttemptedItemsCount() {
}
public void clearQueues() {
storageMovementAttemptedResults.clear();
movementFinishedBlocks.clear();
storageMovementAttemptedItems.clear();
}
}
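To make the refactored tracking concrete, here is a minimal, self-contained sketch of the two checks the monitor now performs. The classes below are illustrative stand-ins, not the Hadoop types above: reported finished blocks are removed from each attempted item, and fully-finished or timed-out items are re-queued for the next SPS scan.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Simplified stand-in for AttemptedItemInfo: a trackId plus the blocks that
// were scheduled for movement and still await a finished report.
class AttemptedSketch {
  final long trackId;
  long lastReportedTime; // monotonic timestamp of the last attempt/report
  final List<Long> pendingBlocks;

  AttemptedSketch(long trackId, long now, List<Long> blocks) {
    this.trackId = trackId;
    this.lastReportedTime = now;
    this.pendingBlocks = new ArrayList<>(blocks);
  }
}

public class AttemptTrackerSketch {
  static final long SELF_RETRY_TIMEOUT_MS = 5 * 60 * 1000; // new default
  final List<AttemptedSketch> attemptedItems = new ArrayList<>();
  final List<Long> retryQueue = new ArrayList<>(); // models blockStorageMovementNeeded

  // Models blockStorageMovementReportedItemsCheck(): drop each reported
  // finished block; once an item has no pending blocks left, re-queue its
  // trackId so the next scan can verify the policy and clean up the xAttr.
  void onFinishedBlocks(List<Long> finishedBlockIds) {
    Iterator<AttemptedSketch> it = attemptedItems.iterator();
    while (it.hasNext()) {
      AttemptedSketch item = it.next();
      item.pendingBlocks.removeAll(finishedBlockIds);
      if (item.pendingBlocks.isEmpty()) {
        retryQueue.add(item.trackId);
        it.remove();
      }
    }
  }

  // Models blocksStorageMovementUnReportedItemsCheck(): anything unreported
  // past the self-retry timeout goes back to the needed queue.
  void onTimeoutScan(long now) {
    Iterator<AttemptedSketch> it = attemptedItems.iterator();
    while (it.hasNext()) {
      AttemptedSketch item = it.next();
      if (now > item.lastReportedTime + SELF_RETRY_TIMEOUT_MS) {
        retryQueue.add(item.trackId);
        it.remove();
      }
    }
  }
}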

View File

@ -1,61 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.util.List;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
/**
* This class represents a batch of blocks under one trackId which needs to move
* its storage locations to satisfy the storage policy.
*/
public class BlockStorageMovementInfosBatch {
private long trackID;
private List<BlockMovingInfo> blockMovingInfos;
/**
* Constructor to create the block storage movement infos batch.
*
* @param trackID
* - unique identifier which will be used for tracking the given set
* of blocks movement.
* @param blockMovingInfos
* - list of block to storage infos.
*/
public BlockStorageMovementInfosBatch(long trackID,
List<BlockMovingInfo> blockMovingInfos) {
this.trackID = trackID;
this.blockMovingInfos = blockMovingInfos;
}
public long getTrackID() {
return trackID;
}
public List<BlockMovingInfo> getBlockMovingInfo() {
return blockMovingInfos;
}
@Override
public String toString() {
return new StringBuilder().append("BlockStorageMovementInfosBatch(\n ")
.append("TrackID: ").append(trackID).append(" BlockMovingInfos: ")
.append(blockMovingInfos).append(")").toString();
}
}

View File

@ -266,7 +266,7 @@
import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
import org.apache.hadoop.hdfs.server.namenode.top.metrics.TopMetrics;
import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@ -3927,7 +3927,8 @@ HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg,
boolean requestFullBlockReportLease,
@Nonnull SlowPeerReports slowPeers,
@Nonnull SlowDiskReports slowDisks,
BlocksStorageMovementResult[] blksMovementResults) throws IOException {
BlocksStorageMoveAttemptFinished blksMovementsFinished)
throws IOException {
readLock();
try {
//get datanode commands
@ -3948,11 +3949,11 @@ nodeReg, reports, getBlockPoolId(), cacheCapacity, cacheUsed,
if (!sps.isRunning()) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Storage policy satisfier is not running. So, ignoring block "
+ "storage movement results sent by co-ordinator datanode");
"Storage policy satisfier is not running. So, ignoring storage"
+ " movement attempt finished block info sent by DN");
}
} else {
sps.handleBlocksStorageMovementResults(blksMovementResults);
sps.handleStorageMovementAttemptFinishedBlks(blksMovementsFinished);
}
}

View File

@ -156,7 +156,7 @@
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
@ -1517,14 +1517,15 @@ public HeartbeatResponse sendHeartbeat(DatanodeRegistration nodeReg,
boolean requestFullBlockReportLease,
@Nonnull SlowPeerReports slowPeers,
@Nonnull SlowDiskReports slowDisks,
BlocksStorageMovementResult[] blkMovementStatus) throws IOException {
BlocksStorageMoveAttemptFinished storageMovementFinishedBlks)
throws IOException {
checkNNStartup();
verifyRequest(nodeReg);
return namesystem.handleHeartbeat(nodeReg, report,
dnCacheCapacity, dnCacheUsed, xceiverCount, xmitsInProgress,
failedVolumes, volumeFailureSummary, requestFullBlockReportLease,
slowPeers, slowDisks,
blkMovementStatus);
storageMovementFinishedBlks);
}
@Override // DatanodeProtocol

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.util.Time.monotonicNow;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@ -44,7 +46,7 @@
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.util.Daemon;
@ -82,25 +84,38 @@ public class StoragePolicySatisfier implements Runnable {
/**
* Represents the collective analysis status for all blocks.
*/
private enum BlocksMovingAnalysisStatus {
// Represents that the analysis was skipped due to some condition. One
// such condition is if the block collection is in an incomplete state.
ANALYSIS_SKIPPED_FOR_RETRY,
// Represents that, all block storage movement needed blocks found its
// targets.
ALL_BLOCKS_TARGETS_PAIRED,
// Represents that, only fewer or none of the block storage movement needed
// block found its eligible targets.
FEW_BLOCKS_TARGETS_PAIRED,
// Represents that, none of the blocks found for block storage movements.
BLOCKS_ALREADY_SATISFIED,
// Represents that the analysis was skipped due to some condition.
// Example conditions are if no blocks really exist in the block collection
// or if analysis is not required on EC files with unsuitable storage policies
BLOCKS_TARGET_PAIRING_SKIPPED,
// Represents that all the reported blocks satisfy the policy but
// some of the blocks have low redundancy.
FEW_LOW_REDUNDANCY_BLOCKS
private static class BlocksMovingAnalysis {
enum Status {
// Represents that the analysis was skipped due to some condition. One
// such condition is if the block collection is in an incomplete state.
ANALYSIS_SKIPPED_FOR_RETRY,
// Represents that few or all blocks found respective target to do
// the storage movement.
BLOCKS_TARGETS_PAIRED,
// Represents that none of the blocks found respective target to do
// the storage movement.
NO_BLOCKS_TARGETS_PAIRED,
// Represents that, none of the blocks found for block storage movements.
BLOCKS_ALREADY_SATISFIED,
// Represents that the analysis was skipped due to some condition.
// Example conditions are if no blocks really exist in the block
// collection, or if analysis is not required on EC files with
// unsuitable storage policies.
BLOCKS_TARGET_PAIRING_SKIPPED,
// Represents that all the reported blocks satisfy the policy but
// some of the blocks have low redundancy.
FEW_LOW_REDUNDANCY_BLOCKS
}
private Status status = null;
private List<Block> assignedBlocks = null;
BlocksMovingAnalysis(Status status, List<Block> blockMovingInfo) {
this.status = status;
this.assignedBlocks = blockMovingInfo;
}
}
public StoragePolicySatisfier(final Namesystem namesystem,
@ -118,8 +133,7 @@ public StoragePolicySatisfier(final Namesystem namesystem,
conf.getLong(
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY,
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_DEFAULT),
storageMovementNeeded,
this);
storageMovementNeeded);
this.spsWorkMultiplier = DFSUtil.getSPSWorkMultiplier(conf);
}
@ -232,21 +246,25 @@ public void run() {
namesystem.getBlockCollection(trackId);
// Check blockCollectionId existence.
if (blockCollection != null) {
BlocksMovingAnalysisStatus status =
BlocksMovingAnalysis status =
analyseBlocksStorageMovementsAndAssignToDN(blockCollection);
switch (status) {
switch (status.status) {
// Just add to monitor, so it will be retried after timeout
case ANALYSIS_SKIPPED_FOR_RETRY:
// Just add to monitor, so it will be tracked for result and
// be removed on successful storage movement result.
case ALL_BLOCKS_TARGETS_PAIRED:
this.storageMovementsMonitor.add(itemInfo, true);
// Just add to monitor, so it will be tracked for report and
// be removed on storage movement attempt finished report.
case BLOCKS_TARGETS_PAIRED:
this.storageMovementsMonitor.add(new AttemptedItemInfo(
itemInfo.getStartId(), itemInfo.getTrackId(),
monotonicNow(), status.assignedBlocks));
break;
// Add to monitor with the allBlockLocsAttemptedToSatisfy flag false, so
// that it will be tracked and still considered for retry, as the
// analysis did not find targets for the storage movement blocks.
case FEW_BLOCKS_TARGETS_PAIRED:
this.storageMovementsMonitor.add(itemInfo, false);
case NO_BLOCKS_TARGETS_PAIRED:
if (LOG.isDebugEnabled()) {
LOG.debug("Adding trackID " + trackId
+ " back to retry queue as none of the blocks"
+ " found its eligible targets.");
}
this.storageMovementNeeded.add(itemInfo);
break;
case FEW_LOW_REDUNDANCY_BLOCKS:
if (LOG.isDebugEnabled()) {
@ -310,10 +328,10 @@ private void handleException(Throwable t) {
return;
}
private BlocksMovingAnalysisStatus analyseBlocksStorageMovementsAndAssignToDN(
private BlocksMovingAnalysis analyseBlocksStorageMovementsAndAssignToDN(
BlockCollection blockCollection) {
BlocksMovingAnalysisStatus status =
BlocksMovingAnalysisStatus.BLOCKS_ALREADY_SATISFIED;
BlocksMovingAnalysis.Status status =
BlocksMovingAnalysis.Status.BLOCKS_ALREADY_SATISFIED;
byte existingStoragePolicyID = blockCollection.getStoragePolicyID();
BlockStoragePolicy existingStoragePolicy =
blockManager.getStoragePolicy(existingStoragePolicyID);
@ -322,17 +340,18 @@ private BlocksMovingAnalysisStatus analyseBlocksStorageMovementsAndAssignToDN(
// So, should we add back? or leave it to user
LOG.info("BlockCollectionID: {} file is under construction. So, postpone"
+ " this to the next retry iteration", blockCollection.getId());
return BlocksMovingAnalysisStatus.ANALYSIS_SKIPPED_FOR_RETRY;
return new BlocksMovingAnalysis(
BlocksMovingAnalysis.Status.ANALYSIS_SKIPPED_FOR_RETRY,
new ArrayList<>());
}
// First datanode will be chosen as the co-ordinator node for storage
// movements. Later this can be optimized if needed.
DatanodeDescriptor coordinatorNode = null;
BlockInfo[] blocks = blockCollection.getBlocks();
if (blocks.length == 0) {
LOG.info("BlockCollectionID: {} file is not having any blocks."
+ " So, skipping the analysis.", blockCollection.getId());
return BlocksMovingAnalysisStatus.BLOCKS_TARGET_PAIRING_SKIPPED;
return new BlocksMovingAnalysis(
BlocksMovingAnalysis.Status.BLOCKS_TARGET_PAIRING_SKIPPED,
new ArrayList<>());
}
List<BlockMovingInfo> blockMovingInfos = new ArrayList<BlockMovingInfo>();
@ -352,7 +371,9 @@ private BlocksMovingAnalysisStatus analyseBlocksStorageMovementsAndAssignToDN(
LOG.warn("The storage policy " + existingStoragePolicy.getName()
+ " is not suitable for Striped EC files. "
+ "So, ignoring to move the blocks");
return BlocksMovingAnalysisStatus.BLOCKS_TARGET_PAIRING_SKIPPED;
return new BlocksMovingAnalysis(
BlocksMovingAnalysis.Status.BLOCKS_TARGET_PAIRING_SKIPPED,
new ArrayList<>());
}
} else {
expectedStorageTypes = existingStoragePolicy
@ -370,30 +391,35 @@ private BlocksMovingAnalysisStatus analyseBlocksStorageMovementsAndAssignToDN(
new LinkedList<StorageType>(Arrays.asList(storageTypes));
if (!DFSUtil.removeOverlapBetweenStorageTypes(expectedStorageTypes,
existing, true)) {
boolean computeStatus = computeBlockMovingInfos(blockMovingInfos,
boolean blocksPaired = computeBlockMovingInfos(blockMovingInfos,
blockInfo, expectedStorageTypes, existing, storages);
if (computeStatus
&& status != BlocksMovingAnalysisStatus.FEW_BLOCKS_TARGETS_PAIRED
&& !blockManager.hasLowRedundancyBlocks(blockCollection)) {
status = BlocksMovingAnalysisStatus.ALL_BLOCKS_TARGETS_PAIRED;
if (blocksPaired) {
status = BlocksMovingAnalysis.Status.BLOCKS_TARGETS_PAIRED;
} else {
status = BlocksMovingAnalysisStatus.FEW_BLOCKS_TARGETS_PAIRED;
// none of the blocks found its eligible targets for satisfying the
// storage policy.
status = BlocksMovingAnalysis.Status.NO_BLOCKS_TARGETS_PAIRED;
}
} else {
if (blockManager.hasLowRedundancyBlocks(blockCollection)) {
status = BlocksMovingAnalysisStatus.FEW_LOW_REDUNDANCY_BLOCKS;
status = BlocksMovingAnalysis.Status.FEW_LOW_REDUNDANCY_BLOCKS;
}
}
}
assignBlockMovingInfosToCoordinatorDn(blockCollection.getId(),
blockMovingInfos, coordinatorNode);
int count = 0;
List<Block> assignedBlockIds = new ArrayList<Block>();
for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
count = count + blkMovingInfo.getSources().length;
// Check whether at least one block storage movement has been chosen
if (blkMovingInfo.getTarget() != null) {
// assign block storage movement task to the target node
((DatanodeDescriptor) blkMovingInfo.getTarget())
.addBlocksToMoveStorage(blkMovingInfo);
LOG.debug("BlockMovingInfo: {}", blkMovingInfo);
assignedBlockIds.add(blkMovingInfo.getBlock());
blockCount++;
}
}
blockCount = blockCount + count;
return status;
return new BlocksMovingAnalysis(status, assignedBlockIds);
}
/**
@ -468,41 +494,6 @@ private boolean computeBlockMovingInfos(
return foundMatchingTargetNodesForBlock;
}
private void assignBlockMovingInfosToCoordinatorDn(long blockCollectionID,
List<BlockMovingInfo> blockMovingInfos,
DatanodeDescriptor coordinatorNode) {
if (blockMovingInfos.size() < 1) {
// TODO: Major: handle this case. I think we need retry cases to
// be implemented. Idea is, if some files are not getting storage movement
// chances, then we can just retry limited number of times and exit.
return;
}
// For now, first datanode will be chosen as the co-ordinator. Later
// this can be optimized if needed.
coordinatorNode = (DatanodeDescriptor) blockMovingInfos.get(0)
.getSources()[0];
boolean needBlockStorageMovement = false;
for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
// Check whether at least one block storage movement has been chosen
if (blkMovingInfo.getTargets().length > 0){
needBlockStorageMovement = true;
break;
}
}
if (!needBlockStorageMovement) {
// Simply return as there is no targets selected for scheduling the block
// movement.
return;
}
// 'BlockCollectionId' is used as the tracking ID. All the blocks under this
// blockCollectionID will be added to this datanode.
coordinatorNode.addBlocksToMoveStorage(blockCollectionID, blockMovingInfos);
}
/**
* Find the good target node for each source node for which block storages was
* misplaced.
@ -526,10 +517,6 @@ private boolean findSourceAndTargetToMove(
List<StorageType> expected,
StorageTypeNodeMap locsForExpectedStorageTypes) {
boolean foundMatchingTargetNodesForBlock = true;
List<DatanodeInfo> sourceNodes = new ArrayList<>();
List<StorageType> sourceStorageTypes = new ArrayList<>();
List<DatanodeInfo> targetNodes = new ArrayList<>();
List<StorageType> targetStorageTypes = new ArrayList<>();
List<DatanodeDescriptor> excludeNodes = new ArrayList<>();
// Looping over all the source node locations and choose the target
@ -544,10 +531,15 @@ private boolean findSourceAndTargetToMove(
StorageTypeNodePair chosenTarget = chooseTargetTypeInSameNode(
blockInfo, existingTypeNodePair.dn, expected);
if (chosenTarget != null) {
sourceNodes.add(existingTypeNodePair.dn);
sourceStorageTypes.add(existingTypeNodePair.storageType);
targetNodes.add(chosenTarget.dn);
targetStorageTypes.add(chosenTarget.storageType);
if (blockInfo.isStriped()) {
buildStripedBlockMovingInfos(blockInfo, existingTypeNodePair.dn,
existingTypeNodePair.storageType, chosenTarget.dn,
chosenTarget.storageType, blockMovingInfos);
} else {
buildContinuousBlockMovingInfos(blockInfo, existingTypeNodePair.dn,
existingTypeNodePair.storageType, chosenTarget.dn,
chosenTarget.storageType, blockMovingInfos);
}
expected.remove(chosenTarget.storageType);
// TODO: We can increment scheduled block count for this node?
}
@ -563,7 +555,7 @@ private boolean findSourceAndTargetToMove(
StorageTypeNodePair chosenTarget = null;
// Chosen the target storage within same datanode. So just skipping this
// source node.
if (sourceNodes.contains(existingTypeNodePair.dn)) {
if (checkIfAlreadyChosen(blockMovingInfos, existingTypeNodePair.dn)) {
continue;
}
if (chosenTarget == null && blockManager.getDatanodeManager()
@ -586,10 +578,16 @@ private boolean findSourceAndTargetToMove(
Matcher.ANY_OTHER, locsForExpectedStorageTypes, excludeNodes);
}
if (null != chosenTarget) {
sourceNodes.add(existingTypeNodePair.dn);
sourceStorageTypes.add(existingTypeNodePair.storageType);
targetNodes.add(chosenTarget.dn);
targetStorageTypes.add(chosenTarget.storageType);
if (blockInfo.isStriped()) {
buildStripedBlockMovingInfos(blockInfo, existingTypeNodePair.dn,
existingTypeNodePair.storageType, chosenTarget.dn,
chosenTarget.storageType, blockMovingInfos);
} else {
buildContinuousBlockMovingInfos(blockInfo, existingTypeNodePair.dn,
existingTypeNodePair.storageType, chosenTarget.dn,
chosenTarget.storageType, blockMovingInfos);
}
expected.remove(chosenTarget.storageType);
excludeNodes.add(chosenTarget.dn);
// TODO: We can increment scheduled block count for this node?
@ -605,47 +603,33 @@ private boolean findSourceAndTargetToMove(
foundMatchingTargetNodesForBlock = false;
}
blockMovingInfos.addAll(getBlockMovingInfos(blockInfo, sourceNodes,
sourceStorageTypes, targetNodes, targetStorageTypes));
return foundMatchingTargetNodesForBlock;
}
private List<BlockMovingInfo> getBlockMovingInfos(BlockInfo blockInfo,
List<DatanodeInfo> sourceNodes, List<StorageType> sourceStorageTypes,
List<DatanodeInfo> targetNodes, List<StorageType> targetStorageTypes) {
List<BlockMovingInfo> blkMovingInfos = new ArrayList<>();
// No source-target node pair exists.
if (sourceNodes.size() <= 0) {
return blkMovingInfos;
private boolean checkIfAlreadyChosen(List<BlockMovingInfo> blockMovingInfos,
DatanodeDescriptor dn) {
for (BlockMovingInfo blockMovingInfo : blockMovingInfos) {
if (blockMovingInfo.getSource().equals(dn)) {
return true;
}
}
if (blockInfo.isStriped()) {
buildStripedBlockMovingInfos(blockInfo, sourceNodes, sourceStorageTypes,
targetNodes, targetStorageTypes, blkMovingInfos);
} else {
buildContinuousBlockMovingInfos(blockInfo, sourceNodes,
sourceStorageTypes, targetNodes, targetStorageTypes, blkMovingInfos);
}
return blkMovingInfos;
return false;
}
private void buildContinuousBlockMovingInfos(BlockInfo blockInfo,
List<DatanodeInfo> sourceNodes, List<StorageType> sourceStorageTypes,
List<DatanodeInfo> targetNodes, List<StorageType> targetStorageTypes,
DatanodeInfo sourceNode, StorageType sourceStorageType,
DatanodeInfo targetNode, StorageType targetStorageType,
List<BlockMovingInfo> blkMovingInfos) {
Block blk = new Block(blockInfo.getBlockId(), blockInfo.getNumBytes(),
blockInfo.getGenerationStamp());
BlockMovingInfo blkMovingInfo = new BlockMovingInfo(blk,
sourceNodes.toArray(new DatanodeInfo[sourceNodes.size()]),
targetNodes.toArray(new DatanodeInfo[targetNodes.size()]),
sourceStorageTypes.toArray(new StorageType[sourceStorageTypes.size()]),
targetStorageTypes.toArray(new StorageType[targetStorageTypes.size()]));
BlockMovingInfo blkMovingInfo = new BlockMovingInfo(blk, sourceNode,
targetNode, sourceStorageType, targetStorageType);
blkMovingInfos.add(blkMovingInfo);
}
private void buildStripedBlockMovingInfos(BlockInfo blockInfo,
List<DatanodeInfo> sourceNodes, List<StorageType> sourceStorageTypes,
List<DatanodeInfo> targetNodes, List<StorageType> targetStorageTypes,
DatanodeInfo sourceNode, StorageType sourceStorageType,
DatanodeInfo targetNode, StorageType targetStorageType,
List<BlockMovingInfo> blkMovingInfos) {
// For a striped block, it needs to construct internal block at the given
// index of a block group. Here it is iterating over all the block indices
@ -655,30 +639,17 @@ private void buildStripedBlockMovingInfos(BlockInfo blockInfo,
for (StorageAndBlockIndex si : sBlockInfo.getStorageAndIndexInfos()) {
if (si.getBlockIndex() >= 0) {
DatanodeDescriptor dn = si.getStorage().getDatanodeDescriptor();
DatanodeInfo[] srcNode = new DatanodeInfo[1];
StorageType[] srcStorageType = new StorageType[1];
DatanodeInfo[] targetNode = new DatanodeInfo[1];
StorageType[] targetStorageType = new StorageType[1];
for (int i = 0; i < sourceNodes.size(); i++) {
DatanodeInfo node = sourceNodes.get(i);
if (node.equals(dn)) {
srcNode[0] = node;
srcStorageType[0] = sourceStorageTypes.get(i);
targetNode[0] = targetNodes.get(i);
targetStorageType[0] = targetStorageTypes.get(i);
// construct internal block
long blockId = blockInfo.getBlockId() + si.getBlockIndex();
long numBytes = StripedBlockUtil.getInternalBlockLength(
sBlockInfo.getNumBytes(), sBlockInfo.getCellSize(),
sBlockInfo.getDataBlockNum(), si.getBlockIndex());
Block blk = new Block(blockId, numBytes,
blockInfo.getGenerationStamp());
BlockMovingInfo blkMovingInfo = new BlockMovingInfo(blk, srcNode,
targetNode, srcStorageType, targetStorageType);
blkMovingInfos.add(blkMovingInfo);
break; // found matching source-target nodes
}
if (sourceNode.equals(dn)) {
// construct internal block
long blockId = blockInfo.getBlockId() + si.getBlockIndex();
long numBytes = StripedBlockUtil.getInternalBlockLength(
sBlockInfo.getNumBytes(), sBlockInfo.getCellSize(),
sBlockInfo.getDataBlockNum(), si.getBlockIndex());
Block blk = new Block(blockId, numBytes,
blockInfo.getGenerationStamp());
BlockMovingInfo blkMovingInfo = new BlockMovingInfo(blk, sourceNode,
targetNode, sourceStorageType, targetStorageType);
blkMovingInfos.add(blkMovingInfo);
}
}
}
@ -817,18 +788,18 @@ private List<DatanodeDescriptor> getNodesWithStorages(StorageType type) {
}
/**
* Receives the movement results of collection of blocks associated to a
* trackId.
* Receives the report of blocks whose storage movement attempt has finished.
*
* @param blksMovementResults
* movement status of the set of blocks associated to a trackId.
* @param moveAttemptFinishedBlks
* set of storage movement attempt finished blocks.
*/
void handleBlocksStorageMovementResults(
BlocksStorageMovementResult[] blksMovementResults) {
if (blksMovementResults.length <= 0) {
void handleStorageMovementAttemptFinishedBlks(
BlocksStorageMoveAttemptFinished moveAttemptFinishedBlks) {
if (moveAttemptFinishedBlks.getBlocks().length <= 0) {
return;
}
storageMovementsMonitor.addResults(blksMovementResults);
storageMovementsMonitor
.addReportedMovedBlocks(moveAttemptFinishedBlks.getBlocks());
}
@VisibleForTesting
@ -906,4 +877,52 @@ public boolean isDir() {
return (startId != trackId);
}
}
/**
* This class contains information about the attempted blocks and their last
* attempted or reported time stamp. This is used by
* {@link BlockStorageMovementAttemptedItems#storageMovementAttemptedItems}.
*/
final static class AttemptedItemInfo extends ItemInfo {
private long lastAttemptedOrReportedTime;
private final List<Block> blocks;
/**
* AttemptedItemInfo constructor.
*
* @param rootId
* rootId for trackId
* @param trackId
* trackId for file.
* @param lastAttemptedOrReportedTime
* last attempted or reported time
*/
AttemptedItemInfo(long rootId, long trackId,
long lastAttemptedOrReportedTime,
List<Block> blocks) {
super(rootId, trackId);
this.lastAttemptedOrReportedTime = lastAttemptedOrReportedTime;
this.blocks = blocks;
}
/**
* @return last attempted or reported time stamp.
*/
long getLastAttemptedOrReportedTime() {
return lastAttemptedOrReportedTime;
}
/**
* Update lastAttemptedOrReportedTime, so that the expiration time will be
* postponed into the future.
*/
void touchLastReportedTimeStamp() {
this.lastAttemptedOrReportedTime = monotonicNow();
}
List<Block> getBlocks() {
return this.blocks;
}
}
}

View File

@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hdfs.server.protocol;
import java.util.Arrays;
import java.util.Collection;
import org.apache.hadoop.fs.StorageType;
@ -29,22 +28,15 @@
* given set of blocks to specified target DataNodes to fulfill the block
* storage policy.
*
* Upon receiving this command, this DataNode coordinates all the block movement
* by passing the details to
* Upon receiving this command, this DataNode passes the array of block
* movement details to
* {@link org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker}
* service. After the block movement this DataNode sends response back to the
* NameNode about the movement status.
*
* The coordinator datanode will use 'trackId' identifier to coordinate the
* block movement of the given set of blocks. TrackId is a unique identifier
* that represents a group of blocks. Namenode will generate this unique value
* and send it to the coordinator datanode along with the
* BlockStorageMovementCommand. Datanode will monitor the completion of the
* block movements that grouped under this trackId and notifies Namenode about
* the completion status.
* service. Later, StoragePolicySatisfyWorker will schedule block movement tasks
* for these blocks and monitor the completion of each task. After the block
* movement attempt is finished (with success or failure), this DataNode will
* send a response back to the NameNode with the block movement attempt
* finished details.
*/
public class BlockStorageMovementCommand extends DatanodeCommand {
private final long trackID;
private final String blockPoolId;
private final Collection<BlockMovingInfo> blockMovingTasks;
@ -53,29 +45,16 @@ public class BlockStorageMovementCommand extends DatanodeCommand {
*
* @param action
* protocol specific action
* @param trackID
* unique identifier to monitor the given set of block movements
* @param blockPoolId
* block pool ID
* @param blockMovingInfos
* block to storage info that will be used for movement
*/
public BlockStorageMovementCommand(int action, long trackID,
String blockPoolId, Collection<BlockMovingInfo> blockMovingInfos) {
public BlockStorageMovementCommand(int action, String blockPoolId,
Collection<BlockMovingInfo> blockMovingInfos) {
super(action);
this.trackID = trackID;
this.blockPoolId = blockPoolId;
this.blockMovingTasks = blockMovingInfos;
}
/**
* Returns trackID, which will be used to monitor the block movement assigned
* to this coordinator datanode.
*/
public long getTrackID() {
return trackID;
}
/**
* Returns block pool ID.
*/
@ -95,33 +74,29 @@ public Collection<BlockMovingInfo> getBlockMovingTasks() {
*/
public static class BlockMovingInfo {
private Block blk;
private DatanodeInfo[] sourceNodes;
private DatanodeInfo[] targetNodes;
private StorageType[] sourceStorageTypes;
private StorageType[] targetStorageTypes;
private DatanodeInfo sourceNode;
private DatanodeInfo targetNode;
private StorageType sourceStorageType;
private StorageType targetStorageType;
/**
* Block to storage info constructor.
*
* @param block
* block
* block info
* @param sourceDnInfos
* node that can be the sources of a block move
* @param sourceDnInfo
* node that can be the source of a block move
* @param targetDnInfos
* target datanode info
* @param srcStorageTypes
* @param srcStorageType
* type of source storage media
* @param targetStorageTypes
* @param targetStorageType
* type of destination storage media
*/
public BlockMovingInfo(Block block,
DatanodeInfo[] sourceDnInfos, DatanodeInfo[] targetDnInfos,
StorageType[] srcStorageTypes, StorageType[] targetStorageTypes) {
public BlockMovingInfo(Block block, DatanodeInfo sourceDnInfo,
DatanodeInfo targetDnInfo, StorageType srcStorageType,
StorageType targetStorageType) {
this.blk = block;
this.sourceNodes = sourceDnInfos;
this.targetNodes = targetDnInfos;
this.sourceStorageTypes = srcStorageTypes;
this.targetStorageTypes = targetStorageTypes;
this.sourceNode = sourceDnInfo;
this.targetNode = targetDnInfo;
this.sourceStorageType = srcStorageType;
this.targetStorageType = targetStorageType;
}
public void addBlock(Block block) {
@ -129,35 +104,33 @@ public void addBlock(Block block) {
}
public Block getBlock() {
return this.blk;
return blk;
}
public DatanodeInfo[] getSources() {
return sourceNodes;
public DatanodeInfo getSource() {
return sourceNode;
}
public DatanodeInfo[] getTargets() {
return targetNodes;
public DatanodeInfo getTarget() {
return targetNode;
}
public StorageType[] getTargetStorageTypes() {
return targetStorageTypes;
public StorageType getTargetStorageType() {
return targetStorageType;
}
public StorageType[] getSourceStorageTypes() {
return sourceStorageTypes;
public StorageType getSourceStorageType() {
return sourceStorageType;
}
@Override
public String toString() {
return new StringBuilder().append("BlockMovingInfo(\n ")
.append("Moving block: ").append(blk).append(" From: ")
.append(Arrays.asList(sourceNodes)).append(" To: [")
.append(Arrays.asList(targetNodes)).append("\n ")
.append(" sourceStorageTypes: ")
.append(Arrays.toString(sourceStorageTypes))
.append(" targetStorageTypes: ")
.append(Arrays.toString(targetStorageTypes)).append(")").toString();
.append(sourceNode).append(" To: [").append(targetNode).append("\n ")
.append(" sourceStorageType: ").append(sourceStorageType)
.append(" targetStorageType: ").append(targetStorageType).append(")")
.toString();
}
}
}
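For reference, building a movement task under the new one-source/one-target shape mirrors the test helper further below; a hedged sketch, where the DISK-to-ARCHIVE types are illustrative and src/target would typically come from LocatedBlock#getLocations():

import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;

class BlockMovingInfoSketch {
  // One block, one source replica, one chosen target, with their storage
  // types; the old constructor took arrays of sources/targets per block.
  static BlockMovingInfo prepare(Block blk, DatanodeInfo src,
      DatanodeInfo target) {
    return new BlockMovingInfo(blk, src, target, StorageType.DISK,
        StorageType.ARCHIVE);
  }
}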

View File

@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.protocol;
import java.util.Arrays;
import org.apache.hadoop.hdfs.protocol.Block;
/**
* This class represents the blocks for which storage movement has been done
* by datanodes. The movementFinishedBlocks array contains all the blocks
* whose movement was attempted and finished with either success or
* failure.
*/
public class BlocksStorageMoveAttemptFinished {
private final Block[] movementFinishedBlocks;
public BlocksStorageMoveAttemptFinished(Block[] moveAttemptFinishedBlocks) {
this.movementFinishedBlocks = moveAttemptFinishedBlocks;
}
public Block[] getBlocks() {
return movementFinishedBlocks;
}
@Override
public String toString() {
return new StringBuilder().append("BlocksStorageMovementFinished(\n ")
.append(" blockID: ").append(Arrays.toString(movementFinishedBlocks))
.append(")").toString();
}
}
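On the DataNode side, the finished blocks presumably get batched into one of these reports for the next heartbeat; a small sketch under that assumption (buildReport is a hypothetical helper, not the worker's real API):

import java.util.List;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;

class MoveReportSketch {
  // Hypothetical helper: wrap the blocks whose move attempt finished since
  // the last heartbeat. Returning null lets the sender skip the optional
  // heartbeat field when there is nothing to report.
  static BlocksStorageMoveAttemptFinished buildReport(List<Block> finished) {
    if (finished == null || finished.isEmpty()) {
      return null;
    }
    return new BlocksStorageMoveAttemptFinished(
        finished.toArray(new Block[finished.size()]));
  }
}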

View File

@ -1,74 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.protocol;
/**
* This class represents, movement status of a set of blocks associated to a
* track Id.
*/
public class BlocksStorageMovementResult {
private final long trackId;
private final Status status;
/**
* SUCCESS - If all the blocks associated to track id has moved successfully
* or maximum possible movements done.
*
* <p>
* FAILURE - If any of its(trackId) blocks movement failed and requires to
* retry these failed blocks movements. Example selected target node is no
* more running or no space. So, retrying by selecting new target node might
* work.
*
* <p>
* IN_PROGRESS - If all or some of the blocks associated to track id are
* still moving.
*/
public enum Status {
SUCCESS, FAILURE, IN_PROGRESS;
}
/**
* BlocksStorageMovementResult constructor.
*
* @param trackId
* tracking identifier
* @param status
* block movement status
*/
public BlocksStorageMovementResult(long trackId, Status status) {
this.trackId = trackId;
this.status = status;
}
public long getTrackId() {
return trackId;
}
public Status getStatus() {
return status;
}
@Override
public String toString() {
return new StringBuilder().append("BlocksStorageMovementResult(\n ")
.append("track id: ").append(trackId).append(" status: ")
.append(status).append(")").toString();
}
}

View File

@ -112,8 +112,7 @@ public DatanodeRegistration registerDatanode(DatanodeRegistration registration
* @param slowPeers Details of peer DataNodes that were detected as being
* slow to respond to packet writes. Empty report if no
* slow peers were detected by the DataNode.
* @param blksMovementResults array of movement status of a set of blocks
* associated to a trackId.
* @param storageMovFinishedBlks array of movement attempt finished blocks
* @throws IOException on error
*/
@Idempotent
@ -128,7 +127,7 @@ public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration,
boolean requestFullBlockReportLease,
@Nonnull SlowPeerReports slowPeers,
@Nonnull SlowDiskReports slowDisks,
BlocksStorageMovementResult[] blksMovementResults)
BlocksStorageMoveAttemptFinished storageMovFinishedBlks)
throws IOException;
/**

View File

@ -162,9 +162,8 @@ message BlockECReconstructionCommandProto {
* Block storage movement command
*/
message BlockStorageMovementCommandProto {
required uint64 trackID = 1;
required string blockPoolId = 2;
repeated BlockStorageMovementProto blockStorageMovement = 3;
required string blockPoolId = 1;
repeated BlockMovingInfoProto blockMovingInfo = 2;
}
/**
@ -177,25 +176,20 @@ message DropSPSWorkCommandProto {
/**
* Block storage movement information
*/
message BlockStorageMovementProto {
message BlockMovingInfoProto {
required BlockProto block = 1;
required DatanodeInfosProto sourceDnInfos = 2;
required DatanodeInfosProto targetDnInfos = 3;
required StorageTypesProto sourceStorageTypes = 4;
required StorageTypesProto targetStorageTypes = 5;
required DatanodeInfoProto sourceDnInfo = 2;
required DatanodeInfoProto targetDnInfo = 3;
required StorageTypeProto sourceStorageType = 4;
required StorageTypeProto targetStorageType = 5;
}
/**
* Movement status of the set of blocks associated to a trackId.
* Blocks for which storage movements have been attempted and finished
* with either success or failure.
*/
message BlocksStorageMovementResultProto {
enum Status {
SUCCESS = 1; // block movement succeeded
FAILURE = 2; // block movement failed and needs to retry
IN_PROGRESS = 3; // block movement is still in progress
}
required uint64 trackID = 1;
required Status status = 2;
message BlocksStorageMoveAttemptFinishedProto {
repeated BlockProto blocks = 1;
}
/**
@ -255,7 +249,7 @@ message HeartbeatRequestProto {
optional bool requestFullBlockReportLease = 9 [ default = false ];
repeated SlowPeerReportProto slowPeers = 10;
repeated SlowDiskReportProto slowDisks = 11;
repeated BlocksStorageMovementResultProto blksMovementResults = 12;
optional BlocksStorageMoveAttemptFinishedProto storageMoveAttemptFinishedBlks = 12;
}
/**

View File

@ -4534,24 +4534,35 @@
<property>
<name>dfs.storage.policy.satisfier.recheck.timeout.millis</name>
<value>300000</value>
<value>60000</value>
<description>
Blocks storage movements monitor re-check interval in milliseconds.
This check verifies whether any block storage movement results have arrived from the DNs
and also whether any file's block movements have not been reported at all
within dfs.storage.policy.satisfier.self.retry.timeout.
The default value is 5 * 60 * 1000 (5 mins)
The default value is 1 * 60 * 1000 (1 min)
</description>
</property>
<property>
<name>dfs.storage.policy.satisfier.self.retry.timeout.millis</name>
<value>1800000</value>
<value>300000</value>
<description>
If any of file related block movements not at all reported by coordinator datanode,
If any of the file's block movements are not reported by the datanode at all,
then after this timeout (in milliseconds), the item will be added back to the movement
needed list at the namenode, which will retry the block movements.
The default value is 30 * 60 * 1000 (30 mins)
The default value is 5 * 60 * 1000 (5 mins)
</description>
</property>
<property>
<name>dfs.storage.policy.satisfier.low.max-streams.preference</name>
<value>false</value>
<description>
If true, block-move tasks will share an equal ratio of the highest-priority
replication streams (dfs.namenode.replication.max-streams) with pending replica and
erasure-coded reconstruction tasks. If false, block-move tasks will only use
the delta number of replication streams. The default value is false.
</description>
</property>

View File

@ -106,7 +106,7 @@ Following 2 options will allow users to move the blocks based on new policy set.
### <u>S</u>torage <u>P</u>olicy <u>S</u>atisfier (SPS)
When user changes the storage policy on a file/directory, user can call `HdfsAdmin` API `satisfyStoragePolicy()` to move the blocks as per the new policy set.
The SPS daemon thread runs along with namenode and periodically scans for the storage mismatches between new policy set and the physical blocks placed. This will only track the files/directories for which user invoked satisfyStoragePolicy. If SPS identifies some blocks to be moved for a file, then it will schedule block movement tasks to datanodes. A Coordinator DataNode(C-DN) will track all block movements associated to a file and notify to namenode about movement success/failure. If there are any failures in movement, the SPS will re-attempt by sending new block movement task.
The SPS daemon thread runs along with namenode and periodically scans for the storage mismatches between new policy set and the physical blocks placed. This will only track the files/directories for which user invoked satisfyStoragePolicy. If SPS identifies some blocks to be moved for a file, then it will schedule block movement tasks to datanodes. If there are any failures in movement, the SPS will re-attempt by sending new block movement tasks.
SPS can be enabled and disabled dynamically without restarting the Namenode.
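As a rough client-side sketch (assuming an HDFS client classpath and a running cluster; the path and policy name are illustrative), the flow described above looks like:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsAdmin;

public class SatisfyPolicySketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    HdfsAdmin dfsAdmin = new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
    Path file = new Path("/data/reports/part-0"); // illustrative path
    fs.setStoragePolicy(file, "COLD");   // change the policy first
    dfsAdmin.satisfyStoragePolicy(file); // then ask SPS to move existing blocks
  }
}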
@ -129,10 +129,10 @@ Detailed design documentation can be found at [Storage Policy Satisfier(SPS) (HD
enabled and vice versa.
* **dfs.storage.policy.satisfier.recheck.timeout.millis** - A timeout to re-check the processed block storage movement
command results from Co-ordinator Datanode.
command results from Datanodes.
* **dfs.storage.policy.satisfier.self.retry.timeout.millis** - A timeout to retry if no block movement results are reported from
Co-ordinator Datanode in this configured timeout.
Datanode in this configured timeout.
### Mover - A New Data Migration Tool

View File

@ -39,7 +39,7 @@
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
@ -117,7 +117,8 @@ private static void runTest(final String testCaseName,
cluster.stopDataNode(0);
cluster.getNameNodeRpc().sendHeartbeat(dnReg, prunedReports, 0L, 0L, 0, 0,
0, null, true, SlowPeerReports.EMPTY_REPORT,
SlowDiskReports.EMPTY_REPORT, new BlocksStorageMovementResult[0]);
SlowDiskReports.EMPTY_REPORT,
new BlocksStorageMoveAttemptFinished(null));
// Check that the missing storage was pruned.
assertThat(dnDescriptor.getStorageInfos().length, is(expectedStoragesAfterTest));

View File

@ -37,7 +37,7 @@
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
@ -169,7 +169,7 @@ public DatanodeRegistration answer(InvocationOnMock invocation)
Mockito.anyBoolean(),
Mockito.any(SlowPeerReports.class),
Mockito.any(SlowDiskReports.class),
Mockito.any(BlocksStorageMovementResult[].class))).thenReturn(
Mockito.any(BlocksStorageMoveAttemptFinished.class))).thenReturn(
new HeartbeatResponse(new DatanodeCommand[0], new NNHAStatusHeartbeat(
HAServiceState.ACTIVE, 1), null, ThreadLocalRandom.current()
.nextLong() | 1L));

View File

@ -49,7 +49,7 @@
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@ -161,7 +161,7 @@ private DatanodeProtocolClientSideTranslatorPB setupNNMock(int nnIdx)
Mockito.anyBoolean(),
Mockito.any(SlowPeerReports.class),
Mockito.any(SlowDiskReports.class),
Mockito.any(BlocksStorageMovementResult[].class));
Mockito.any(BlocksStorageMoveAttemptFinished.class));
mockHaStatuses[nnIdx] = new NNHAStatusHeartbeat(HAServiceState.STANDBY, 0);
datanodeCommands[nnIdx] = new DatanodeCommand[0];
return mock;

View File

@ -93,7 +93,7 @@
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStripedBlock;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@ -234,7 +234,7 @@ public DatanodeRegistration answer(InvocationOnMock invocation)
Mockito.anyBoolean(),
Mockito.any(SlowPeerReports.class),
Mockito.any(SlowDiskReports.class),
Mockito.any(BlocksStorageMovementResult[].class)))
Mockito.any(BlocksStorageMoveAttemptFinished.class)))
.thenReturn(new HeartbeatResponse(
new DatanodeCommand[0],
new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1),

View File

@ -50,7 +50,7 @@
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
@ -174,7 +174,7 @@ public void testSendLifelineIfHeartbeatBlocked() throws Exception {
anyBoolean(),
any(SlowPeerReports.class),
any(SlowDiskReports.class),
any(BlocksStorageMovementResult[].class));
any(BlocksStorageMoveAttemptFinished.class));
// Intercept lifeline to trigger latch count-down on each call.
doAnswer(new LatchCountingAnswer<Void>(lifelinesSent))
@ -240,7 +240,7 @@ public void testNoLifelineSentIfHeartbeatsOnTime() throws Exception {
anyBoolean(),
any(SlowPeerReports.class),
any(SlowDiskReports.class),
any(BlocksStorageMovementResult[].class));
any(BlocksStorageMoveAttemptFinished.class));
// While waiting on the latch for the expected number of heartbeat messages,
// poll DataNode tracking information. We expect that the DataNode always

View File

@ -44,7 +44,7 @@
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
@ -224,7 +224,7 @@ public HeartbeatResponse answer(InvocationOnMock invocation)
Mockito.anyBoolean(),
Mockito.any(SlowPeerReports.class),
Mockito.any(SlowDiskReports.class),
Mockito.any(BlocksStorageMovementResult[].class));
Mockito.any(BlocksStorageMoveAttemptFinished.class));
dn = new DataNode(conf, locations, null, null) {
@Override

View File

@ -66,7 +66,7 @@
import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@ -210,7 +210,7 @@ private static void setHeartbeatResponse(DatanodeCommand[] cmds)
anyInt(), anyInt(), anyInt(), (VolumeFailureSummary) any(),
anyBoolean(), any(SlowPeerReports.class),
any(SlowDiskReports.class),
(BlocksStorageMovementResult[]) any());
any(BlocksStorageMoveAttemptFinished.class));
} finally {
lock.writeLock().unlock();
}

View File

@ -17,7 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
import static org.junit.Assert.*;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.net.InetSocketAddress;
@ -36,8 +36,6 @@
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
@ -180,11 +178,10 @@ public void testMoveWithNoSpaceAvailable() throws Exception {
lb.getBlock().getLocalBlock(), lb.getLocations()[0], targetDnInfo,
lb.getStorageTypes()[0], StorageType.ARCHIVE);
blockMovingInfos.add(blockMovingInfo);
INode inode = cluster.getNamesystem().getFSDirectory().getINode(file);
worker.processBlockMovingTasks(inode.getId(),
cluster.getNamesystem().getBlockPoolId(), blockMovingInfos);
worker.processBlockMovingTasks(cluster.getNamesystem().getBlockPoolId(),
blockMovingInfos);
waitForBlockMovementCompletion(worker, inode.getId(), 1, 30000);
waitForBlockMovementCompletion(worker, 1, 30000);
} finally {
worker.stop();
}
@ -226,50 +223,42 @@ public void testDropSPSWork() throws Exception {
locatedBlock.getStorageTypes()[0], StorageType.ARCHIVE);
blockMovingInfos.add(blockMovingInfo);
}
INode inode = cluster.getNamesystem().getFSDirectory().getINode(file);
worker.processBlockMovingTasks(inode.getId(),
cluster.getNamesystem().getBlockPoolId(), blockMovingInfos);
worker.processBlockMovingTasks(cluster.getNamesystem().getBlockPoolId(),
blockMovingInfos);
// Wait till the results queue builds up
waitForBlockMovementResult(worker, inode.getId(), 30000);
waitForBlockMovementResult(worker, 30000);
worker.dropSPSWork();
assertTrue(worker.getBlocksMovementsStatusHandler()
.getBlksMovementResults().size() == 0);
.getMoveAttemptFinishedBlocks().size() == 0);
} finally {
worker.stop();
}
}
private void waitForBlockMovementResult(
final StoragePolicySatisfyWorker worker, final long inodeId, int timeout)
throws Exception {
final StoragePolicySatisfyWorker worker, int timeout) throws Exception {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
List<BlocksStorageMovementResult> completedBlocks = worker
.getBlocksMovementsStatusHandler().getBlksMovementResults();
List<Block> completedBlocks = worker.getBlocksMovementsStatusHandler()
.getMoveAttemptFinishedBlocks();
return completedBlocks.size() > 0;
}
}, 100, timeout);
}
private void waitForBlockMovementCompletion(
final StoragePolicySatisfyWorker worker, final long inodeId,
int expectedFailedItemsCount, int timeout) throws Exception {
final StoragePolicySatisfyWorker worker,
int expectedFinishedItemsCount, int timeout) throws Exception {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
List<BlocksStorageMovementResult> completedBlocks = worker
.getBlocksMovementsStatusHandler().getBlksMovementResults();
int failedCount = 0;
for (BlocksStorageMovementResult blkMovementResult : completedBlocks) {
if (blkMovementResult.getStatus() ==
BlocksStorageMovementResult.Status.FAILURE) {
failedCount++;
}
}
List<Block> completedBlocks = worker.getBlocksMovementsStatusHandler()
.getMoveAttemptFinishedBlocks();
int finishedCount = completedBlocks.size();
LOG.info("Block movement completed count={}, expected={} and actual={}",
completedBlocks.size(), expectedFailedItemsCount, failedCount);
return expectedFailedItemsCount == failedCount;
completedBlocks.size(), expectedFinishedItemsCount, finishedCount);
return expectedFinishedItemsCount == finishedCount;
}
}, 100, timeout);
}
@ -304,8 +293,7 @@ public Boolean get() {
private BlockMovingInfo prepareBlockMovingInfo(Block block,
DatanodeInfo src, DatanodeInfo destin, StorageType storageType,
StorageType targetStorageType) {
return new BlockMovingInfo(block, new DatanodeInfo[] {src},
new DatanodeInfo[] {destin}, new StorageType[] {storageType},
new StorageType[] {targetStorageType});
return new BlockMovingInfo(block, src, destin, storageType,
targetStorageType);
}
}

View File

@ -29,7 +29,7 @@
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
@ -112,7 +112,7 @@ public void testStorageReportHasStorageTypeAndState() throws IOException {
Mockito.any(VolumeFailureSummary.class), Mockito.anyBoolean(),
Mockito.any(SlowPeerReports.class),
Mockito.any(SlowDiskReports.class),
Mockito.any(BlocksStorageMovementResult[].class));
Mockito.any(BlocksStorageMoveAttemptFinished.class));
StorageReport[] reports = captor.getValue();

View File

@ -56,7 +56,7 @@
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@ -958,7 +958,7 @@ void sendHeartbeat() throws IOException {
DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration, rep,
0L, 0L, 0, 0, 0, null, true,
SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT,
new BlocksStorageMovementResult[0]).getCommands();
new BlocksStorageMoveAttemptFinished(null)).getCommands();
if(cmds != null) {
for (DatanodeCommand cmd : cmds ) {
if(LOG.isDebugEnabled()) {
@ -1009,7 +1009,7 @@ int replicateBlocks() throws IOException {
DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration,
rep, 0L, 0L, 0, 0, 0, null, true,
SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT,
new BlocksStorageMovementResult[0]).getCommands();
new BlocksStorageMoveAttemptFinished(null)).getCommands();
if (cmds != null) {
for (DatanodeCommand cmd : cmds) {
if (cmd.getAction() == DatanodeProtocol.DNA_TRANSFER) {

View File

@ -40,7 +40,7 @@
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.MkdirOp;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
import org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
@ -132,7 +132,7 @@ public static HeartbeatResponse sendHeartBeat(DatanodeRegistration nodeReg,
BlockManagerTestUtil.getStorageReportsForDatanode(dd),
dd.getCacheCapacity(), dd.getCacheRemaining(), 0, 0, 0, null, true,
SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT,
new BlocksStorageMovementResult[0]);
new BlocksStorageMoveAttemptFinished(null));
}
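The heartbeat now carries a single BlocksStorageMoveAttemptFinished payload rather than an array of per-trackId results; the tests pass null blocks when there is nothing to report. A presumed minimal shape of that wrapper, for illustration only (the real class lives in org.apache.hadoop.hdfs.server.protocol):

import org.apache.hadoop.hdfs.protocol.Block;

public class BlocksStorageMoveAttemptFinishedSketch {
  private final Block[] blocks; // may be null when no attempts finished

  public BlocksStorageMoveAttemptFinishedSketch(Block[] blocks) {
    this.blocks = blocks;
  }

  public Block[] getBlocks() {
    return blocks;
  }
}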
public static boolean setReplication(final FSNamesystem ns,

View File

@ -18,10 +18,17 @@
package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.util.Time.monotonicNow;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier.AttemptedItemInfo;
import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier.ItemInfo;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -42,9 +49,8 @@ public void setup() throws Exception {
unsatisfiedStorageMovementFiles = new BlockStorageMovementNeeded(
Mockito.mock(Namesystem.class),
Mockito.mock(StoragePolicySatisfier.class), 100);
StoragePolicySatisfier sps = Mockito.mock(StoragePolicySatisfier.class);
bsmAttemptedItems = new BlockStorageMovementAttemptedItems(100,
selfRetryTimeout, unsatisfiedStorageMovementFiles, sps);
selfRetryTimeout, unsatisfiedStorageMovementFiles);
}
@After
@ -76,120 +82,115 @@ private boolean checkItemMovedForRetry(Long item, long retryTimeout)
return isItemFound;
}
/**
* Verify that reported moved blocks are queued up in the finished blocks list.
*/
@Test(timeout = 30000)
public void testAddResultWithFailureResult() throws Exception {
public void testAddReportedMoveAttemptFinishedBlocks() throws Exception {
bsmAttemptedItems.start(); // start block movement report monitor thread
Long item = new Long(1234);
bsmAttemptedItems.add(new ItemInfo(0L, item), true);
bsmAttemptedItems.addResults(
new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
item.longValue(), BlocksStorageMovementResult.Status.FAILURE)});
assertTrue(checkItemMovedForRetry(item, 200));
List<Block> blocks = new ArrayList<Block>();
blocks.add(new Block(item));
bsmAttemptedItems.add(new AttemptedItemInfo(0L, 0L, 0L, blocks));
Block[] blockArray = new Block[blocks.size()];
blocks.toArray(blockArray);
bsmAttemptedItems.addReportedMovedBlocks(blockArray);
assertEquals("Failed to receive result!", 1,
bsmAttemptedItems.getMovementFinishedBlocksCount());
}
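A minimal sketch of the bookkeeping this assertion exercises, using only JDK types; the real status handler tracks hadoop Block objects, and the class and field names below are illustrative assumptions:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class FinishedBlocksTracker {
  // Move-attempt-finished reports from datanodes are queued here until a
  // monitor thread matches them against the attempted items.
  private final List<Long> movementFinishedBlocks =
      Collections.synchronizedList(new ArrayList<Long>());

  void addReportedMovedBlocks(long[] reportedBlockIds) {
    for (long id : reportedBlockIds) {
      movementFinishedBlocks.add(id);
    }
  }

  int getMovementFinishedBlocksCount() {
    return movementFinishedBlocks.size();
  }
}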
/**
* Verify that the finished blocks queue stays empty when no report arrives.
*/
@Test(timeout = 30000)
public void testAddResultWithSucessResult() throws Exception {
bsmAttemptedItems.start(); // start block movement result monitor thread
public void testNoBlockMovementAttemptFinishedReportAdded()
    throws Exception {
bsmAttemptedItems.start(); // start block movement report monitor thread
Long item = new Long(1234);
bsmAttemptedItems.add(new ItemInfo(0L, item), true);
bsmAttemptedItems.addResults(
new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
item.longValue(), BlocksStorageMovementResult.Status.SUCCESS)});
assertFalse(checkItemMovedForRetry(item, 200));
}
@Test(timeout = 30000)
public void testNoResultAdded() throws Exception {
bsmAttemptedItems.start(); // start block movement result monitor thread
Long item = new Long(1234);
bsmAttemptedItems.add(new ItemInfo(0L, item), true);
// After self retry timeout, it should be added back for retry
assertTrue("Failed to add to the retry list",
checkItemMovedForRetry(item, 600));
assertEquals("Failed to remove from the attempted list", 0,
List<Block> blocks = new ArrayList<>();
blocks.add(new Block(item));
bsmAttemptedItems.add(new AttemptedItemInfo(0L, 0L, 0L, blocks));
assertEquals("Shouldn't receive result", 0,
bsmAttemptedItems.getMovementFinishedBlocksCount());
assertEquals("Item doesn't exist in the attempted list", 1,
bsmAttemptedItems.getAttemptedItemsCount());
}
/**
* Partial block movement with BlocksStorageMovementResult#SUCCESS. Here,
* first occurrence is #blockStorageMovementResultCheck() and then
* Partial block movement with
* BlockMovementStatus#DN_BLK_STORAGE_MOVEMENT_SUCCESS. Here,
* #blockStorageMovementReportedItemsCheck() runs first and then
* #blocksStorageMovementUnReportedItemsCheck().
*/
@Test(timeout = 30000)
public void testPartialBlockMovementShouldBeRetried1() throws Exception {
Long item = new Long(1234);
bsmAttemptedItems.add(new ItemInfo(0L, item), false);
bsmAttemptedItems.addResults(
new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
item.longValue(), BlocksStorageMovementResult.Status.SUCCESS)});
List<Block> blocks = new ArrayList<>();
blocks.add(new Block(item));
blocks.add(new Block(5678L));
Long trackID = 0L;
bsmAttemptedItems
.add(new AttemptedItemInfo(trackID, trackID, 0L, blocks));
Block[] blksMovementReport = new Block[1];
blksMovementReport[0] = new Block(item);
bsmAttemptedItems.addReportedMovedBlocks(blksMovementReport);
// start block movement result monitor thread
// start block movement report monitor thread
bsmAttemptedItems.start();
assertTrue("Failed to add to the retry list",
checkItemMovedForRetry(item, 5000));
checkItemMovedForRetry(trackID, 5000));
assertEquals("Failed to remove from the attempted list", 0,
bsmAttemptedItems.getAttemptedItemsCount());
}
/**
* Partial block movement with BlocksStorageMovementResult#SUCCESS. Here,
* first occurrence is #blocksStorageMovementUnReportedItemsCheck() and then
* #blockStorageMovementResultCheck().
* Partial block movement. Here #blocksStorageMovementUnReportedItemsCheck()
* runs first, followed by
* #blockStorageMovementReportedItemsCheck().
*/
@Test(timeout = 30000)
public void testPartialBlockMovementShouldBeRetried2() throws Exception {
Long item = new Long(1234);
bsmAttemptedItems.add(new ItemInfo(0L, item), false);
bsmAttemptedItems.addResults(
new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
item.longValue(), BlocksStorageMovementResult.Status.SUCCESS)});
Long trackID = 0L;
List<Block> blocks = new ArrayList<>();
blocks.add(new Block(item));
bsmAttemptedItems
.add(new AttemptedItemInfo(trackID, trackID, 0L, blocks));
Block[] blksMovementReport = new Block[1];
blksMovementReport[0] = new Block(item);
bsmAttemptedItems.addReportedMovedBlocks(blksMovementReport);
Thread.sleep(selfRetryTimeout * 2); // Waiting to get timed out
bsmAttemptedItems.blocksStorageMovementUnReportedItemsCheck();
bsmAttemptedItems.blockStorageMovementResultCheck();
bsmAttemptedItems.blockStorageMovementReportedItemsCheck();
assertTrue("Failed to add to the retry list",
checkItemMovedForRetry(item, 5000));
checkItemMovedForRetry(trackID, 5000));
assertEquals("Failed to remove from the attempted list", 0,
bsmAttemptedItems.getAttemptedItemsCount());
}
/**
* Partial block movement with only BlocksStorageMovementResult#FAILURE
* result and storageMovementAttemptedItems list is empty.
* Partial block movement where only a BlocksStorageMoveAttemptFinished report
* arrives and the storageMovementAttemptedItems list is empty.
*/
@Test(timeout = 30000)
public void testPartialBlockMovementWithEmptyAttemptedQueue()
throws Exception {
Long item = new Long(1234);
bsmAttemptedItems.addResults(
new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
item, BlocksStorageMovementResult.Status.FAILURE)});
bsmAttemptedItems.blockStorageMovementResultCheck();
Long trackID = 0L;
List<Block> blocks = new ArrayList<>();
blocks.add(new Block(item));
bsmAttemptedItems
.add(new AttemptedItemInfo(trackID, trackID, 0L, blocks));
Block[] blksMovementReport = new Block[1];
blksMovementReport[0] = new Block(item);
bsmAttemptedItems.addReportedMovedBlocks(blksMovementReport);
assertFalse(
"Should not add in queue again if it is not there in"
+ " storageMovementAttemptedItems",
checkItemMovedForRetry(item, 5000));
assertEquals("Failed to remove from the attempted list", 0,
bsmAttemptedItems.getAttemptedItemsCount());
}
/**
* Partial block movement with BlocksStorageMovementResult#FAILURE result and
* storageMovementAttemptedItems.
*/
@Test(timeout = 30000)
public void testPartialBlockMovementShouldBeRetried4() throws Exception {
Long item = new Long(1234);
bsmAttemptedItems.add(new ItemInfo(0L, item), false);
bsmAttemptedItems.addResults(
new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
item.longValue(), BlocksStorageMovementResult.Status.FAILURE)});
bsmAttemptedItems.blockStorageMovementResultCheck();
assertTrue("Failed to add to the retry list",
checkItemMovedForRetry(item, 5000));
assertEquals("Failed to remove from the attempted list", 0,
checkItemMovedForRetry(trackID, 5000));
assertEquals("Failed to remove from the attempted list", 1,
bsmAttemptedItems.getAttemptedItemsCount());
}
}
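The tests above construct AttemptedItemInfo with three longs and a block list. A hedged reading of those positional arguments, inferred from how the tests use them (the parameter meanings are assumptions, not a documented contract):

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier.AttemptedItemInfo;

public class AttemptedItemInfoUsage {
  public static AttemptedItemInfo example() {
    List<Block> blocks = new ArrayList<>();
    blocks.add(new Block(1234L));
    return new AttemptedItemInfo(
        0L,      // start id: root inode from which satisfaction began
        0L,      // track id: the file whose block moves were scheduled
        0L,      // last attempted/reported time, drives the self-retry timeout
        blocks); // blocks included in this move attempt
  }
}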

View File

@ -44,7 +44,7 @@
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@ -141,7 +141,7 @@ public void testDeadDatanode() throws Exception {
DatanodeCommand[] cmd =
dnp.sendHeartbeat(reg, rep, 0L, 0L, 0, 0, 0, null, true,
SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT,
new BlocksStorageMovementResult[0]).getCommands();
new BlocksStorageMoveAttemptFinished(null)).getCommands();
assertEquals(1, cmd.length);
assertEquals(cmd[0].getAction(), RegisterCommand.REGISTER
.getAction());

View File

@ -203,11 +203,11 @@ public void testWhenStoragePolicySetToONESSD()
}
/**
* Tests to verify that the block storage movement results will be propagated
* Tests to verify that the block storage movement report will be propagated
* to Namenode via datanode heartbeat.
*/
@Test(timeout = 300000)
public void testPerTrackIdBlocksStorageMovementResults() throws Exception {
public void testBlksStorageMovementAttemptFinishedReport() throws Exception {
try {
createCluster();
// Change policy to ONE_SSD
@ -229,7 +229,7 @@ public void testPerTrackIdBlocksStorageMovementResults() throws Exception {
DFSTestUtil.waitExpectedStorageType(
file, StorageType.DISK, 2, 30000, dfs);
waitForBlocksMovementResult(1, 30000);
waitForBlocksMovementAttemptReport(1, 30000);
} finally {
shutdownCluster();
}
@ -276,7 +276,7 @@ public void testMultipleFilesForSatisfyStoragePolicy() throws Exception {
fileName, StorageType.DISK, 2, 30000, dfs);
}
waitForBlocksMovementResult(files.size(), 30000);
waitForBlocksMovementAttemptReport(files.size(), 30000);
} finally {
shutdownCluster();
}
@ -457,7 +457,7 @@ public void testWhenOnlyFewTargetDatanodeAreAvailableToSatisfyStoragePolicy()
DFSTestUtil.waitExpectedStorageType(
file, StorageType.DISK, 2, 30000, dfs);
waitForBlocksMovementResult(1, 30000);
waitForBlocksMovementAttemptReport(1, 30000);
} finally {
shutdownCluster();
}
@ -630,7 +630,7 @@ public void testMoveWithBlockPinning() throws Exception {
// No block movement will be scheduled as there is no target node
// available with the required storage type.
waitForAttemptedItems(1, 30000);
waitForBlocksMovementResult(1, 30000);
waitForBlocksMovementAttemptReport(1, 30000);
DFSTestUtil.waitExpectedStorageType(
file1, StorageType.ARCHIVE, 1, 30000, dfs);
DFSTestUtil.waitExpectedStorageType(
@ -691,7 +691,7 @@ public void testWhenOnlyFewSourceNodesHaveMatchingTargetNodes()
DFSTestUtil.waitExpectedStorageType(
file, StorageType.DISK, 3, 30000, dfs);
waitForBlocksMovementResult(1, 30000);
waitForBlocksMovementAttemptReport(1, 30000);
} finally {
shutdownCluster();
}
@ -871,7 +871,7 @@ public void testChooseInSameDatanodeWithONESSDShouldNotChooseIfNoSpace()
Set<DatanodeDescriptor> dns = hdfsCluster.getNamesystem()
.getBlockManager().getDatanodeManager().getDatanodes();
for (DatanodeDescriptor dd : dns) {
assertNull(dd.getBlocksToMoveStorages());
assertNull(dd.getBlocksToMoveStorages(1));
}
// Enable heart beats now
@ -1224,7 +1224,7 @@ public void testMultipleLevelDirectoryForSatisfyStoragePolicy()
/**
* Test SPS for batch processing.
*/
@Test(timeout = 300000)
@Test(timeout = 3000000)
public void testBatchProcessingForSPSDirectory() throws Exception {
try {
StorageType[][] diskTypes = new StorageType[][] {
@ -1252,7 +1252,7 @@ public void testBatchProcessingForSPSDirectory() throws Exception {
DFSTestUtil.waitExpectedStorageType(fileName, StorageType.ARCHIVE, 2,
30000, dfs);
}
waitForBlocksMovementResult(files.size(), 30000);
waitForBlocksMovementAttemptReport(files.size(), 30000);
String expectedLogMessage = "StorageMovementNeeded queue remaining"
+ " capacity is zero";
assertTrue("Log output does not contain expected log message: "
@ -1268,7 +1268,7 @@ public void testBatchProcessingForSPSDirectory() throws Exception {
* 1. Delete /root when traversing Q
* 2. U, R, S should not be in queued.
*/
@Test
@Test(timeout = 300000)
public void testTraverseWhenParentDeleted() throws Exception {
StorageType[][] diskTypes = new StorageType[][] {
{StorageType.DISK, StorageType.ARCHIVE},
@ -1330,7 +1330,7 @@ public void testTraverseWhenParentDeleted() throws Exception {
* 1. Delete L when traversing Q
* 2. E, M, U, R, S should not be in queued.
*/
@Test
@Test(timeout = 300000)
public void testTraverseWhenRootParentDeleted() throws Exception {
StorageType[][] diskTypes = new StorageType[][] {
{StorageType.DISK, StorageType.ARCHIVE},
@ -1387,6 +1387,82 @@ public void testTraverseWhenRootParentDeleted() throws Exception {
dfs.delete(new Path("/root"), true);
}
/**
* Test storage block moves while under-replication block tasks exist in the
* system, so both will share the max transfer streams.
*
* 1. Create a cluster with 2 datanodes.
* 2. Create 20 files with replication factor 2.
* 3. Start 2 more DNs with DISK & SSD storage types.
* 4. Set the replication factor of the first 10 files to 4 to trigger
*    replication tasks.
* 5. Set the ALL_SSD policy on the second set of files (11-20).
* 6. Call SPS on files 11-20 to trigger block move tasks to the new DNs.
* 7. Wait for both the under-replication and SPS tasks to complete.
*/
@Test(timeout = 300000)
public void testMoveBlocksWithUnderReplicatedBlocks() throws Exception {
try {
config.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 3);
config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
true);
config.set(DFSConfigKeys
.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
"3000");
config.setBoolean(DFSConfigKeys
.DFS_STORAGE_POLICY_SATISFIER_SHARE_EQUAL_REPLICA_MAX_STREAMS_KEY,
true);
StorageType[][] storagetypes = new StorageType[][] {
{StorageType.ARCHIVE, StorageType.DISK},
{StorageType.ARCHIVE, StorageType.DISK}};
hdfsCluster = new MiniDFSCluster.Builder(config).numDataNodes(2)
.storageTypes(storagetypes).build();
hdfsCluster.waitActive();
dfs = hdfsCluster.getFileSystem();
// The files below will be used for pending replication block tasks.
for (int i=1; i<=20; i++){
Path filePath = new Path("/file" + i);
DFSTestUtil.createFile(dfs, filePath, DEFAULT_BLOCK_SIZE * 5, (short) 2,
0);
}
StorageType[][] newtypes =
new StorageType[][]{{StorageType.DISK, StorageType.SSD},
{StorageType.DISK, StorageType.SSD}};
startAdditionalDNs(config, 2, numOfDatanodes, newtypes,
storagesPerDatanode, capacity, hdfsCluster);
// Increase the replication factor of the first 10 files to 4, which
// triggers pending replication tasks.
for (int i=1; i<=10; i++){
Path filePath = new Path("/file" + i);
dfs.setReplication(filePath, (short) 4);
}
// Invoke SPS on files 11-20.
for (int i = 11; i <= 20; i++) {
Path filePath = new Path("/file" + i);
dfs.setStoragePolicy(filePath, "ALL_SSD");
dfs.satisfyStoragePolicy(filePath);
}
for (int i = 1; i <= 10; i++) {
Path filePath = new Path("/file" + i);
DFSTestUtil.waitExpectedStorageType(filePath.toString(),
StorageType.DISK, 4, 30000, hdfsCluster.getFileSystem());
}
for (int i = 11; i <= 20; i++) {
Path filePath = new Path("/file" + i);
DFSTestUtil.waitExpectedStorageType(filePath.toString(),
StorageType.SSD, 2, 30000, hdfsCluster.getFileSystem());
}
} finally {
shutdownCluster();
}
}
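The shared-streams behaviour exercised above is purely configuration driven. A minimal sketch of the keys this test flips (the values are the test's, not recommended defaults):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class SpsSharedStreamsConf {
  public static Configuration build() {
    Configuration conf = new Configuration();
    // Cap concurrent replication transfer streams per datanode.
    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 3);
    // Let SPS block moves share that same stream budget with replication
    // work instead of using a separate low-priority limit.
    conf.setBoolean(DFSConfigKeys
        .DFS_STORAGE_POLICY_SATISFIER_SHARE_EQUAL_REPLICA_MAX_STREAMS_KEY,
        true);
    // SPS re-check interval, in milliseconds.
    conf.set(DFSConfigKeys
        .DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY, "3000");
    return conf;
  }
}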
private static void createDirectoryTree(DistributedFileSystem dfs)
throws Exception {
// tree structure
@ -1514,18 +1590,19 @@ public Boolean get() {
}, 100, timeout);
}
private void waitForBlocksMovementResult(long expectedBlkMovResultsCount,
int timeout) throws TimeoutException, InterruptedException {
private void waitForBlocksMovementAttemptReport(
long expectedMovementFinishedBlocksCount, int timeout)
throws TimeoutException, InterruptedException {
BlockManager blockManager = hdfsCluster.getNamesystem().getBlockManager();
final StoragePolicySatisfier sps = blockManager.getStoragePolicySatisfier();
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
LOG.info("expectedResultsCount={} actualResultsCount={}",
expectedBlkMovResultsCount,
sps.getAttemptedItemsMonitor().resultsCount());
return sps.getAttemptedItemsMonitor()
.resultsCount() == expectedBlkMovResultsCount;
LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}",
expectedMovementFinishedBlocksCount,
sps.getAttemptedItemsMonitor().getMovementFinishedBlocksCount());
return sps.getAttemptedItemsMonitor().getMovementFinishedBlocksCount()
>= expectedMovementFinishedBlocksCount;
}
}, 100, timeout);
}

View File

@ -180,7 +180,7 @@ public void testMoverWithFullStripe() throws Exception {
LOG.info("Sets storage policy to COLD and invoked satisfyStoragePolicy");
cluster.triggerHeartbeats();
waitForBlocksMovementResult(cluster, 1, 60000);
waitForBlocksMovementAttemptReport(cluster, 9, 60000);
// verify storage types and locations
waitExpectedStorageType(cluster, fooFile, fileLen, StorageType.ARCHIVE, 9,
9, 60000);
@ -290,7 +290,7 @@ public void testWhenOnlyFewTargetNodesAreAvailableToSatisfyStoragePolicy()
LOG.info("Sets storage policy to COLD and invoked satisfyStoragePolicy");
cluster.triggerHeartbeats();
waitForBlocksMovementResult(cluster, 1, 60000);
waitForBlocksMovementAttemptReport(cluster, 5, 60000);
waitForAttemptedItems(cluster, 1, 30000);
// verify storage types and locations.
waitExpectedStorageType(cluster, fooFile, fileLen, StorageType.ARCHIVE, 5,
@ -556,10 +556,10 @@ public Boolean get() {
}, 100, timeout);
}
// Check whether the block movement result has been arrived at the
// Check whether the block movement attempt report has arrived at the
// Namenode(SPS).
private void waitForBlocksMovementResult(MiniDFSCluster cluster,
long expectedBlkMovResultsCount, int timeout)
private void waitForBlocksMovementAttemptReport(MiniDFSCluster cluster,
long expectedMovementFinishedBlocksCount, int timeout)
throws TimeoutException, InterruptedException {
BlockManager blockManager = cluster.getNamesystem().getBlockManager();
final StoragePolicySatisfier sps = blockManager.getStoragePolicySatisfier();
@ -568,11 +568,11 @@ private void waitForBlocksMovementResult(MiniDFSCluster cluster,
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
LOG.info("expectedResultsCount={} actualResultsCount={}",
expectedBlkMovResultsCount,
sps.getAttemptedItemsMonitor().resultsCount());
return sps.getAttemptedItemsMonitor()
.resultsCount() == expectedBlkMovResultsCount;
LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}",
expectedMovementFinishedBlocksCount,
sps.getAttemptedItemsMonitor().getMovementFinishedBlocksCount());
return sps.getAttemptedItemsMonitor().getMovementFinishedBlocksCount()
>= expectedMovementFinishedBlocksCount;
}
}, 100, timeout);
}