HDFS-12310: [SPS]: Provide an option to track the status of in progress requests. Contributed by Surendra Singh Lilhore.

This commit is contained in:
Rakesh Radhakrishnan 2017-11-03 08:18:14 +05:30 committed by Uma Maheswara Rao Gangumalla
parent 5780f0624d
commit 68017e3349
16 changed files with 424 additions and 6 deletions

View File

@ -123,6 +123,7 @@
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
@ -3169,4 +3170,25 @@ public RemoteIterator<OpenFileEntry> listOpenFiles(
checkOpen();
return new OpenFilesIterator(namenode, tracer, openFilesTypes, path);
}
/**
* Check the storage policy satisfy status of the path for which
* {@link DFSClient#satisfyStoragePolicy(String)} was called.
*
* @return Storage policy satisfy status.
* <ul>
* <li>PENDING if the path is queued and not yet processed for
* satisfying the policy.</li>
* <li>IN_PROGRESS if the storage policy is being satisfied for the
* path.</li>
* <li>SUCCESS if the storage policy has been satisfied for the
* path.</li>
* <li>NOT_AVAILABLE if
* {@link DFSClient#satisfyStoragePolicy(String)} was not called for
* the path, or the SPS work has already finished.</li>
* </ul>
* @throws IOException
*/
public StoragePolicySatisfyPathStatus checkStoragePolicySatisfyPathStatus(
String path) throws IOException {
return namenode.checkStoragePolicySatisfyPathStatus(path);
}
}
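
A hedged usage sketch (not part of the patch): poll the new API until the policy is satisfied. It assumes an already-constructed DFSClient; the wrapper class and the helper name waitForSps are hypothetical, and the 10-second cadence mirrors what the -w CLI option later in this commit uses.

import java.io.IOException;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus;

class SpsUsageSketch {
  static void waitForSps(DFSClient client, String path)
      throws IOException, InterruptedException {
    // Queue the path for the StoragePolicySatisfier first.
    client.satisfyStoragePolicy(path);
    while (true) {
      StoragePolicySatisfyPathStatus status =
          client.checkStoragePolicySatisfyPathStatus(path);
      if (status == StoragePolicySatisfyPathStatus.SUCCESS
          || status == StoragePolicySatisfyPathStatus.NOT_AVAILABLE) {
        return; // satisfied, or the result already aged out of the status cache
      }
      Thread.sleep(10000); // poll every 10 seconds
    }
  }
}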

View File

@ -45,6 +45,7 @@
import org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
@ -1764,4 +1765,24 @@ BatchedEntries<OpenFileEntry> listOpenFiles(long prevId,
*/
@Idempotent
boolean isStoragePolicySatisfierRunning() throws IOException;
/**
* Check the storage policy satisfy status of the path for which
* {@link ClientProtocol#satisfyStoragePolicy(String)} was called.
*
* @return Storage policy satisfy status.
* <ul>
* <li>PENDING if the path is queued and not yet processed for
* satisfying the policy.</li>
* <li>IN_PROGRESS if the storage policy is being satisfied for the
* path.</li>
* <li>SUCCESS if the storage policy has been satisfied for the
* path.</li>
* <li>NOT_AVAILABLE if
* {@link ClientProtocol#satisfyStoragePolicy(String)} was not called
* for the path, or the SPS work has already finished.</li>
* </ul>
* @throws IOException
*/
@Idempotent
StoragePolicySatisfyPathStatus checkStoragePolicySatisfyPathStatus(
String path) throws IOException;
}

View File

@ -128,6 +128,33 @@ public enum SafeModeAction {
SAFEMODE_LEAVE, SAFEMODE_ENTER, SAFEMODE_GET, SAFEMODE_FORCE_EXIT
}
/**
* Storage policy satisfy path status.
*/
public enum StoragePolicySatisfyPathStatus {
/**
* Scheduled but not yet processed. This status applies only to
* directories: a directory is first added to the
* "pendingWorkForDirectory" queue and later processed recursively.
*/
PENDING,
/**
* The storage policy is being satisfied for the path.
*/
IN_PROGRESS,
/**
* The storage policy has been satisfied for the path.
*/
SUCCESS,
/**
* Status not available.
*/
NOT_AVAILABLE
}
public enum RollingUpgradeAction {
QUERY, PREPARE, FINALIZE;
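
For orientation (not part of the patch): the states above form the client-visible lifecycle PENDING -> IN_PROGRESS -> SUCCESS, and a SUCCESS result later decays to NOT_AVAILABLE once the NameNode evicts it from its status cache. A minimal hedged sketch of branching on the enum; the describe helper is hypothetical:

import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus;

class SpsStatusSketch {
  // Hypothetical helper mapping each state to a human-readable summary.
  static String describe(StoragePolicySatisfyPathStatus s) {
    switch (s) {
    case PENDING:
      return "queued, not yet processed";
    case IN_PROGRESS:
      return "block movements ongoing";
    case SUCCESS:
      return "policy satisfied";
    case NOT_AVAILABLE:
      return "no record: never requested, or the result expired";
    default:
      throw new IllegalArgumentException("Unexpected SPSStatus: " + s);
    }
  }
}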

View File

@ -70,6 +70,7 @@
import org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
@ -100,6 +101,8 @@
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolEntryProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckAccessRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckStoragePolicySatisfyPathStatusRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckStoragePolicySatisfyPathStatusResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CompleteRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ConcatRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateRequestProto;
@ -241,6 +244,7 @@
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.util.concurrent.AsyncGet;
/**
@ -1973,4 +1977,20 @@ public void satisfyStoragePolicy(String src) throws IOException {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public StoragePolicySatisfyPathStatus checkStoragePolicySatisfyPathStatus(
String path) throws IOException {
try {
CheckStoragePolicySatisfyPathStatusRequestProto request =
CheckStoragePolicySatisfyPathStatusRequestProto.newBuilder()
.setSrc(path)
.build();
CheckStoragePolicySatisfyPathStatusResponseProto response = rpcProxy
.checkStoragePolicySatisfyPathStatus(null, request);
return PBHelperClient.convert(response.getStatus());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
}

View File

@ -130,6 +130,7 @@
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolEntryProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolStatsProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckStoragePolicySatisfyPathStatusResponseProto.StoragePolicySatisfyPathStatus;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateFlagProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DatanodeReportTypeProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DatanodeStorageReportProto;
@ -3398,4 +3399,36 @@ public static List<OpenFilesTypeProto> convertOpenFileTypes(
}
return typeProtos;
}
public static StoragePolicySatisfyPathStatus convert(
HdfsConstants.StoragePolicySatisfyPathStatus status) {
switch (status) {
case PENDING:
return StoragePolicySatisfyPathStatus.PENDING;
case IN_PROGRESS:
return StoragePolicySatisfyPathStatus.IN_PROGRESS;
case SUCCESS:
return StoragePolicySatisfyPathStatus.SUCCESS;
case NOT_AVAILABLE:
return StoragePolicySatisfyPathStatus.NOT_AVAILABLE;
default:
throw new IllegalArgumentException("Unexpected SPSStatus: " + status);
}
}
public static HdfsConstants.StoragePolicySatisfyPathStatus convert(
StoragePolicySatisfyPathStatus status) {
switch (status) {
case PENDING:
return HdfsConstants.StoragePolicySatisfyPathStatus.PENDING;
case IN_PROGRESS:
return HdfsConstants.StoragePolicySatisfyPathStatus.IN_PROGRESS;
case SUCCESS:
return HdfsConstants.StoragePolicySatisfyPathStatus.SUCCESS;
case NOT_AVAILABLE:
return HdfsConstants.StoragePolicySatisfyPathStatus.NOT_AVAILABLE;
default:
throw new IllegalArgumentException("Unexpected SPSStatus: " + status);
}
}
}

View File

@ -481,7 +481,6 @@ message RollingUpgradeInfoProto {
message RollingUpgradeResponseProto {
optional RollingUpgradeInfoProto rollingUpgradeInfo= 1;
}
message ListCorruptFileBlocksRequestProto {
required string path = 1;
optional string cookie = 2;
@ -846,6 +845,20 @@ message IsStoragePolicySatisfierRunningResponseProto {
required bool running = 1;
}
message CheckStoragePolicySatisfyPathStatusRequestProto {
required string src = 1;
}
message CheckStoragePolicySatisfyPathStatusResponseProto {
enum StoragePolicySatisfyPathStatus {
PENDING = 0;
IN_PROGRESS = 1;
SUCCESS = 2;
NOT_AVAILABLE = 3;
}
required StoragePolicySatisfyPathStatus status = 1;
}
service ClientNamenodeProtocol {
rpc getBlockLocations(GetBlockLocationsRequestProto)
returns(GetBlockLocationsResponseProto);
@ -1036,4 +1049,6 @@ service ClientNamenodeProtocol {
returns(SatisfyStoragePolicyResponseProto);
rpc isStoragePolicySatisfierRunning(IsStoragePolicySatisfierRunningRequestProto)
returns(IsStoragePolicySatisfierRunningResponseProto);
rpc checkStoragePolicySatisfyPathStatus(CheckStoragePolicySatisfyPathStatusRequestProto)
returns(CheckStoragePolicySatisfyPathStatusResponseProto);
}

View File

@ -48,6 +48,7 @@
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@ -85,6 +86,8 @@
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckAccessRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckAccessResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckStoragePolicySatisfyPathStatusRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckStoragePolicySatisfyPathStatusResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CompleteRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CompleteResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ConcatRequestProto;
@ -257,7 +260,7 @@
import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.UnsetErasureCodingPolicyResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingCodecsRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingCodecsResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
import org.apache.hadoop.hdfs.protocol.proto.*;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockStoragePolicyProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto;
@ -1922,4 +1925,22 @@ public SatisfyStoragePolicyResponseProto satisfyStoragePolicy(
}
return VOID_SATISFYSTORAGEPOLICY_RESPONSE;
}
@Override
public CheckStoragePolicySatisfyPathStatusResponseProto
checkStoragePolicySatisfyPathStatus(RpcController controller,
CheckStoragePolicySatisfyPathStatusRequestProto request)
throws ServiceException {
try {
StoragePolicySatisfyPathStatus status = server
.checkStoragePolicySatisfyPathStatus(request.getSrc());
CheckStoragePolicySatisfyPathStatusResponseProto.Builder builder =
CheckStoragePolicySatisfyPathStatusResponseProto
.newBuilder();
builder.setStatus(PBHelperClient.convert(status));
return builder.build();
} catch (IOException e) {
throw new ServiceException(e);
}
}
}

View File

@ -47,6 +47,7 @@
import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import javax.management.ObjectName;
import org.apache.hadoop.HadoopIllegalArgumentException;
@ -68,6 +69,7 @@
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
@ -5103,4 +5105,14 @@ public void stopSPSGracefully() {
public boolean isStoragePolicySatisfierRunning() {
return sps.isRunning();
}
/**
* Check the storage policy satisfy status for the given path.
*
* @return Storage policy satisfy status of the path.
* @throws IOException
*/
public StoragePolicySatisfyPathStatus checkStoragePolicySatisfyPathStatus(
String path) throws IOException {
return sps.checkStoragePolicySatisfyPathStatus(path);
}
}

View File

@ -26,13 +26,17 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.namenode.FSTreeTraverser.TraverseInfo;
import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier.ItemInfo;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -62,6 +66,9 @@ public class BlockStorageMovementNeeded {
private final Map<Long, DirPendingWorkInfo> pendingWorkForDirectory =
new HashMap<Long, DirPendingWorkInfo>();
private final Map<Long, StoragePolicySatisfyPathStatusInfo> spsStatus =
new ConcurrentHashMap<>();
private final Namesystem namesystem;
// List of pending dir to satisfy the policy
@ -73,6 +80,10 @@ public class BlockStorageMovementNeeded {
private final int maxQueuedItem;
// Amount of time to cache the SUCCESS status of a path before turning it
// to NOT_AVAILABLE.
private static long statusClearanceElapsedTimeMs = 300000;
public BlockStorageMovementNeeded(Namesystem namesystem,
StoragePolicySatisfier sps, int queueLimit) {
this.namesystem = namesystem;
@ -88,6 +99,9 @@ public BlockStorageMovementNeeded(Namesystem namesystem,
* - track info for satisfying the policy
*/
public synchronized void add(ItemInfo trackInfo) {
spsStatus.put(trackInfo.getStartId(),
new StoragePolicySatisfyPathStatusInfo(
StoragePolicySatisfyPathStatus.IN_PROGRESS));
storageMovementNeeded.add(trackInfo);
}
@ -125,6 +139,8 @@ public synchronized ItemInfo get() {
}
public synchronized void addToPendingDirQueue(long id) {
spsStatus.put(id, new StoragePolicySatisfyPathStatusInfo(
StoragePolicySatisfyPathStatus.PENDING));
spsDirsToBeTraveresed.add(id);
// Notify waiting FileInodeIdCollector thread about the newly
// added SPS path.
@ -172,6 +188,7 @@ public synchronized void removeItemTrackInfo(ItemInfo trackInfo)
if (inode == null) {
// Directory deleted; just remove it.
this.pendingWorkForDirectory.remove(startId);
markSuccess(startId);
} else {
DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(startId);
if (pendingWork != null) {
@ -179,6 +196,7 @@ public synchronized void removeItemTrackInfo(ItemInfo trackInfo)
if (pendingWork.isDirWorkDone()) {
namesystem.removeXattr(startId, XATTR_SATISFY_STORAGE_POLICY);
pendingWorkForDirectory.remove(startId);
markSuccess(startId);
}
}
}
@ -187,6 +205,7 @@ public synchronized void removeItemTrackInfo(ItemInfo trackInfo)
// storageMovementAttemptedItems or file policy satisfied.
namesystem.removeXattr(trackInfo.getTrackId(),
XATTR_SATISFY_STORAGE_POLICY);
markSuccess(trackInfo.getStartId());
}
}
@ -202,6 +221,19 @@ public synchronized void clearQueue(long trackId) {
pendingWorkForDirectory.remove(trackId);
}
/**
* Mark inode status as SUCCESS in map.
*/
private void markSuccess(long startId) {
StoragePolicySatisfyPathStatusInfo spsStatusInfo =
spsStatus.get(startId);
if (spsStatusInfo == null) {
spsStatusInfo = new StoragePolicySatisfyPathStatusInfo();
spsStatus.put(startId, spsStatusInfo);
}
spsStatusInfo.setSuccess();
}
/**
* Clean all the movements in spsDirsToBeTraveresed/storageMovementNeeded
* and notify to clean up required resources.
@ -256,6 +288,7 @@ private class StorageMovementPendingInodeIdCollector extends FSTreeTraverser
@Override
public void run() {
LOG.info("Starting FileInodeIdCollector!.");
long lastStatusCleanTime = 0;
while (namesystem.isRunning() && sps.isRunning()) {
try {
if (!namesystem.isInSafeMode()) {
@ -271,6 +304,9 @@ public void run() {
if (startInode != null) {
try {
remainingCapacity = remainingCapacity();
spsStatus.put(startINodeId,
new StoragePolicySatisfyPathStatusInfo(
StoragePolicySatisfyPathStatus.IN_PROGRESS));
readLock();
traverseDir(startInode.asDirectory(), startINodeId,
HdfsFileStatus.EMPTY_NAME,
@ -289,9 +325,16 @@ public void run() {
namesystem.removeXattr(startInode.getId(),
XATTR_SATISFY_STORAGE_POLICY);
pendingWorkForDirectory.remove(startInode.getId());
markSuccess(startInode.getId());
}
}
}
// Periodically clear SPS statuses that have been SUCCESS for more than 5 min.
if (Time.monotonicNow()
- lastStatusCleanTime > statusClearanceElapsedTimeMs) {
lastStatusCleanTime = Time.monotonicNow();
cleanSpsStatus();
}
}
} catch (Throwable t) {
LOG.warn("Exception while loading inodes to satisfy the policy", t);
@ -299,6 +342,16 @@ public void run() {
}
}
private synchronized void cleanSpsStatus() {
for (Iterator<Entry<Long, StoragePolicySatisfyPathStatusInfo>> it =
spsStatus.entrySet().iterator(); it.hasNext();) {
Entry<Long, StoragePolicySatisfyPathStatusInfo> entry = it.next();
if (entry.getValue().canRemove()) {
it.remove();
}
}
}
@Override
protected void checkPauseForTesting() throws InterruptedException {
// TODO implement if needed
@ -434,4 +487,60 @@ public long getStartId() {
return startId;
}
}
/**
* Represent the file/directory block movement status.
*/
static class StoragePolicySatisfyPathStatusInfo {
private StoragePolicySatisfyPathStatus status =
StoragePolicySatisfyPathStatus.NOT_AVAILABLE;
private long lastStatusUpdateTime;
StoragePolicySatisfyPathStatusInfo() {
this.lastStatusUpdateTime = 0;
}
StoragePolicySatisfyPathStatusInfo(StoragePolicySatisfyPathStatus status) {
this.status = status;
this.lastStatusUpdateTime = 0;
}
private void setSuccess() {
this.status = StoragePolicySatisfyPathStatus.SUCCESS;
this.lastStatusUpdateTime = Time.monotonicNow();
}
private StoragePolicySatisfyPathStatus getStatus() {
return status;
}
/**
* Return true if the SUCCESS status has been cached for more than 5 min.
*/
private boolean canRemove() {
return StoragePolicySatisfyPathStatus.SUCCESS == status
&& (Time.monotonicNow()
- lastStatusUpdateTime) > statusClearanceElapsedTimeMs;
}
}
public StoragePolicySatisfyPathStatus getStatus(long id) {
StoragePolicySatisfyPathStatusInfo spsStatusInfo = spsStatus.get(id);
if (spsStatusInfo == null) {
return StoragePolicySatisfyPathStatus.NOT_AVAILABLE;
}
return spsStatusInfo.getStatus();
}
@VisibleForTesting
public static void setStatusClearanceElapsedTimeMs(
long statusClearanceElapsedTimeMs) {
BlockStorageMovementNeeded.statusClearanceElapsedTimeMs =
statusClearanceElapsedTimeMs;
}
@VisibleForTesting
public static long getStatusClearanceElapsedTimeMs() {
return statusClearanceElapsedTimeMs;
}
}

View File

@ -28,7 +28,6 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_PATH_DEPTH;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_PATH_LENGTH;
import static org.apache.hadoop.util.Time.now;
import java.io.FileNotFoundException;
@ -111,6 +110,7 @@
import org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
@ -2542,4 +2542,15 @@ public boolean isStoragePolicySatisfierRunning() throws IOException {
}
return namesystem.getBlockManager().isStoragePolicySatisfierRunning();
}
@Override
public StoragePolicySatisfyPathStatus checkStoragePolicySatisfyPathStatus(
String path) throws IOException {
checkNNStartup();
if (nn.isStandbyState()) {
throw new StandbyException("Not supported by Standby Namenode.");
}
return namesystem.getBlockManager().checkStoragePolicySatisfyPathStatus(
path);
}
}

View File

@ -19,6 +19,7 @@
import static org.apache.hadoop.util.Time.monotonicNow;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@ -36,6 +37,7 @@
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus;
import org.apache.hadoop.hdfs.server.balancer.Matcher;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
@ -934,4 +936,10 @@ List<Block> getBlocks() {
}
}
public StoragePolicySatisfyPathStatus checkStoragePolicySatisfyPathStatus(
String path) throws IOException {
INode inode = namesystem.getFSDirectory().getINode(path);
return storageMovementNeeded.getStatus(inode.getId());
}
}

View File

@ -26,6 +26,7 @@
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.tools.TableListing;
import org.apache.hadoop.util.StringUtils;
@ -258,7 +259,7 @@ public String getName() {
@Override
public String getShortUsage() {
return "[" + getName() + " -path <path>]\n";
return "[" + getName() + " [-w] -path <path>]\n";
}
@Override
@ -266,6 +267,14 @@ public String getLongUsage() {
TableListing listing = AdminHelper.getOptionDescriptionListing();
listing.addRow("<path>", "The path of the file/directory to satisfy"
+ " storage policy");
listing.addRow("-w",
"It requests that the command wait till all the files satisfy"
+ " the policy in given path. This will print the current"
+ "status of the path in each 10 sec and status are:\n"
+ "PENDING : Path is in queue and not processed for satisfying"
+ " the policy.\nIN_PROGRESS : Satisfying the storage policy for"
+ " path.\nSUCCESS : Storage policy satisfied for the path.\n"
+ "NOT_AVAILABLE : Status not available.");
return getShortUsage() + "\n" +
"Schedule blocks to move based on file/directory policy.\n\n" +
listing.toString();
@ -285,12 +294,36 @@ public int run(Configuration conf, List<String> args) throws IOException {
dfs.satisfyStoragePolicy(new Path(path));
System.out.println("Scheduled blocks to move based on the current"
+ " storage policy on " + path);
boolean waitOpt = StringUtils.popOption("-w", args);
if (waitOpt) {
waitForSatisfyPolicy(dfs, path);
}
} catch (Exception e) {
System.err.println(AdminHelper.prettifyException(e));
return 2;
}
return 0;
}
private void waitForSatisfyPolicy(DistributedFileSystem dfs, String path)
throws IOException {
System.out.println("Waiting for satisfy the policy ...");
while (true) {
StoragePolicySatisfyPathStatus status = dfs.getClient()
.checkStoragePolicySatisfyPathStatus(path);
if (StoragePolicySatisfyPathStatus.SUCCESS.equals(status)) {
System.out.println(status);
break;
}
System.out.println(status);
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
}
}
System.out.println(" done");
}
}
/** Command to check storage policy satisfier status. */

View File

@ -217,13 +217,14 @@ Schedule blocks to move based on file's/directory's current storage policy.
* Command:
hdfs storagepolicies -satisfyStoragePolicy -path <path>
hdfs storagepolicies -satisfyStoragePolicy [-w] -path <path>
* Arguments:
| | |
|:---- |:---- |
| `-path <path>` | The path referring to either a directory or a file. |
| `-w` | Wait until all files under the given path satisfy the policy. The current status of the path is printed every 10 seconds. The possible statuses are:<br/>PENDING - The path is queued and not yet processed for satisfying the policy.<br/>IN_PROGRESS - The storage policy is being satisfied for the path.<br/>SUCCESS - The storage policy has been satisfied for the path.<br/>NOT_AVAILABLE - Status not available. |
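
* Example (illustrative output; the number of IN_PROGRESS lines and the timing depend on cluster load):

        hdfs storagepolicies -satisfyStoragePolicy -w -path /archive/data
        Scheduled blocks to move based on the current storage policy on /archive/data
        Waiting for the policy to be satisfied ...
        IN_PROGRESS
        SUCCESS
         done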
### SPS Running Status

View File

@ -479,9 +479,9 @@ public void testSPSOnChildAndParentDirectory() throws Exception {
clusterSetUp();
fs.setStoragePolicy(parentDir, "COLD");
fs.satisfyStoragePolicy(childDir);
fs.satisfyStoragePolicy(parentDir);
DFSTestUtil.waitExpectedStorageType(childFileName, StorageType.ARCHIVE,
3, 30000, cluster.getFileSystem());
fs.satisfyStoragePolicy(parentDir);
DFSTestUtil.waitExpectedStorageType(parentFileName, StorageType.ARCHIVE,
3, 30000, cluster.getFileSystem());
} finally {

View File

@ -53,6 +53,7 @@
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
@ -1463,6 +1464,72 @@ public void testMoveBlocksWithUnderReplicatedBlocks() throws Exception {
}
}
@Test(timeout = 300000)
public void testStoragePolicySatisfyPathStatus() throws Exception {
try {
config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
true);
config.set(DFSConfigKeys
.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
"3000");
config.setBoolean(DFSConfigKeys
.DFS_STORAGE_POLICY_SATISFIER_SHARE_EQUAL_REPLICA_MAX_STREAMS_KEY,
true);
StorageType[][] storagetypes = new StorageType[][] {
{StorageType.ARCHIVE, StorageType.DISK},
{StorageType.ARCHIVE, StorageType.DISK}};
hdfsCluster = new MiniDFSCluster.Builder(config).numDataNodes(2)
.storageTypes(storagetypes).build();
hdfsCluster.waitActive();
BlockStorageMovementNeeded.setStatusClearanceElapsedTimeMs(20000);
dfs = hdfsCluster.getFileSystem();
Path filePath = new Path("/file");
DFSTestUtil.createFile(dfs, filePath, 1024, (short) 2,
0);
dfs.setStoragePolicy(filePath, "COLD");
dfs.satisfyStoragePolicy(filePath);
StoragePolicySatisfyPathStatus status = dfs.getClient()
.checkStoragePolicySatisfyPathStatus(filePath.toString());
Assert.assertTrue("Status should be IN_PROGRESS",
StoragePolicySatisfyPathStatus.IN_PROGRESS.equals(status));
DFSTestUtil.waitExpectedStorageType(filePath.toString(),
StorageType.ARCHIVE, 2, 30000, dfs);
// wait till status is SUCCESS
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
try {
StoragePolicySatisfyPathStatus status = dfs.getClient()
.checkStoragePolicySatisfyPathStatus(filePath.toString());
return StoragePolicySatisfyPathStatus.SUCCESS.equals(status);
} catch (IOException e) {
Assert.fail("Fail to get path status for sps");
}
return false;
}
}, 100, 60000);
// wait till status is NOT_AVAILABLE
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
try {
StoragePolicySatisfyPathStatus status = dfs.getClient()
.checkStoragePolicySatisfyPathStatus(filePath.toString());
return StoragePolicySatisfyPathStatus.NOT_AVAILABLE.equals(status);
} catch (IOException e) {
Assert.fail("Fail to get path status for sps");
}
return false;
}
}, 100, 60000);
} finally {
shutdownCluster();
}
}
private static void createDirectoryTree(DistributedFileSystem dfs)
throws Exception {
// tree structure

View File

@ -204,4 +204,22 @@ public void testIsSatisfierRunningCommand() throws Exception {
DFSTestUtil.toolRun(admin, "-isSatisfierRunning status", 1,
"Can't understand arguments: ");
}
@Test(timeout = 90000)
public void testSatisfyStoragePolicyCommandWithWaitOption()
throws Exception {
final String file = "/testSatisfyStoragePolicyCommandWithWaitOption";
DFSTestUtil.createFile(fs, new Path(file), SIZE, REPL, 0);
final StoragePolicyAdmin admin = new StoragePolicyAdmin(conf);
DFSTestUtil.toolRun(admin, "-setStoragePolicy -path " + file
+ " -policy COLD", 0, "Set storage policy COLD on " + file.toString());
DFSTestUtil.toolRun(admin, "-satisfyStoragePolicy -w -path " + file, 0,
"Waiting for satisfy the policy");
DFSTestUtil
.waitExpectedStorageType(file, StorageType.ARCHIVE, 1, 30000, fs);
}
}