HDFS-17115. HttpFS Add Support getErasureCodeCodecs API (#5875). Contributed by Hualong Zhang.
Reviewed-by: Shilun Fan <slfan1989@apache.org> Signed-off-by: Ayush Saxena <ayushsaxena@apache.org>
This commit is contained in:
parent
ad001c93cf
commit
068d8c7e4d
@ -286,6 +286,7 @@ public enum Operation {
|
|||||||
GETFILELINKSTATUS(HTTP_GET),
|
GETFILELINKSTATUS(HTTP_GET),
|
||||||
GETSTATUS(HTTP_GET),
|
GETSTATUS(HTTP_GET),
|
||||||
GETECPOLICIES(HTTP_GET),
|
GETECPOLICIES(HTTP_GET),
|
||||||
|
GETECCODECS(HTTP_GET),
|
||||||
GET_BLOCK_LOCATIONS(HTTP_GET);
|
GET_BLOCK_LOCATIONS(HTTP_GET);
|
||||||
|
|
||||||
private String httpMethod;
|
private String httpMethod;
|
||||||
@ -1786,6 +1787,17 @@ public Collection<ErasureCodingPolicyInfo> getAllErasureCodingPolicies() throws
|
|||||||
return JsonUtilClient.getAllErasureCodingPolicies(json);
|
return JsonUtilClient.getAllErasureCodingPolicies(json);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Map<String, String> getAllErasureCodingCodecs() throws IOException {
|
||||||
|
Map<String, String> params = new HashMap<>();
|
||||||
|
params.put(OP_PARAM, Operation.GETECCODECS.toString());
|
||||||
|
Path path = new Path(getUri().toString(), "/");
|
||||||
|
HttpURLConnection conn =
|
||||||
|
getConnection(Operation.GETECCODECS.getMethod(), params, path, false);
|
||||||
|
HttpExceptionUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
|
||||||
|
JSONObject json = (JSONObject) HttpFSUtils.jsonParse(conn);
|
||||||
|
return JsonUtilClient.getErasureCodeCodecs(json);
|
||||||
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
static BlockLocation[] toBlockLocations(JSONObject json) throws IOException {
|
static BlockLocation[] toBlockLocations(JSONObject json) throws IOException {
|
||||||
ObjectMapper mapper = new ObjectMapper();
|
ObjectMapper mapper = new ObjectMapper();
|
||||||
|
@ -65,6 +65,7 @@
|
|||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.LinkedHashMap;
|
import java.util.LinkedHashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
@ -2369,4 +2370,30 @@ public String execute(FileSystem fs) throws IOException {
|
|||||||
return JsonUtil.toJsonString(ecPolicyInfos.stream().toArray(ErasureCodingPolicyInfo[]::new));
|
return JsonUtil.toJsonString(ecPolicyInfos.stream().toArray(ErasureCodingPolicyInfo[]::new));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Executor that performs a FSGetErasureCodingCodecs operation.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public static class FSGetErasureCodingCodecs
|
||||||
|
implements FileSystemAccess.FileSystemExecutor<Map> {
|
||||||
|
|
||||||
|
public FSGetErasureCodingCodecs() {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map execute(FileSystem fs) throws IOException {
|
||||||
|
Map<String, Map<String, String>> ecCodecs = new HashMap<>();
|
||||||
|
if (fs instanceof DistributedFileSystem) {
|
||||||
|
DistributedFileSystem dfs = (DistributedFileSystem) fs;
|
||||||
|
ecCodecs.put("ErasureCodingCodecs", dfs.getAllErasureCodingCodecs());
|
||||||
|
} else {
|
||||||
|
throw new UnsupportedOperationException("getErasureCodeCodecs is " +
|
||||||
|
"not supported for HttpFs on " + fs.getClass() +
|
||||||
|
". Please check your fs.defaultFS configuration");
|
||||||
|
}
|
||||||
|
HttpFSServerWebApp.get().getMetrics().incrOpsECCodecs();
|
||||||
|
return ecCodecs;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
@ -131,6 +131,7 @@ public class HttpFSParametersProvider extends ParametersProvider {
|
|||||||
PARAMS_DEF.put(Operation.GETFILELINKSTATUS, new Class[]{});
|
PARAMS_DEF.put(Operation.GETFILELINKSTATUS, new Class[]{});
|
||||||
PARAMS_DEF.put(Operation.GETSTATUS, new Class[]{});
|
PARAMS_DEF.put(Operation.GETSTATUS, new Class[]{});
|
||||||
PARAMS_DEF.put(Operation.GETECPOLICIES, new Class[]{});
|
PARAMS_DEF.put(Operation.GETECPOLICIES, new Class[]{});
|
||||||
|
PARAMS_DEF.put(Operation.GETECCODECS, new Class[]{});
|
||||||
PARAMS_DEF.put(Operation.GET_BLOCK_LOCATIONS, new Class[] {OffsetParam.class, LenParam.class});
|
PARAMS_DEF.put(Operation.GET_BLOCK_LOCATIONS, new Class[] {OffsetParam.class, LenParam.class});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -534,6 +534,14 @@ public InputStream run() throws Exception {
|
|||||||
response = Response.ok(js).type(MediaType.APPLICATION_JSON).build();
|
response = Response.ok(js).type(MediaType.APPLICATION_JSON).build();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
case GETECCODECS: {
|
||||||
|
FSOperations.FSGetErasureCodingCodecs command =
|
||||||
|
new FSOperations.FSGetErasureCodingCodecs();
|
||||||
|
Map json = fsExecute(user, command);
|
||||||
|
AUDIT_LOG.info("[{}]", path);
|
||||||
|
response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
|
||||||
|
break;
|
||||||
|
}
|
||||||
case GET_BLOCK_LOCATIONS: {
|
case GET_BLOCK_LOCATIONS: {
|
||||||
long offset = 0;
|
long offset = 0;
|
||||||
long len = Long.MAX_VALUE;
|
long len = Long.MAX_VALUE;
|
||||||
|
@ -66,6 +66,7 @@ public class HttpFSServerMetrics {
|
|||||||
private @Metric MutableCounterLong opsCheckAccess;
|
private @Metric MutableCounterLong opsCheckAccess;
|
||||||
private @Metric MutableCounterLong opsStatus;
|
private @Metric MutableCounterLong opsStatus;
|
||||||
private @Metric MutableCounterLong opsAllECPolicies;
|
private @Metric MutableCounterLong opsAllECPolicies;
|
||||||
|
private @Metric MutableCounterLong opsECCodecs;
|
||||||
|
|
||||||
private final MetricsRegistry registry = new MetricsRegistry("httpfsserver");
|
private final MetricsRegistry registry = new MetricsRegistry("httpfsserver");
|
||||||
private final String name;
|
private final String name;
|
||||||
@ -170,4 +171,8 @@ public void incrOpsStatus() {
|
|||||||
public void incrOpsAllECPolicies() {
|
public void incrOpsAllECPolicies() {
|
||||||
opsAllECPolicies.incr();
|
opsAllECPolicies.incr();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void incrOpsECCodecs() {
|
||||||
|
opsECCodecs.incr();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -105,6 +105,7 @@
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
import static org.junit.Assert.assertArrayEquals;
|
import static org.junit.Assert.assertArrayEquals;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
@ -1218,9 +1219,9 @@ protected enum Operation {
|
|||||||
FILE_STATUS_ATTR, GET_SNAPSHOT_DIFF, GET_SNAPSHOTTABLE_DIRECTORY_LIST,
|
FILE_STATUS_ATTR, GET_SNAPSHOT_DIFF, GET_SNAPSHOTTABLE_DIRECTORY_LIST,
|
||||||
GET_SNAPSHOT_LIST, GET_SERVERDEFAULTS, CHECKACCESS, SETECPOLICY,
|
GET_SNAPSHOT_LIST, GET_SERVERDEFAULTS, CHECKACCESS, SETECPOLICY,
|
||||||
SATISFYSTORAGEPOLICY, GET_SNAPSHOT_DIFF_LISTING, GETFILEBLOCKLOCATIONS,
|
SATISFYSTORAGEPOLICY, GET_SNAPSHOT_DIFF_LISTING, GETFILEBLOCKLOCATIONS,
|
||||||
GETFILELINKSTATUS, GETSTATUS, GETECPOLICIES
|
GETFILELINKSTATUS, GETSTATUS, GETECPOLICIES, GETECCODECS
|
||||||
}
|
}
|
||||||
|
@SuppressWarnings("methodlength")
|
||||||
private void operation(Operation op) throws Exception {
|
private void operation(Operation op) throws Exception {
|
||||||
switch (op) {
|
switch (op) {
|
||||||
case GET:
|
case GET:
|
||||||
@ -1370,6 +1371,9 @@ private void operation(Operation op) throws Exception {
|
|||||||
case GETECPOLICIES:
|
case GETECPOLICIES:
|
||||||
testGetAllEEPolicies();
|
testGetAllEEPolicies();
|
||||||
break;
|
break;
|
||||||
|
case GETECCODECS:
|
||||||
|
testGetECCodecs();
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2149,6 +2153,54 @@ private void testGetAllEEPolicies() throws Exception {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void testGetECCodecs() throws Exception {
|
||||||
|
if (isLocalFS()) {
|
||||||
|
// do not test the testGetECCodecs for local FS.
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
final Path path = new Path("/foo");
|
||||||
|
|
||||||
|
FileSystem fs = FileSystem.get(path.toUri(), this.getProxiedFSConf());
|
||||||
|
LambdaTestUtils.intercept(AssertionError.class, () -> {
|
||||||
|
if (!(fs instanceof DistributedFileSystem)) {
|
||||||
|
throw new AssertionError(fs.getClass().getSimpleName() +
|
||||||
|
" is not of type DistributedFileSystem.");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
DistributedFileSystem dfs =
|
||||||
|
(DistributedFileSystem) FileSystem.get(path.toUri(), this.getProxiedFSConf());
|
||||||
|
FileSystem httpFs = this.getHttpFSFileSystem();
|
||||||
|
|
||||||
|
Map<String, String> dfsErasureCodingCodecs = dfs.getAllErasureCodingCodecs();
|
||||||
|
|
||||||
|
final AtomicReference<Map<String, String>> diffErasureCodingCodecsRef =
|
||||||
|
new AtomicReference<>();
|
||||||
|
LambdaTestUtils.intercept(AssertionError.class, () -> {
|
||||||
|
if (httpFs instanceof HttpFSFileSystem) {
|
||||||
|
HttpFSFileSystem httpFSFileSystem = (HttpFSFileSystem) httpFs;
|
||||||
|
diffErasureCodingCodecsRef.set(httpFSFileSystem.getAllErasureCodingCodecs());
|
||||||
|
} else if (httpFs instanceof WebHdfsFileSystem) {
|
||||||
|
WebHdfsFileSystem webHdfsFileSystem = (WebHdfsFileSystem) httpFs;
|
||||||
|
diffErasureCodingCodecsRef.set(webHdfsFileSystem.getAllErasureCodingCodecs());
|
||||||
|
} else {
|
||||||
|
throw new AssertionError(httpFs.getClass().getSimpleName() +
|
||||||
|
" is not of type HttpFSFileSystem or WebHdfsFileSystem");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
Map<String, String> diffErasureCodingCodecs = diffErasureCodingCodecsRef.get();
|
||||||
|
|
||||||
|
//Validate testGetECCodecs are the same as DistributedFileSystem
|
||||||
|
Assert.assertEquals(dfsErasureCodingCodecs.size(), diffErasureCodingCodecs.size());
|
||||||
|
|
||||||
|
for (Map.Entry<String, String> entry : dfsErasureCodingCodecs.entrySet()) {
|
||||||
|
String key = entry.getKey();
|
||||||
|
String value = entry.getValue();
|
||||||
|
Assert.assertTrue(diffErasureCodingCodecs.containsKey(key));
|
||||||
|
Assert.assertEquals(value, diffErasureCodingCodecs.get(key));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void assertHttpFsReportListingWithDfsClient(SnapshotDiffReportListing diffReportListing,
|
private void assertHttpFsReportListingWithDfsClient(SnapshotDiffReportListing diffReportListing,
|
||||||
SnapshotDiffReportListing dfsDiffReportListing) {
|
SnapshotDiffReportListing dfsDiffReportListing) {
|
||||||
Assert.assertEquals(diffReportListing.getCreateList().size(),
|
Assert.assertEquals(diffReportListing.getCreateList().size(),
|
||||||
|
Loading…
Reference in New Issue
Block a user