HDFS-17014. HttpFS Add Support getStatus API (#5660). Contributed by Hualong Zhang.
Reviewed-by: Shilun Fan <slfan1989@apache.org> Signed-off-by: Ayush Saxena <ayushsaxena@apache.org>
This commit is contained in:
parent
5272ed8670
commit
408dbf318e
@ -35,6 +35,7 @@
|
||||
import org.apache.hadoop.fs.DelegationTokenRenewer;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FsStatus;
|
||||
import org.apache.hadoop.fs.FileChecksum;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
@ -188,6 +189,7 @@ public static FILE_TYPE getType(FileStatus fileStatus) {
|
||||
|
||||
public static final String FILE_STATUSES_JSON = "FileStatuses";
|
||||
public static final String FILE_STATUS_JSON = "FileStatus";
|
||||
public static final String FS_STATUS_JSON = "FsStatus";
|
||||
public static final String PATH_SUFFIX_JSON = "pathSuffix";
|
||||
public static final String TYPE_JSON = "type";
|
||||
public static final String LENGTH_JSON = "length";
|
||||
@ -208,6 +210,9 @@ public static FILE_TYPE getType(FileStatus fileStatus) {
|
||||
public static final String XATTRNAMES_JSON = "XAttrNames";
|
||||
public static final String ECPOLICY_JSON = "ecPolicyObj";
|
||||
public static final String SYMLINK_JSON = "symlink";
|
||||
public static final String CAPACITY_JSON = "capacity";
|
||||
public static final String USED_JSON = "used";
|
||||
public static final String REMAINING_JSON = "remaining";
|
||||
|
||||
public static final String FILE_CHECKSUM_JSON = "FileChecksum";
|
||||
public static final String CHECKSUM_ALGORITHM_JSON = "algorithm";
|
||||
@ -278,6 +283,7 @@ public enum Operation {
|
||||
CHECKACCESS(HTTP_GET), SETECPOLICY(HTTP_PUT), GETECPOLICY(HTTP_GET), UNSETECPOLICY(
|
||||
HTTP_POST), SATISFYSTORAGEPOLICY(HTTP_PUT), GETSNAPSHOTDIFFLISTING(HTTP_GET),
|
||||
GETFILELINKSTATUS(HTTP_GET),
|
||||
GETSTATUS(HTTP_GET),
|
||||
GET_BLOCK_LOCATIONS(HTTP_GET);
|
||||
|
||||
private String httpMethod;
|
||||
@ -1756,6 +1762,17 @@ public FileStatus getFileLinkStatus(final Path path) throws IOException {
|
||||
return status.makeQualified(getUri(), path);
|
||||
}
|
||||
|
||||
@Override
|
||||
public FsStatus getStatus(final Path path) throws IOException {
|
||||
Map<String, String> params = new HashMap<>();
|
||||
params.put(OP_PARAM, Operation.GETSTATUS.toString());
|
||||
HttpURLConnection conn =
|
||||
getConnection(Operation.GETSTATUS.getMethod(), params, path, true);
|
||||
HttpExceptionUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
|
||||
JSONObject json = (JSONObject) HttpFSUtils.jsonParse(conn);
|
||||
return JsonUtilClient.toFsStatus(json);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static BlockLocation[] toBlockLocations(JSONObject json) throws IOException {
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
|
@ -24,6 +24,7 @@
|
||||
import org.apache.hadoop.fs.ContentSummary;
|
||||
import org.apache.hadoop.fs.FileChecksum;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FsStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FilterFileSystem;
|
||||
import org.apache.hadoop.fs.FsServerDefaults;
|
||||
@ -424,6 +425,23 @@ private static JSONObject storagePoliciesToJSON(
|
||||
return json;
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes the fsStatus operation.
|
||||
*
|
||||
* @param fsStatus a FsStatus object
|
||||
* @return JSON map suitable for wire transport
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
private static Map<String, Object> toJson(FsStatus fsStatus) {
|
||||
Map<String, Object> json = new LinkedHashMap<>();
|
||||
JSONObject statusJson = new JSONObject();
|
||||
statusJson.put(HttpFSFileSystem.USED_JSON, fsStatus.getUsed());
|
||||
statusJson.put(HttpFSFileSystem.REMAINING_JSON, fsStatus.getRemaining());
|
||||
statusJson.put(HttpFSFileSystem.CAPACITY_JSON, fsStatus.getCapacity());
|
||||
json.put(HttpFSFileSystem.FS_STATUS_JSON, statusJson);
|
||||
return json;
|
||||
}
|
||||
|
||||
/**
|
||||
* Executor that performs an append FileSystemAccess files system operation.
|
||||
*/
|
||||
@ -2300,4 +2318,28 @@ public Map execute(FileSystem fs) throws IOException {
|
||||
return toJson(status);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Executor that performs a getFsStatus operation.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public static class FSStatus implements FileSystemAccess.FileSystemExecutor<Map> {
|
||||
final private Path path;
|
||||
|
||||
/**
|
||||
* Creates a fsStatus executor.
|
||||
*
|
||||
* @param path the path to retrieve the status.
|
||||
*/
|
||||
public FSStatus(String path) {
|
||||
this.path = new Path(path);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map execute(FileSystem fs) throws IOException {
|
||||
FsStatus fsStatus = fs.getStatus(path);
|
||||
HttpFSServerWebApp.get().getMetrics().incrOpsStatus();
|
||||
return toJson(fsStatus);
|
||||
}
|
||||
}
|
||||
}
|
@ -129,6 +129,7 @@ public class HttpFSParametersProvider extends ParametersProvider {
|
||||
PARAMS_DEF.put(Operation.UNSETECPOLICY, new Class[] {});
|
||||
PARAMS_DEF.put(Operation.SATISFYSTORAGEPOLICY, new Class[] {});
|
||||
PARAMS_DEF.put(Operation.GETFILELINKSTATUS, new Class[]{});
|
||||
PARAMS_DEF.put(Operation.GETSTATUS, new Class[]{});
|
||||
PARAMS_DEF.put(Operation.GET_BLOCK_LOCATIONS, new Class[] {OffsetParam.class, LenParam.class});
|
||||
}
|
||||
|
||||
|
@ -554,6 +554,12 @@ public InputStream run() throws Exception {
|
||||
response = Response.ok(js).type(MediaType.APPLICATION_JSON).build();
|
||||
break;
|
||||
}
|
||||
case GETSTATUS: {
|
||||
FSOperations.FSStatus command = new FSOperations.FSStatus(path);
|
||||
@SuppressWarnings("rawtypes") Map js = fsExecute(user, command);
|
||||
response = Response.ok(js).type(MediaType.APPLICATION_JSON).build();
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
throw new IOException(
|
||||
MessageFormat.format("Invalid HTTP GET operation [{0}]", op.value()));
|
||||
|
@ -64,6 +64,7 @@ public class HttpFSServerMetrics {
|
||||
private @Metric MutableCounterLong opsListing;
|
||||
private @Metric MutableCounterLong opsStat;
|
||||
private @Metric MutableCounterLong opsCheckAccess;
|
||||
private @Metric MutableCounterLong opsStatus;
|
||||
|
||||
private final MetricsRegistry registry = new MetricsRegistry("httpfsserver");
|
||||
private final String name;
|
||||
@ -160,4 +161,8 @@ public long getOpsListing() {
|
||||
public long getOpsStat() {
|
||||
return opsStat.value();
|
||||
}
|
||||
|
||||
public void incrOpsStatus() {
|
||||
opsStatus.incr();
|
||||
}
|
||||
}
|
||||
|
@ -30,6 +30,7 @@
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FileSystemTestHelper;
|
||||
import org.apache.hadoop.fs.FsServerDefaults;
|
||||
import org.apache.hadoop.fs.FsStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.QuotaUsage;
|
||||
import org.apache.hadoop.fs.RemoteIterator;
|
||||
@ -1216,7 +1217,7 @@ protected enum Operation {
|
||||
FILE_STATUS_ATTR, GET_SNAPSHOT_DIFF, GET_SNAPSHOTTABLE_DIRECTORY_LIST,
|
||||
GET_SNAPSHOT_LIST, GET_SERVERDEFAULTS, CHECKACCESS, SETECPOLICY,
|
||||
SATISFYSTORAGEPOLICY, GET_SNAPSHOT_DIFF_LISTING, GETFILEBLOCKLOCATIONS,
|
||||
GETFILELINKSTATUS
|
||||
GETFILELINKSTATUS, GETSTATUS
|
||||
}
|
||||
|
||||
private void operation(Operation op) throws Exception {
|
||||
@ -1362,6 +1363,9 @@ private void operation(Operation op) throws Exception {
|
||||
case GETFILELINKSTATUS:
|
||||
testGetFileLinkStatus();
|
||||
break;
|
||||
case GETSTATUS:
|
||||
testGetStatus();
|
||||
break;
|
||||
}
|
||||
|
||||
}
|
||||
@ -2081,6 +2085,32 @@ private void testGetFileLinkStatus() throws Exception {
|
||||
assertTrue(fs.getFileLinkStatus(linkToFile).isSymlink());
|
||||
}
|
||||
|
||||
private void testGetStatus() throws Exception {
|
||||
if (isLocalFS()) {
|
||||
// do not test the getStatus for local FS.
|
||||
return;
|
||||
}
|
||||
final Path path = new Path("/foo");
|
||||
FileSystem fs = FileSystem.get(path.toUri(), this.getProxiedFSConf());
|
||||
if (fs instanceof DistributedFileSystem) {
|
||||
DistributedFileSystem dfs =
|
||||
(DistributedFileSystem) FileSystem.get(path.toUri(), this.getProxiedFSConf());
|
||||
FileSystem httpFs = this.getHttpFSFileSystem();
|
||||
|
||||
FsStatus dfsFsStatus = dfs.getStatus(path);
|
||||
FsStatus httpFsStatus = httpFs.getStatus(path);
|
||||
|
||||
//Validate used free and capacity are the same as DistributedFileSystem
|
||||
assertEquals(dfsFsStatus.getUsed(), httpFsStatus.getUsed());
|
||||
assertEquals(dfsFsStatus.getRemaining(), httpFsStatus.getRemaining());
|
||||
assertEquals(dfsFsStatus.getCapacity(), httpFsStatus.getCapacity());
|
||||
httpFs.close();
|
||||
dfs.close();
|
||||
} else {
|
||||
Assert.fail(fs.getClass().getSimpleName() + " is not of type DistributedFileSystem.");
|
||||
}
|
||||
}
|
||||
|
||||
private void assertHttpFsReportListingWithDfsClient(SnapshotDiffReportListing diffReportListing,
|
||||
SnapshotDiffReportListing dfsDiffReportListing) {
|
||||
Assert.assertEquals(diffReportListing.getCreateList().size(),
|
||||
|
Loading…
Reference in New Issue
Block a user