HDFS-12106: [SPS]: Improve storage policy satisfier configurations. Contributed by Surendra Singh Lilhore.
Parent: 9b83f94f35
Commit: c561cb316e
@@ -149,6 +149,12 @@ public enum StoragePolicySatisfyPathStatus {
      */
     SUCCESS,
 
+    /**
+     * Few blocks failed to move and the path is still not
+     * fully satisfied the storage policy.
+     */
+    FAILURE,
+
     /**
      * Status not available.
      */

@@ -3409,6 +3409,8 @@ public static StoragePolicySatisfyPathStatus convert(
       return StoragePolicySatisfyPathStatus.IN_PROGRESS;
     case SUCCESS:
       return StoragePolicySatisfyPathStatus.SUCCESS;
+    case FAILURE:
+      return StoragePolicySatisfyPathStatus.FAILURE;
     case NOT_AVAILABLE:
       return StoragePolicySatisfyPathStatus.NOT_AVAILABLE;
     default:

@@ -3425,6 +3427,8 @@ public static HdfsConstants.StoragePolicySatisfyPathStatus convert(
       return HdfsConstants.StoragePolicySatisfyPathStatus.IN_PROGRESS;
     case SUCCESS:
       return HdfsConstants.StoragePolicySatisfyPathStatus.SUCCESS;
+    case FAILURE:
+      return HdfsConstants.StoragePolicySatisfyPathStatus.FAILURE;
     case NOT_AVAILABLE:
       return HdfsConstants.StoragePolicySatisfyPathStatus.NOT_AVAILABLE;
     default:
@@ -854,7 +854,8 @@ message CheckStoragePolicySatisfyPathStatusResponseProto {
     PENDING = 0;
     IN_PROGRESS = 1;
     SUCCESS = 2;
-    NOT_AVAILABLE = 3;
+    FAILURE = 3;
+    NOT_AVAILABLE = 4;
   }
   required StoragePolicySatisfyPathStatus status = 1;
 }
@@ -634,10 +634,14 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       "dfs.storage.policy.satisfier.self.retry.timeout.millis";
   public static final int DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_DEFAULT =
       5 * 60 * 1000;
-  public static final String DFS_STORAGE_POLICY_SATISFIER_SHARE_EQUAL_REPLICA_MAX_STREAMS_KEY =
+  public static final String DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_KEY =
+      "dfs.storage.policy.satisfier.retry.max.attempts";
+  public static final int DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_DEFAULT =
+      3;
+  public static final String DFS_STORAGE_POLICY_SATISFIER_LOW_MAX_STREAMS_PREFERENCE_KEY =
       "dfs.storage.policy.satisfier.low.max-streams.preference";
-  public static final boolean DFS_STORAGE_POLICY_SATISFIER_SHARE_EQUAL_REPLICA_MAX_STREAMS_DEFAULT =
-      false;
+  public static final boolean DFS_STORAGE_POLICY_SATISFIER_LOW_MAX_STREAMS_PREFERENCE_DEFAULT =
+      true;
 
   public static final String DFS_DATANODE_ADDRESS_KEY = "dfs.datanode.address";
   public static final int DFS_DATANODE_DEFAULT_PORT = 9866;
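The two keys introduced above are consumed elsewhere in this commit (see the DatanodeManager and StoragePolicySatisfier hunks below). A minimal sketch of that consumption pattern, assuming a standard Hadoop Configuration; the class name SpsConfigReadSketch is illustrative only:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class SpsConfigReadSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Maximum number of satisfy attempts per path before it is dropped
    // from the "movement needed" queue and the path is marked FAILURE.
    int maxRetry = conf.getInt(
        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_KEY,
        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_DEFAULT);
    // Whether SPS block moves should yield max-streams to replication/EC work.
    boolean lowPriority = conf.getBoolean(
        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_LOW_MAX_STREAMS_PREFERENCE_KEY,
        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_LOW_MAX_STREAMS_PREFERENCE_DEFAULT);
    System.out.println("retry.max.attempts=" + maxRetry
        + ", low.max-streams.preference=" + lowPriority);
  }
}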
@@ -208,7 +208,7 @@ public class DatanodeManager {
    */
   private final long timeBetweenResendingCachingDirectivesMs;
 
-  private final boolean blocksToMoveShareEqualRatio;
+  private final boolean blocksToMoveLowPriority;
 
   DatanodeManager(final BlockManager blockManager, final Namesystem namesystem,
       final Configuration conf) throws IOException {

@@ -337,9 +337,9 @@ public class DatanodeManager {
 
     // SPS configuration to decide blocks to move can share equal ratio of
     // maxtransfers with pending replica and erasure-coded reconstruction tasks
-    blocksToMoveShareEqualRatio = conf.getBoolean(
-        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SHARE_EQUAL_REPLICA_MAX_STREAMS_KEY,
-        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SHARE_EQUAL_REPLICA_MAX_STREAMS_DEFAULT);
+    blocksToMoveLowPriority = conf.getBoolean(
+        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_LOW_MAX_STREAMS_PREFERENCE_KEY,
+        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_LOW_MAX_STREAMS_PREFERENCE_DEFAULT);
   }
 
   private static long getStaleIntervalFromConf(Configuration conf,

@@ -1699,11 +1699,11 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
       int numReplicationTasks = 0;
       int numECTasks = 0;
       int numBlocksToMoveTasks = 0;
-      // Check blocksToMoveShareEqualRatio configuration is true/false. If true,
+      // Check blocksToMoveLowPriority configuration is true/false. If false,
       // then equally sharing the max transfer. Otherwise gives high priority to
       // the pending_replica/erasure-coded tasks and only the delta streams will
       // be used for blocks to move tasks.
-      if (blocksToMoveShareEqualRatio) {
+      if (!blocksToMoveLowPriority) {
         // add blocksToMove count to total blocks so that will get equal share
         totalBlocks = totalBlocks + totalBlocksToMove;
         numReplicationTasks = (int) Math
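The heartbeat hunk above only switches which branch is taken; the effect on stream allocation is easier to see in isolation. The sketch below is illustrative and is not the committed DatanodeManager code: maxTransfers, totalReplicateBlocks and totalECBlocks are assumed inputs, and the ceiling-based proportional split is a simplification.

// Illustrative split of a DataNode's transfer budget between replication,
// EC reconstruction and SPS block moves, for the two modes of
// dfs.storage.policy.satisfier.low.max-streams.preference.
public class TransferSplitSketch {

  static int share(int maxTransfers, int part, int total) {
    return total == 0 ? 0 : (int) Math.ceil((double) maxTransfers * part / total);
  }

  static int[] split(boolean blocksToMoveLowPriority, int maxTransfers,
      int totalReplicateBlocks, int totalECBlocks, int totalBlocksToMove) {
    int numReplicationTasks;
    int numECTasks;
    int numBlocksToMoveTasks;
    if (!blocksToMoveLowPriority) {
      // Equal-share mode: block moves are counted into the total, so every
      // kind of work gets a proportional slice of maxTransfers.
      int totalBlocks = totalReplicateBlocks + totalECBlocks + totalBlocksToMove;
      numReplicationTasks = share(maxTransfers, totalReplicateBlocks, totalBlocks);
      numECTasks = share(maxTransfers, totalECBlocks, totalBlocks);
      numBlocksToMoveTasks = maxTransfers - numReplicationTasks - numECTasks;
    } else {
      // Low-priority mode (the new default): replication and EC keep their
      // full proportional share, and block moves only get the leftover streams.
      int totalBlocks = totalReplicateBlocks + totalECBlocks;
      numReplicationTasks = share(maxTransfers, totalReplicateBlocks, totalBlocks);
      numECTasks = share(maxTransfers, totalECBlocks, totalBlocks);
      numBlocksToMoveTasks =
          Math.max(0, maxTransfers - numReplicationTasks - numECTasks);
    }
    return new int[] {numReplicationTasks, numECTasks, numBlocksToMoveTasks};
  }

  public static void main(String[] args) {
    int[] equal = split(false, 10, 6, 2, 4);
    int[] low = split(true, 10, 6, 2, 4);
    System.out.println("equal share -> moves get " + equal[2] + " streams");
    System.out.println("low priority -> moves get " + low[2] + " streams");
  }
}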
@@ -196,7 +196,8 @@ public void processBlockMovingTasks(final String blockPoolID,
    * This class encapsulates the process of moving the block replica to the
    * given target and wait for the response.
    */
-  private class BlockMovingTask implements Callable<BlockMovementAttemptFinished> {
+  private class BlockMovingTask implements
+      Callable<BlockMovementAttemptFinished> {
     private final String blockPoolID;
     private final Block block;
     private final DatanodeInfo source;
@@ -183,7 +183,7 @@ void blocksStorageMovementUnReportedItemsCheck() {
         Long blockCollectionID = itemInfo.getTrackId();
         synchronized (movementFinishedBlocks) {
           ItemInfo candidate = new ItemInfo(itemInfo.getStartId(),
-              blockCollectionID);
+              blockCollectionID, itemInfo.getRetryCount() + 1);
           blockStorageMovementNeeded.add(candidate);
           iter.remove();
           LOG.info("TrackID: {} becomes timed out and moved to needed "

@@ -211,9 +211,9 @@ void blockStorageMovementReportedItemsCheck() throws IOException {
             // TODO: try add this at front of the Queue, so that this element
             // gets the chance first and can be cleaned from queue quickly as
             // all movements already done.
-            blockStorageMovementNeeded
-                .add(new ItemInfo(attemptedItemInfo.getStartId(),
-                    attemptedItemInfo.getTrackId()));
+            blockStorageMovementNeeded.add(new ItemInfo(attemptedItemInfo
+                .getStartId(), attemptedItemInfo.getTrackId(),
+                attemptedItemInfo.getRetryCount() + 1));
             iterator.remove();
           }
         }
@@ -178,8 +178,8 @@ public synchronized void clearAll() {
    * Decrease the pending child count for directory once one file blocks moved
    * successfully. Remove the SPS xAttr if pending child count is zero.
    */
-  public synchronized void removeItemTrackInfo(ItemInfo trackInfo)
-      throws IOException {
+  public synchronized void removeItemTrackInfo(ItemInfo trackInfo,
+      boolean isSuccess) throws IOException {
     if (trackInfo.isDir()) {
       // If track is part of some start inode then reduce the pending
       // directory work count.

@@ -188,7 +188,7 @@ public synchronized void removeItemTrackInfo(ItemInfo trackInfo)
       if (inode == null) {
         // directory deleted just remove it.
         this.pendingWorkForDirectory.remove(startId);
-        markSuccess(startId);
+        updateStatus(startId, isSuccess);
       } else {
         DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(startId);
         if (pendingWork != null) {

@@ -196,8 +196,10 @@ public synchronized void removeItemTrackInfo(ItemInfo trackInfo)
           if (pendingWork.isDirWorkDone()) {
             namesystem.removeXattr(startId, XATTR_SATISFY_STORAGE_POLICY);
             pendingWorkForDirectory.remove(startId);
-            markSuccess(startId);
+            pendingWork.setFailure(!isSuccess);
+            updateStatus(startId, pendingWork.isPolicySatisfied());
           }
+          pendingWork.setFailure(isSuccess);
         }
       }
     } else {

@@ -205,7 +207,7 @@ public synchronized void removeItemTrackInfo(ItemInfo trackInfo)
       // storageMovementAttemptedItems or file policy satisfied.
       namesystem.removeXattr(trackInfo.getTrackId(),
           XATTR_SATISFY_STORAGE_POLICY);
-      markSuccess(trackInfo.getStartId());
+      updateStatus(trackInfo.getStartId(), isSuccess);
     }
   }
 
@@ -224,14 +226,19 @@ public synchronized void clearQueue(long trackId) {
   /**
    * Mark inode status as SUCCESS in map.
    */
-  private void markSuccess(long startId){
+  private void updateStatus(long startId, boolean isSuccess){
     StoragePolicySatisfyPathStatusInfo spsStatusInfo =
         spsStatus.get(startId);
     if (spsStatusInfo == null) {
       spsStatusInfo = new StoragePolicySatisfyPathStatusInfo();
       spsStatus.put(startId, spsStatusInfo);
     }
-    spsStatusInfo.setSuccess();
+
+    if (isSuccess) {
+      spsStatusInfo.setSuccess();
+    } else {
+      spsStatusInfo.setFailure();
+    }
   }
 
   /**
@@ -325,7 +332,7 @@ public void run() {
             namesystem.removeXattr(startInode.getId(),
                 XATTR_SATISFY_STORAGE_POLICY);
             pendingWorkForDirectory.remove(startInode.getId());
-            markSuccess(startInode.getId());
+            updateStatus(startInode.getId(), true);
           }
         }
       }
@@ -431,6 +438,7 @@ public static class DirPendingWorkInfo {
 
     private int pendingWorkCount = 0;
     private boolean fullyScanned = false;
+    private boolean success = true;
 
     /**
      * Increment the pending work count for directory.

@@ -461,6 +469,20 @@ public synchronized boolean isDirWorkDone() {
     public synchronized void markScanCompleted() {
       this.fullyScanned = true;
     }
+
+    /**
+     * Return true if all the files block movement is success, otherwise false.
+     */
+    public boolean isPolicySatisfied() {
+      return success;
+    }
+
+    /**
+     * Set directory SPS status failed.
+     */
+    public void setFailure(boolean failure) {
+      this.success = this.success || failure;
+    }
   }
 
   public void init() {
@@ -510,6 +532,11 @@ private void setSuccess() {
       this.lastStatusUpdateTime = Time.monotonicNow();
     }
 
+    private void setFailure() {
+      this.status = StoragePolicySatisfyPathStatus.FAILURE;
+      this.lastStatusUpdateTime = Time.monotonicNow();
+    }
+
     private StoragePolicySatisfyPathStatus getStatus() {
       return status;
     }

@@ -518,7 +545,8 @@ private StoragePolicySatisfyPathStatus getStatus() {
      * Return true if SUCCESS status cached more then 5 min.
      */
     private boolean canRemove() {
-      return StoragePolicySatisfyPathStatus.SUCCESS == status
+      return (StoragePolicySatisfyPathStatus.SUCCESS == status
+          || StoragePolicySatisfyPathStatus.FAILURE == status)
           && (Time.monotonicNow()
               - lastStatusUpdateTime) > statusClearanceElapsedTimeMs;
     }
@@ -7869,6 +7869,9 @@ public void removeXattr(long id, String xattrName) throws IOException {
     writeLock();
     try {
       final INode inode = dir.getInode(id);
+      if (inode == null) {
+        return;
+      }
       final XAttrFeature xaf = inode.getXAttrFeature();
       if (xaf == null) {
         return;
@@ -83,6 +83,7 @@ public class StoragePolicySatisfier implements Runnable {
   private volatile boolean isRunning = false;
   private int spsWorkMultiplier;
   private long blockCount = 0L;
+  private int blockMovementMaxRetry;
   /**
    * Represents the collective analysis status for all blocks.
    */

@@ -137,6 +138,9 @@ public StoragePolicySatisfier(final Namesystem namesystem,
             DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_DEFAULT),
         storageMovementNeeded);
     this.spsWorkMultiplier = DFSUtil.getSPSWorkMultiplier(conf);
+    this.blockMovementMaxRetry = conf.getInt(
+        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_KEY,
+        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_DEFAULT);
   }
 
   /**
@@ -243,6 +247,13 @@ public void run() {
         if (!namesystem.isInSafeMode()) {
           ItemInfo itemInfo = storageMovementNeeded.get();
           if (itemInfo != null) {
+            if(itemInfo.getRetryCount() >= blockMovementMaxRetry){
+              LOG.info("Failed to satisfy the policy after "
+                  + blockMovementMaxRetry + " retries. Removing inode "
+                  + itemInfo.getTrackId() + " from the queue");
+              storageMovementNeeded.removeItemTrackInfo(itemInfo, false);
+              continue;
+            }
             long trackId = itemInfo.getTrackId();
             BlockCollection blockCollection;
             BlocksMovingAnalysis status = null;
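Taken together with the attempted-items hunks earlier, the retry flow added by this commit is: a path enters the queue with retryCount 0, each timed-out or unpaired attempt re-queues it with retryCount + 1, and once the count reaches dfs.storage.policy.satisfier.retry.max.attempts the path is dropped and its status becomes FAILURE. A standalone sketch of that lifecycle; the queue and Item type below are simplified stand-ins, not the committed classes:

import java.util.ArrayDeque;
import java.util.Queue;

public class RetryLifecycleSketch {
  // Same value as DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_DEFAULT.
  static final int MAX_RETRY = 3;

  static final class Item {
    final long trackId;
    final int retryCount;
    Item(long trackId, int retryCount) {
      this.trackId = trackId;
      this.retryCount = retryCount;
    }
  }

  public static void main(String[] args) {
    Queue<Item> movementNeeded = new ArrayDeque<>();
    movementNeeded.add(new Item(16386L, 0)); // first attempt
    while (!movementNeeded.isEmpty()) {
      Item item = movementNeeded.poll();
      if (item.retryCount >= MAX_RETRY) {
        // Mirrors the new guard above: give up and record FAILURE for the path.
        System.out.println("inode " + item.trackId + " -> FAILURE after "
            + MAX_RETRY + " retries");
        continue;
      }
      // Pretend no eligible targets were found (NO_BLOCKS_TARGETS_PAIRED),
      // so the item goes back on the queue with an incremented retry count.
      System.out.println("attempt " + item.retryCount + " failed, re-queueing");
      movementNeeded.add(new Item(item.trackId, item.retryCount + 1));
    }
  }
}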
@@ -253,7 +264,7 @@ public void run() {
             if (blockCollection == null) {
               // File doesn't exists (maybe got deleted), remove trackId from
               // the queue
-              storageMovementNeeded.removeItemTrackInfo(itemInfo);
+              storageMovementNeeded.removeItemTrackInfo(itemInfo, true);
             } else {
               status =
                   analyseBlocksStorageMovementsAndAssignToDN(

@@ -269,9 +280,9 @@ public void run() {
               // Just add to monitor, so it will be tracked for report and
               // be removed on storage movement attempt finished report.
               case BLOCKS_TARGETS_PAIRED:
-                this.storageMovementsMonitor.add(new AttemptedItemInfo(
-                    itemInfo.getStartId(), itemInfo.getTrackId(),
-                    monotonicNow(), status.assignedBlocks));
+                this.storageMovementsMonitor.add(new AttemptedItemInfo(itemInfo
+                    .getStartId(), itemInfo.getTrackId(), monotonicNow(),
+                    status.assignedBlocks, itemInfo.getRetryCount()));
                 break;
               case NO_BLOCKS_TARGETS_PAIRED:
                 if (LOG.isDebugEnabled()) {

@@ -279,6 +290,7 @@ public void run() {
                       + " back to retry queue as none of the blocks"
                       + " found its eligible targets.");
                 }
+                itemInfo.retryCount++;
                 this.storageMovementNeeded.add(itemInfo);
                 break;
               case FEW_LOW_REDUNDANCY_BLOCKS:

@@ -295,7 +307,7 @@ public void run() {
               default:
                 LOG.info("Block analysis skipped or blocks already satisfied"
                     + " with storages. So, Cleaning up the Xattrs.");
-                storageMovementNeeded.removeItemTrackInfo(itemInfo);
+                storageMovementNeeded.removeItemTrackInfo(itemInfo, true);
                 break;
             }
           }
@@ -861,10 +873,19 @@ public void clearQueue(long trackId) {
   public static class ItemInfo {
     private long startId;
     private long trackId;
+    private int retryCount;
 
     public ItemInfo(long startId, long trackId) {
       this.startId = startId;
       this.trackId = trackId;
+      //set 0 when item is getting added first time in queue.
+      this.retryCount = 0;
+    }
+
+    public ItemInfo(long startId, long trackId, int retryCount) {
+      this.startId = startId;
+      this.trackId = trackId;
+      this.retryCount = retryCount;
     }
 
     /**

@@ -887,6 +908,13 @@ public long getTrackId() {
     public boolean isDir() {
       return (startId != trackId);
     }
+
+    /**
+     * Get the attempted retry count of the block for satisfy the policy.
+     */
+    public int getRetryCount() {
+      return retryCount;
+    }
   }
 
   /**
@@ -910,8 +938,8 @@ final static class AttemptedItemInfo extends ItemInfo {
      */
     AttemptedItemInfo(long rootId, long trackId,
         long lastAttemptedOrReportedTime,
-        List<Block> blocks) {
-      super(rootId, trackId);
+        List<Block> blocks, int retryCount) {
+      super(rootId, trackId, retryCount);
       this.lastAttemptedOrReportedTime = lastAttemptedOrReportedTime;
       this.blocks = blocks;
     }
@@ -272,8 +272,11 @@ public String getLongUsage() {
         + " the policy in given path. This will print the current"
         + "status of the path in each 10 sec and status are:\n"
         + "PENDING : Path is in queue and not processed for satisfying"
-        + " the policy.\nIN_PROGRESS : Satisfying the storage policy for"
-        + " path.\nSUCCESS : Storage policy satisfied for the path.\n"
+        + " the policy.\n"
+        + "IN_PROGRESS : Satisfying the storage policy for"
+        + " path.\n"
+        + "SUCCESS : Storage policy satisfied for the path.\n"
+        + "FAILURE : Few blocks failed to move.\n"
        + "NOT_AVAILABLE : Status not available.");
     return getShortUsage() + "\n" +
         "Schedule blocks to move based on file/directory policy.\n\n" +
@@ -305,18 +308,30 @@ public int run(Configuration conf, List<String> args) throws IOException {
       return 0;
     }
 
 
    private void waitForSatisfyPolicy(DistributedFileSystem dfs, String path)
        throws IOException {
      System.out.println("Waiting for satisfy the policy ...");
-      while (true) {
+      boolean running = true;
+      while (running) {
        StoragePolicySatisfyPathStatus status = dfs.getClient()
            .checkStoragePolicySatisfyPathStatus(path);
-        if (StoragePolicySatisfyPathStatus.SUCCESS.equals(status)) {
+        switch (status) {
+        case SUCCESS:
+        case FAILURE:
+        case NOT_AVAILABLE:
          System.out.println(status);
+          running = false;
+          break;
+        case PENDING:
+        case IN_PROGRESS:
+          System.out.println(status);
+        default:
+          System.err.println("Unexpected storage policy satisfyer status,"
+              + " Exiting");
+          running = false;
          break;
        }
-        System.out.println(status);
        try {
          Thread.sleep(10000);
        } catch (InterruptedException e) {
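The reworked -w loop above keeps polling on PENDING/IN_PROGRESS and stops once the path reaches a terminal status. A minimal standalone sketch of the same behaviour, assuming a DistributedFileSystem handle supplied by the caller; import paths follow the usual HDFS layout and this is not the committed StoragePolicyAdmin code:

import java.io.IOException;

import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus;

public class WaitForSpsSketch {
  // Poll the SPS status of a path every 10 seconds until it is terminal.
  static void waitForSatisfyPolicy(DistributedFileSystem dfs, String path)
      throws IOException, InterruptedException {
    while (true) {
      StoragePolicySatisfyPathStatus status =
          dfs.getClient().checkStoragePolicySatisfyPathStatus(path);
      System.out.println(status);
      if (status == StoragePolicySatisfyPathStatus.SUCCESS
          || status == StoragePolicySatisfyPathStatus.FAILURE
          || status == StoragePolicySatisfyPathStatus.NOT_AVAILABLE) {
        return; // satisfied, given up after max retries, or status expired
      }
      Thread.sleep(10000); // PENDING / IN_PROGRESS: check again later
    }
  }
}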
@@ -4557,12 +4557,21 @@
 
 <property>
   <name>dfs.storage.policy.satisfier.low.max-streams.preference</name>
-  <value>false</value>
+  <value>true</value>
   <description>
-    If true, blocks to move tasks will share equal ratio of number of highest-priority
+    If false, blocks to move tasks will share equal ratio of number of highest-priority
     replication streams (dfs.namenode.replication.max-streams) with pending replica and
-    erasure-coded reconstruction tasks. If false, blocks to move tasks will only use
-    the delta number of replication streams. The default value is false.
+    erasure-coded reconstruction tasks. If true, blocks to move tasks will only use
+    the delta number of replication streams. The default value is true.
+  </description>
+</property>
+
+<property>
+  <name>dfs.storage.policy.satisfier.retry.max.attempts</name>
+  <value>3</value>
+  <description>
+    Max retry to satisfy the block storage policy. After this retry block will be removed
+    from the movement needed queue.
   </description>
 </property>
 
@@ -224,7 +224,7 @@ Schedule blocks to move based on file's/directory's current storage policy.
 | | |
 |:---- |:---- |
 | `-path <path>` | The path referring to either a directory or a file. |
-| `-w` | It requests that the command wait till all the files satisfy the policy in given path. This will print the current status of the path in each 10 sec and status are:<br/>PENDING - Path is in queue and not processed for satisfying the policy.<br/>IN_PROGRESS - Satisfying the storage policy for path.<br/>SUCCESS - Storage policy satisfied for the path.<br/>NOT_AVAILABLE - Status not available. |
+| `-w` | It requests that the command wait till all the files satisfy the policy in given path. This will print the current status of the path in each 10 sec and status are:<br/>PENDING - Path is in queue and not processed for satisfying the policy.<br/>IN_PROGRESS - Satisfying the storage policy for path.<br/>SUCCESS - Storage policy satisfied for the path.<br/>FAILURE : Few blocks failed to move.<br/>NOT_AVAILABLE - Status not available. |
 
 ### SPS Running Status
 
@@ -91,7 +91,7 @@ public void testAddReportedMoveAttemptFinishedBlocks() throws Exception {
     Long item = new Long(1234);
     List<Block> blocks = new ArrayList<Block>();
     blocks.add(new Block(item));
-    bsmAttemptedItems.add(new AttemptedItemInfo(0L, 0L, 0L, blocks));
+    bsmAttemptedItems.add(new AttemptedItemInfo(0L, 0L, 0L, blocks, 0));
     Block[] blockArray = new Block[blocks.size()];
     blocks.toArray(blockArray);
     bsmAttemptedItems.addReportedMovedBlocks(blockArray);

@@ -108,7 +108,7 @@ public void testNoBlockMovementAttemptFinishedReportAdded() throws Exception {
     Long item = new Long(1234);
     List<Block> blocks = new ArrayList<>();
     blocks.add(new Block(item));
-    bsmAttemptedItems.add(new AttemptedItemInfo(0L, 0L, 0L, blocks));
+    bsmAttemptedItems.add(new AttemptedItemInfo(0L, 0L, 0L, blocks, 0));
     assertEquals("Shouldn't receive result", 0,
         bsmAttemptedItems.getMovementFinishedBlocksCount());
     assertEquals("Item doesn't exist in the attempted list", 1,

@@ -129,7 +129,7 @@ public void testPartialBlockMovementShouldBeRetried1() throws Exception {
     blocks.add(new Block(5678L));
     Long trackID = 0L;
     bsmAttemptedItems
-        .add(new AttemptedItemInfo(trackID, trackID, 0L, blocks));
+        .add(new AttemptedItemInfo(trackID, trackID, 0L, blocks, 0));
     Block[] blksMovementReport = new Block[1];
     blksMovementReport[0] = new Block(item);
     bsmAttemptedItems.addReportedMovedBlocks(blksMovementReport);

@@ -154,7 +154,7 @@ public void testPartialBlockMovementShouldBeRetried2() throws Exception {
     List<Block> blocks = new ArrayList<>();
     blocks.add(new Block(item));
     bsmAttemptedItems
-        .add(new AttemptedItemInfo(trackID, trackID, 0L, blocks));
+        .add(new AttemptedItemInfo(trackID, trackID, 0L, blocks, 0));
     Block[] blksMovementReport = new Block[1];
     blksMovementReport[0] = new Block(item);
     bsmAttemptedItems.addReportedMovedBlocks(blksMovementReport);

@@ -182,7 +182,7 @@ public void testPartialBlockMovementWithEmptyAttemptedQueue()
     List<Block> blocks = new ArrayList<>();
     blocks.add(new Block(item));
     bsmAttemptedItems
-        .add(new AttemptedItemInfo(trackID, trackID, 0L, blocks));
+        .add(new AttemptedItemInfo(trackID, trackID, 0L, blocks, 0));
     Block[] blksMovementReport = new Block[1];
     blksMovementReport[0] = new Block(item);
     bsmAttemptedItems.addReportedMovedBlocks(blksMovementReport);
@@ -1412,8 +1412,8 @@ public void testMoveBlocksWithUnderReplicatedBlocks() throws Exception {
           .DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
           "3000");
       config.setBoolean(DFSConfigKeys
-          .DFS_STORAGE_POLICY_SATISFIER_SHARE_EQUAL_REPLICA_MAX_STREAMS_KEY,
-          true);
+          .DFS_STORAGE_POLICY_SATISFIER_LOW_MAX_STREAMS_PREFERENCE_KEY,
+          false);
 
       StorageType[][] storagetypes = new StorageType[][] {
           {StorageType.ARCHIVE, StorageType.DISK},

@@ -1474,8 +1474,8 @@ public void testStoragePolicySatisfyPathStatus() throws Exception {
           .DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
           "3000");
       config.setBoolean(DFSConfigKeys
-          .DFS_STORAGE_POLICY_SATISFIER_SHARE_EQUAL_REPLICA_MAX_STREAMS_KEY,
-          true);
+          .DFS_STORAGE_POLICY_SATISFIER_LOW_MAX_STREAMS_PREFERENCE_KEY,
+          false);
 
       StorageType[][] storagetypes = new StorageType[][] {
           {StorageType.ARCHIVE, StorageType.DISK},
@@ -1531,6 +1531,55 @@ public Boolean get() {
       }
     }
 
+  @Test(timeout = 300000)
+  public void testMaxRetryForFailedBlock() throws Exception {
+    try {
+      config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
+          true);
+      config.set(DFSConfigKeys
+          .DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
+          "1000");
+      config.set(DFSConfigKeys
+          .DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY,
+          "1000");
+      StorageType[][] storagetypes = new StorageType[][] {
+          {StorageType.DISK, StorageType.DISK},
+          {StorageType.DISK, StorageType.DISK}};
+      hdfsCluster = new MiniDFSCluster.Builder(config).numDataNodes(2)
+          .storageTypes(storagetypes).build();
+      hdfsCluster.waitActive();
+      dfs = hdfsCluster.getFileSystem();
+
+      Path filePath = new Path("/retryFile");
+      DFSTestUtil.createFile(dfs, filePath, DEFAULT_BLOCK_SIZE, (short) 2,
+          0);
+
+      dfs.setStoragePolicy(filePath, "COLD");
+      dfs.satisfyStoragePolicy(filePath);
+      Thread.sleep(3000
+          * DFSConfigKeys
+          .DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_DEFAULT);
+      DFSTestUtil.waitExpectedStorageType(filePath.toString(),
+          StorageType.DISK, 2, 60000, hdfsCluster.getFileSystem());
+      // Path status should be FAILURE
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          try {
+            StoragePolicySatisfyPathStatus status = dfs.getClient()
+                .checkStoragePolicySatisfyPathStatus(filePath.toString());
+            return StoragePolicySatisfyPathStatus.FAILURE.equals(status);
+          } catch (IOException e) {
+            Assert.fail("Fail to get path status for sps");
+          }
+          return false;
+        }
+      }, 100, 90000);
+    } finally {
+      shutdownCluster();
+    }
+  }
+
   private static void createDirectoryTree(DistributedFileSystem dfs)
       throws Exception {
     // tree structure