HDFS-15066. HttpFS: Implement setErasureCodingPolicy , unsetErasureCodingPolicy , getErasureCodingPolicy. Contributed by hemanthboyina.
This commit is contained in:
parent
819159fa06
commit
59aac00283
@ -716,6 +716,9 @@ public static BlockStoragePolicy toBlockStoragePolicy(Map<?, ?> m) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public static ErasureCodingPolicy toECPolicy(Map<?, ?> m) {
|
public static ErasureCodingPolicy toECPolicy(Map<?, ?> m) {
|
||||||
|
if (m == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
byte id = ((Number) m.get("id")).byteValue();
|
byte id = ((Number) m.get("id")).byteValue();
|
||||||
String name = (String) m.get("name");
|
String name = (String) m.get("name");
|
||||||
String codec = (String) m.get("codecName");
|
String codec = (String) m.get("codecName");
|
||||||
|
@ -48,6 +48,7 @@
|
|||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
||||||
import org.apache.hadoop.hdfs.protocol.FsPermissionExtension;
|
import org.apache.hadoop.hdfs.protocol.FsPermissionExtension;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||||
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
|
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
|
||||||
@ -134,6 +135,7 @@ public class HttpFSFileSystem extends FileSystem
|
|||||||
public static final String SNAPSHOT_NAME_PARAM = "snapshotname";
|
public static final String SNAPSHOT_NAME_PARAM = "snapshotname";
|
||||||
public static final String OLD_SNAPSHOT_NAME_PARAM = "oldsnapshotname";
|
public static final String OLD_SNAPSHOT_NAME_PARAM = "oldsnapshotname";
|
||||||
public static final String FSACTION_MODE_PARAM = "fsaction";
|
public static final String FSACTION_MODE_PARAM = "fsaction";
|
||||||
|
public static final String EC_POLICY_NAME_PARAM = "ecpolicy";
|
||||||
|
|
||||||
public static final Short DEFAULT_PERMISSION = 0755;
|
public static final Short DEFAULT_PERMISSION = 0755;
|
||||||
public static final String ACLSPEC_DEFAULT = "";
|
public static final String ACLSPEC_DEFAULT = "";
|
||||||
@ -260,7 +262,8 @@ public enum Operation {
|
|||||||
CREATESNAPSHOT(HTTP_PUT), DELETESNAPSHOT(HTTP_DELETE),
|
CREATESNAPSHOT(HTTP_PUT), DELETESNAPSHOT(HTTP_DELETE),
|
||||||
RENAMESNAPSHOT(HTTP_PUT), GETSNAPSHOTDIFF(HTTP_GET),
|
RENAMESNAPSHOT(HTTP_PUT), GETSNAPSHOTDIFF(HTTP_GET),
|
||||||
GETSNAPSHOTTABLEDIRECTORYLIST(HTTP_GET), GETSERVERDEFAULTS(HTTP_GET),
|
GETSNAPSHOTTABLEDIRECTORYLIST(HTTP_GET), GETSERVERDEFAULTS(HTTP_GET),
|
||||||
CHECKACCESS(HTTP_GET);
|
CHECKACCESS(HTTP_GET), SETECPOLICY(HTTP_PUT), GETECPOLICY(
|
||||||
|
HTTP_GET), UNSETECPOLICY(HTTP_POST);
|
||||||
|
|
||||||
private String httpMethod;
|
private String httpMethod;
|
||||||
|
|
||||||
@ -1624,4 +1627,33 @@ public void access(final Path path, final FsAction mode) throws IOException {
|
|||||||
getConnection(Operation.CHECKACCESS.getMethod(), params, path, true);
|
getConnection(Operation.CHECKACCESS.getMethod(), params, path, true);
|
||||||
HttpExceptionUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
|
HttpExceptionUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setErasureCodingPolicy(final Path path, String policyName)
|
||||||
|
throws IOException {
|
||||||
|
Map<String, String> params = new HashMap<String, String>();
|
||||||
|
params.put(OP_PARAM, Operation.SETECPOLICY.toString());
|
||||||
|
params.put(EC_POLICY_NAME_PARAM, policyName);
|
||||||
|
HttpURLConnection conn =
|
||||||
|
getConnection(Operation.SETECPOLICY.getMethod(), params, path, true);
|
||||||
|
HttpExceptionUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
|
||||||
|
}
|
||||||
|
|
||||||
|
public ErasureCodingPolicy getErasureCodingPolicy(final Path path)
|
||||||
|
throws IOException {
|
||||||
|
Map<String, String> params = new HashMap<String, String>();
|
||||||
|
params.put(OP_PARAM, Operation.GETECPOLICY.toString());
|
||||||
|
HttpURLConnection conn =
|
||||||
|
getConnection(Operation.GETECPOLICY.getMethod(), params, path, true);
|
||||||
|
HttpExceptionUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
|
||||||
|
JSONObject json = (JSONObject) HttpFSUtils.jsonParse(conn);
|
||||||
|
return JsonUtilClient.toECPolicy(json);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void unsetErasureCodingPolicy(final Path path) throws IOException {
|
||||||
|
Map<String, String> params = new HashMap<String, String>();
|
||||||
|
params.put(OP_PARAM, Operation.UNSETECPOLICY.toString());
|
||||||
|
HttpURLConnection conn =
|
||||||
|
getConnection(Operation.UNSETECPOLICY.getMethod(), params, path, true);
|
||||||
|
HttpExceptionUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -39,6 +39,7 @@
|
|||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||||
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
|
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
|
||||||
@ -1901,4 +1902,88 @@ public Void execute(FileSystem fs) throws IOException {
|
|||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Executor that performs a setErasureCodingPolicy operation.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public static class FSSetErasureCodingPolicy
|
||||||
|
implements FileSystemAccess.FileSystemExecutor<Void> {
|
||||||
|
|
||||||
|
private Path path;
|
||||||
|
private String policyName;
|
||||||
|
|
||||||
|
public FSSetErasureCodingPolicy(String path, String policyName) {
|
||||||
|
this.path = new Path(path);
|
||||||
|
this.policyName = policyName;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Void execute(FileSystem fs) throws IOException {
|
||||||
|
if (fs instanceof DistributedFileSystem) {
|
||||||
|
DistributedFileSystem dfs = (DistributedFileSystem) fs;
|
||||||
|
dfs.setErasureCodingPolicy(path, policyName);
|
||||||
|
} else {
|
||||||
|
throw new UnsupportedOperationException("setErasureCodingPolicy is "
|
||||||
|
+ "not supported for HttpFs on " + fs.getClass()
|
||||||
|
+ ". Please check your fs.defaultFS configuration");
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Executor that performs a getErasureCodingPolicy operation.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public static class FSGetErasureCodingPolicy
|
||||||
|
implements FileSystemAccess.FileSystemExecutor<String> {
|
||||||
|
|
||||||
|
private Path path;
|
||||||
|
|
||||||
|
public FSGetErasureCodingPolicy(String path) {
|
||||||
|
this.path = new Path(path);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String execute(FileSystem fs) throws IOException {
|
||||||
|
ErasureCodingPolicy policy = null;
|
||||||
|
if (fs instanceof DistributedFileSystem) {
|
||||||
|
DistributedFileSystem dfs = (DistributedFileSystem) fs;
|
||||||
|
policy = dfs.getErasureCodingPolicy(path);
|
||||||
|
} else {
|
||||||
|
throw new UnsupportedOperationException("getErasureCodingPolicy is "
|
||||||
|
+ "not supported for HttpFs on " + fs.getClass()
|
||||||
|
+ ". Please check your fs.defaultFS configuration");
|
||||||
|
}
|
||||||
|
return JsonUtil.toJsonString(policy);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Executor that performs a unsetErasureCodingPolicy operation.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public static class FSUnSetErasureCodingPolicy
|
||||||
|
implements FileSystemAccess.FileSystemExecutor<Void> {
|
||||||
|
|
||||||
|
private Path path;
|
||||||
|
|
||||||
|
public FSUnSetErasureCodingPolicy(String path) {
|
||||||
|
this.path = new Path(path);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Void execute(FileSystem fs) throws IOException {
|
||||||
|
if (fs instanceof DistributedFileSystem) {
|
||||||
|
DistributedFileSystem dfs = (DistributedFileSystem) fs;
|
||||||
|
dfs.unsetErasureCodingPolicy(path);
|
||||||
|
} else {
|
||||||
|
throw new UnsupportedOperationException("unsetErasureCodingPolicy is "
|
||||||
|
+ "not supported for HttpFs on " + fs.getClass()
|
||||||
|
+ ". Please check your fs.defaultFS configuration");
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
@ -118,6 +118,9 @@ public class HttpFSParametersProvider extends ParametersProvider {
|
|||||||
PARAMS_DEF.put(Operation.GETSNAPSHOTTABLEDIRECTORYLIST, new Class[] {});
|
PARAMS_DEF.put(Operation.GETSNAPSHOTTABLEDIRECTORYLIST, new Class[] {});
|
||||||
PARAMS_DEF.put(Operation.GETSERVERDEFAULTS, new Class[] {});
|
PARAMS_DEF.put(Operation.GETSERVERDEFAULTS, new Class[] {});
|
||||||
PARAMS_DEF.put(Operation.CHECKACCESS, new Class[] {FsActionParam.class});
|
PARAMS_DEF.put(Operation.CHECKACCESS, new Class[] {FsActionParam.class});
|
||||||
|
PARAMS_DEF.put(Operation.SETECPOLICY, new Class[] {ECPolicyParam.class});
|
||||||
|
PARAMS_DEF.put(Operation.GETECPOLICY, new Class[] {});
|
||||||
|
PARAMS_DEF.put(Operation.UNSETECPOLICY, new Class[] {});
|
||||||
}
|
}
|
||||||
|
|
||||||
public HttpFSParametersProvider() {
|
public HttpFSParametersProvider() {
|
||||||
@ -694,4 +697,22 @@ public FsActionParam(final String str) {
|
|||||||
super(NAME, str, FSACTION_PATTERN);
|
super(NAME, str, FSACTION_PATTERN);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Class for ecpolicy parameter.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public static class ECPolicyParam extends StringParam {
|
||||||
|
/**
|
||||||
|
* Parameter name.
|
||||||
|
*/
|
||||||
|
public static final String NAME = HttpFSFileSystem.EC_POLICY_NAME_PARAM;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor.
|
||||||
|
*/
|
||||||
|
public ECPolicyParam() {
|
||||||
|
super(NAME, null);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -31,6 +31,7 @@
|
|||||||
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.BlockSizeParam;
|
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.BlockSizeParam;
|
||||||
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.DataParam;
|
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.DataParam;
|
||||||
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.DestinationParam;
|
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.DestinationParam;
|
||||||
|
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.ECPolicyParam;
|
||||||
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.FilterParam;
|
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.FilterParam;
|
||||||
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.FsActionParam;
|
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.FsActionParam;
|
||||||
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.GroupParam;
|
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.GroupParam;
|
||||||
@ -438,6 +439,14 @@ public InputStream run() throws Exception {
|
|||||||
response = Response.ok().build();
|
response = Response.ok().build();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
case GETECPOLICY: {
|
||||||
|
FSOperations.FSGetErasureCodingPolicy command =
|
||||||
|
new FSOperations.FSGetErasureCodingPolicy(path);
|
||||||
|
String js = fsExecute(user, command);
|
||||||
|
AUDIT_LOG.info("[{}]", path);
|
||||||
|
response = Response.ok(js).type(MediaType.APPLICATION_JSON).build();
|
||||||
|
break;
|
||||||
|
}
|
||||||
default: {
|
default: {
|
||||||
throw new IOException(
|
throw new IOException(
|
||||||
MessageFormat.format("Invalid HTTP GET operation [{0}]", op.value()));
|
MessageFormat.format("Invalid HTTP GET operation [{0}]", op.value()));
|
||||||
@ -601,6 +610,14 @@ public Response post(InputStream is,
|
|||||||
response = Response.ok().build();
|
response = Response.ok().build();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
case UNSETECPOLICY: {
|
||||||
|
FSOperations.FSUnSetErasureCodingPolicy command =
|
||||||
|
new FSOperations.FSUnSetErasureCodingPolicy(path);
|
||||||
|
fsExecute(user, command);
|
||||||
|
AUDIT_LOG.info("Unset ec policy [{}]", path);
|
||||||
|
response = Response.ok().build();
|
||||||
|
break;
|
||||||
|
}
|
||||||
default: {
|
default: {
|
||||||
throw new IOException(
|
throw new IOException(
|
||||||
MessageFormat.format("Invalid HTTP POST operation [{0}]",
|
MessageFormat.format("Invalid HTTP POST operation [{0}]",
|
||||||
@ -885,6 +902,15 @@ public Response put(InputStream is,
|
|||||||
response = Response.ok().build();
|
response = Response.ok().build();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
case SETECPOLICY: {
|
||||||
|
String policyName = params.get(ECPolicyParam.NAME, ECPolicyParam.class);
|
||||||
|
FSOperations.FSSetErasureCodingPolicy command =
|
||||||
|
new FSOperations.FSSetErasureCodingPolicy(path, policyName);
|
||||||
|
fsExecute(user, command);
|
||||||
|
AUDIT_LOG.info("[{}] to policy [{}]", path, policyName);
|
||||||
|
response = Response.ok().build();
|
||||||
|
break;
|
||||||
|
}
|
||||||
default: {
|
default: {
|
||||||
throw new IOException(
|
throw new IOException(
|
||||||
MessageFormat.format("Invalid HTTP PUT operation [{0}]",
|
MessageFormat.format("Invalid HTTP PUT operation [{0}]",
|
||||||
|
@ -44,11 +44,13 @@
|
|||||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||||
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
|
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
|
||||||
import org.apache.hadoop.hdfs.protocol.SnapshotException;
|
import org.apache.hadoop.hdfs.protocol.SnapshotException;
|
||||||
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
|
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
|
||||||
import org.apache.hadoop.hdfs.web.JsonUtil;
|
import org.apache.hadoop.hdfs.web.JsonUtil;
|
||||||
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
|
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
@ -1143,7 +1145,7 @@ protected enum Operation {
|
|||||||
CREATE_SNAPSHOT, RENAME_SNAPSHOT, DELETE_SNAPSHOT,
|
CREATE_SNAPSHOT, RENAME_SNAPSHOT, DELETE_SNAPSHOT,
|
||||||
ALLOW_SNAPSHOT, DISALLOW_SNAPSHOT, DISALLOW_SNAPSHOT_EXCEPTION,
|
ALLOW_SNAPSHOT, DISALLOW_SNAPSHOT, DISALLOW_SNAPSHOT_EXCEPTION,
|
||||||
FILE_STATUS_ATTR, GET_SNAPSHOT_DIFF, GET_SNAPSHOTTABLE_DIRECTORY_LIST,
|
FILE_STATUS_ATTR, GET_SNAPSHOT_DIFF, GET_SNAPSHOTTABLE_DIRECTORY_LIST,
|
||||||
GET_SERVERDEFAULTS, CHECKACCESS
|
GET_SERVERDEFAULTS, CHECKACCESS, SETECPOLICY
|
||||||
}
|
}
|
||||||
|
|
||||||
private void operation(Operation op) throws Exception {
|
private void operation(Operation op) throws Exception {
|
||||||
@ -1270,6 +1272,9 @@ private void operation(Operation op) throws Exception {
|
|||||||
case CHECKACCESS:
|
case CHECKACCESS:
|
||||||
testAccess();
|
testAccess();
|
||||||
break;
|
break;
|
||||||
|
case SETECPOLICY:
|
||||||
|
testErasureCodingPolicy();
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
@ -1768,4 +1773,42 @@ private void verifyAccess(FileSystem fs, DistributedFileSystem dfs)
|
|||||||
Assert.fail(fs.getClass().getSimpleName() + " doesn't support access");
|
Assert.fail(fs.getClass().getSimpleName() + " doesn't support access");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void testErasureCodingPolicy() throws Exception {
|
||||||
|
if (!this.isLocalFS()) {
|
||||||
|
FileSystem fs = this.getHttpFSFileSystem();
|
||||||
|
Path path1 = new Path("/");
|
||||||
|
DistributedFileSystem dfs = (DistributedFileSystem) FileSystem
|
||||||
|
.get(path1.toUri(), this.getProxiedFSConf());
|
||||||
|
final String dir = "/xattrTest";
|
||||||
|
Path p1 = new Path(dir);
|
||||||
|
|
||||||
|
final ErasureCodingPolicy ecPolicy = SystemErasureCodingPolicies
|
||||||
|
.getByID(SystemErasureCodingPolicies.RS_3_2_POLICY_ID);
|
||||||
|
final String ecPolicyName = ecPolicy.getName();
|
||||||
|
dfs.mkdirs(new Path(dir));
|
||||||
|
dfs.enableErasureCodingPolicy(ecPolicyName);
|
||||||
|
|
||||||
|
if (fs instanceof HttpFSFileSystem) {
|
||||||
|
HttpFSFileSystem httpFS = (HttpFSFileSystem) fs;
|
||||||
|
httpFS.setErasureCodingPolicy(p1, ecPolicyName);
|
||||||
|
ErasureCodingPolicy ecPolicy1 = httpFS.getErasureCodingPolicy(p1);
|
||||||
|
assertEquals(ecPolicy, ecPolicy1);
|
||||||
|
httpFS.unsetErasureCodingPolicy(p1);
|
||||||
|
ecPolicy1 = httpFS.getErasureCodingPolicy(p1);
|
||||||
|
Assert.assertNull(ecPolicy1);
|
||||||
|
} else if (fs instanceof WebHdfsFileSystem) {
|
||||||
|
WebHdfsFileSystem webHdfsFileSystem = (WebHdfsFileSystem) fs;
|
||||||
|
webHdfsFileSystem.setErasureCodingPolicy(p1, ecPolicyName);
|
||||||
|
ErasureCodingPolicy ecPolicy1 =
|
||||||
|
webHdfsFileSystem.getErasureCodingPolicy(p1);
|
||||||
|
assertEquals(ecPolicy, ecPolicy1);
|
||||||
|
webHdfsFileSystem.unsetErasureCodingPolicy(p1);
|
||||||
|
ecPolicy1 = dfs.getErasureCodingPolicy(p1);
|
||||||
|
Assert.assertNull(ecPolicy1);
|
||||||
|
} else {
|
||||||
|
Assert.fail(fs.getClass().getSimpleName() + " doesn't support access");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
@ -1738,4 +1738,50 @@ public void testECPolicy() throws Exception {
|
|||||||
(HdfsFileStatus) httpfsWebHdfs.getFileStatus(ecFile);
|
(HdfsFileStatus) httpfsWebHdfs.getFileStatus(ecFile);
|
||||||
assertNotNull(httpfsFileStatus.getErasureCodingPolicy());
|
assertNotNull(httpfsFileStatus.getErasureCodingPolicy());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@TestDir
|
||||||
|
@TestJetty
|
||||||
|
@TestHdfs
|
||||||
|
public void testErasureCodingPolicy() throws Exception {
|
||||||
|
createHttpFSServer(false, false);
|
||||||
|
final String dir = "/ecPolicy";
|
||||||
|
Path path1 = new Path(dir);
|
||||||
|
final ErasureCodingPolicy ecPolicy = SystemErasureCodingPolicies
|
||||||
|
.getByID(SystemErasureCodingPolicies.RS_3_2_POLICY_ID);
|
||||||
|
final String ecPolicyName = ecPolicy.getName();
|
||||||
|
DistributedFileSystem dfs = (DistributedFileSystem) FileSystem
|
||||||
|
.get(path1.toUri(), TestHdfsHelper.getHdfsConf());
|
||||||
|
dfs.mkdirs(new Path(dir));
|
||||||
|
dfs.enableErasureCodingPolicy(ecPolicyName);
|
||||||
|
|
||||||
|
HttpURLConnection conn =
|
||||||
|
putCmdWithReturn(dir, "SETECPOLICY", "ecpolicy=" + ecPolicyName);
|
||||||
|
Assert.assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
|
||||||
|
|
||||||
|
HttpURLConnection conn1 = sendRequestToHttpFSServer(dir, "GETECPOLICY", "");
|
||||||
|
// Should return HTTP_OK
|
||||||
|
Assert.assertEquals(conn1.getResponseCode(), HttpURLConnection.HTTP_OK);
|
||||||
|
// Verify the response
|
||||||
|
BufferedReader reader =
|
||||||
|
new BufferedReader(new InputStreamReader(conn1.getInputStream()));
|
||||||
|
// The response should be a one-line JSON string.
|
||||||
|
String dirLst = reader.readLine();
|
||||||
|
ErasureCodingPolicy dfsDirLst = dfs.getErasureCodingPolicy(path1);
|
||||||
|
Assert.assertNotNull(dfsDirLst);
|
||||||
|
Assert.assertEquals(dirLst, JsonUtil.toJsonString(dfsDirLst));
|
||||||
|
|
||||||
|
String user = HadoopUsersConfTestHelper.getHadoopUsers()[0];
|
||||||
|
URL url = new URL(TestJettyHelper.getJettyURL(),
|
||||||
|
MessageFormat.format("/webhdfs/v1{0}?user.name={1}&op={2}&{3}", dir,
|
||||||
|
user, "UNSETECPOLICY", ""));
|
||||||
|
HttpURLConnection conn2 = (HttpURLConnection) url.openConnection();
|
||||||
|
conn2.setRequestMethod("POST");
|
||||||
|
conn2.connect();
|
||||||
|
Assert.assertEquals(HttpURLConnection.HTTP_OK, conn2.getResponseCode());
|
||||||
|
|
||||||
|
// response should be null
|
||||||
|
dfsDirLst = dfs.getErasureCodingPolicy(path1);
|
||||||
|
Assert.assertNull(dfsDirLst);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user