HDFS-15190. HttpFS: Add Support for Storage Policy Satisfier. Contributed by hemanthboyina.
This commit is contained in:
parent
e36b272608
commit
9a3a28018a
@ -263,7 +263,7 @@ public enum Operation {
|
|||||||
RENAMESNAPSHOT(HTTP_PUT), GETSNAPSHOTDIFF(HTTP_GET),
|
RENAMESNAPSHOT(HTTP_PUT), GETSNAPSHOTDIFF(HTTP_GET),
|
||||||
GETSNAPSHOTTABLEDIRECTORYLIST(HTTP_GET), GETSERVERDEFAULTS(HTTP_GET),
|
GETSNAPSHOTTABLEDIRECTORYLIST(HTTP_GET), GETSERVERDEFAULTS(HTTP_GET),
|
||||||
CHECKACCESS(HTTP_GET), SETECPOLICY(HTTP_PUT), GETECPOLICY(
|
CHECKACCESS(HTTP_GET), SETECPOLICY(HTTP_PUT), GETECPOLICY(
|
||||||
HTTP_GET), UNSETECPOLICY(HTTP_POST);
|
HTTP_GET), UNSETECPOLICY(HTTP_POST), SATISFYSTORAGEPOLICY(HTTP_PUT);
|
||||||
|
|
||||||
private String httpMethod;
|
private String httpMethod;
|
||||||
|
|
||||||
@ -1658,4 +1658,13 @@ public void unsetErasureCodingPolicy(final Path path) throws IOException {
|
|||||||
getConnection(Operation.UNSETECPOLICY.getMethod(), params, path, true);
|
getConnection(Operation.UNSETECPOLICY.getMethod(), params, path, true);
|
||||||
HttpExceptionUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
|
HttpExceptionUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void satisfyStoragePolicy(final Path path) throws IOException {
|
||||||
|
Map<String, String> params = new HashMap<String, String>();
|
||||||
|
params.put(OP_PARAM, Operation.SATISFYSTORAGEPOLICY.toString());
|
||||||
|
HttpURLConnection conn = getConnection(
|
||||||
|
Operation.SATISFYSTORAGEPOLICY.getMethod(), params, path, true);
|
||||||
|
HttpExceptionUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -1988,4 +1988,31 @@ public Void execute(FileSystem fs) throws IOException {
|
|||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Executor that performs a satisfyStoragePolicy operation.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public static class FSSatisyStoragePolicy
|
||||||
|
implements FileSystemAccess.FileSystemExecutor<Void> {
|
||||||
|
|
||||||
|
private Path path;
|
||||||
|
|
||||||
|
public FSSatisyStoragePolicy(String path) {
|
||||||
|
this.path = new Path(path);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Void execute(FileSystem fs) throws IOException {
|
||||||
|
if (fs instanceof DistributedFileSystem) {
|
||||||
|
DistributedFileSystem dfs = (DistributedFileSystem) fs;
|
||||||
|
dfs.satisfyStoragePolicy(path);
|
||||||
|
} else {
|
||||||
|
throw new UnsupportedOperationException("satisfyStoragePolicy is "
|
||||||
|
+ "not supported for HttpFs on " + fs.getClass()
|
||||||
|
+ ". Please check your fs.defaultFS configuration");
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
@ -121,6 +121,7 @@ public class HttpFSParametersProvider extends ParametersProvider {
|
|||||||
PARAMS_DEF.put(Operation.SETECPOLICY, new Class[] {ECPolicyParam.class});
|
PARAMS_DEF.put(Operation.SETECPOLICY, new Class[] {ECPolicyParam.class});
|
||||||
PARAMS_DEF.put(Operation.GETECPOLICY, new Class[] {});
|
PARAMS_DEF.put(Operation.GETECPOLICY, new Class[] {});
|
||||||
PARAMS_DEF.put(Operation.UNSETECPOLICY, new Class[] {});
|
PARAMS_DEF.put(Operation.UNSETECPOLICY, new Class[] {});
|
||||||
|
PARAMS_DEF.put(Operation.SATISFYSTORAGEPOLICY, new Class[] {});
|
||||||
}
|
}
|
||||||
|
|
||||||
public HttpFSParametersProvider() {
|
public HttpFSParametersProvider() {
|
||||||
|
@ -957,7 +957,15 @@ public Response put(InputStream is,
|
|||||||
AUDIT_LOG.info("[{}] to policy [{}]", path, policyName);
|
AUDIT_LOG.info("[{}] to policy [{}]", path, policyName);
|
||||||
response = Response.ok().build();
|
response = Response.ok().build();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
case SATISFYSTORAGEPOLICY: {
|
||||||
|
FSOperations.FSSatisyStoragePolicy command =
|
||||||
|
new FSOperations.FSSatisyStoragePolicy(path);
|
||||||
|
fsExecute(user, command);
|
||||||
|
AUDIT_LOG.info("satisfy storage policy for [{}]", path);
|
||||||
|
response = Response.ok().build();
|
||||||
|
break;
|
||||||
|
}
|
||||||
default: {
|
default: {
|
||||||
throw new IOException(
|
throw new IOException(
|
||||||
MessageFormat.format("Invalid HTTP PUT operation [{0}]",
|
MessageFormat.format("Invalid HTTP PUT operation [{0}]",
|
||||||
|
@ -44,6 +44,7 @@
|
|||||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||||
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||||
@ -51,6 +52,7 @@
|
|||||||
import org.apache.hadoop.hdfs.protocol.SnapshotException;
|
import org.apache.hadoop.hdfs.protocol.SnapshotException;
|
||||||
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
|
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
|
||||||
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
|
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
|
||||||
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||||
import org.apache.hadoop.hdfs.web.JsonUtil;
|
import org.apache.hadoop.hdfs.web.JsonUtil;
|
||||||
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
|
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
@ -1146,7 +1148,7 @@ protected enum Operation {
|
|||||||
CREATE_SNAPSHOT, RENAME_SNAPSHOT, DELETE_SNAPSHOT,
|
CREATE_SNAPSHOT, RENAME_SNAPSHOT, DELETE_SNAPSHOT,
|
||||||
ALLOW_SNAPSHOT, DISALLOW_SNAPSHOT, DISALLOW_SNAPSHOT_EXCEPTION,
|
ALLOW_SNAPSHOT, DISALLOW_SNAPSHOT, DISALLOW_SNAPSHOT_EXCEPTION,
|
||||||
FILE_STATUS_ATTR, GET_SNAPSHOT_DIFF, GET_SNAPSHOTTABLE_DIRECTORY_LIST,
|
FILE_STATUS_ATTR, GET_SNAPSHOT_DIFF, GET_SNAPSHOTTABLE_DIRECTORY_LIST,
|
||||||
GET_SERVERDEFAULTS, CHECKACCESS, SETECPOLICY
|
GET_SERVERDEFAULTS, CHECKACCESS, SETECPOLICY, SATISFYSTORAGEPOLICY
|
||||||
}
|
}
|
||||||
|
|
||||||
private void operation(Operation op) throws Exception {
|
private void operation(Operation op) throws Exception {
|
||||||
@ -1276,6 +1278,9 @@ private void operation(Operation op) throws Exception {
|
|||||||
case SETECPOLICY:
|
case SETECPOLICY:
|
||||||
testErasureCodingPolicy();
|
testErasureCodingPolicy();
|
||||||
break;
|
break;
|
||||||
|
case SATISFYSTORAGEPOLICY:
|
||||||
|
testStoragePolicySatisfier();
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
@ -1812,4 +1817,40 @@ private void testErasureCodingPolicy() throws Exception {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testStoragePolicySatisfier() throws Exception {
|
||||||
|
final String dir = "/parent";
|
||||||
|
Path path1 = new Path(dir);
|
||||||
|
String file = "/parent/file";
|
||||||
|
Path filePath = new Path(file);
|
||||||
|
if (!this.isLocalFS()) {
|
||||||
|
FileSystem fs = this.getHttpFSFileSystem();
|
||||||
|
DistributedFileSystem dfs = (DistributedFileSystem) FileSystem
|
||||||
|
.get(path1.toUri(), this.getProxiedFSConf());
|
||||||
|
dfs.mkdirs(path1);
|
||||||
|
dfs.create(filePath).close();
|
||||||
|
dfs.setStoragePolicy(filePath, HdfsConstants.COLD_STORAGE_POLICY_NAME);
|
||||||
|
BlockStoragePolicy storagePolicy =
|
||||||
|
(BlockStoragePolicy) dfs.getStoragePolicy(filePath);
|
||||||
|
assertEquals(HdfsConstants.COLD_STORAGE_POLICY_NAME,
|
||||||
|
storagePolicy.getName());
|
||||||
|
Map<String, byte[]> xAttrs;
|
||||||
|
if (fs instanceof HttpFSFileSystem) {
|
||||||
|
HttpFSFileSystem httpFS = (HttpFSFileSystem) fs;
|
||||||
|
httpFS.satisfyStoragePolicy(path1);
|
||||||
|
xAttrs = httpFS.getXAttrs(path1);
|
||||||
|
assertTrue(xAttrs
|
||||||
|
.containsKey(HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY));
|
||||||
|
} else if (fs instanceof WebHdfsFileSystem) {
|
||||||
|
WebHdfsFileSystem webHdfsFileSystem = (WebHdfsFileSystem) fs;
|
||||||
|
webHdfsFileSystem.satisfyStoragePolicy(path1);
|
||||||
|
xAttrs = webHdfsFileSystem.getXAttrs(path1);
|
||||||
|
assertTrue(xAttrs
|
||||||
|
.containsKey(HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY));
|
||||||
|
} else {
|
||||||
|
Assert.fail(fs.getClass().getSimpleName() + " doesn't support access");
|
||||||
|
}
|
||||||
|
dfs.delete(path1, true);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
@ -19,16 +19,21 @@
|
|||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||||
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||||
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
|
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
|
||||||
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
|
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
|
||||||
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
|
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
|
||||||
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||||
import org.apache.hadoop.hdfs.web.JsonUtil;
|
import org.apache.hadoop.hdfs.web.JsonUtil;
|
||||||
import org.apache.hadoop.lib.service.FileSystemAccess;
|
import org.apache.hadoop.lib.service.FileSystemAccess;
|
||||||
import org.apache.hadoop.security.authentication.util.SignerSecretProvider;
|
import org.apache.hadoop.security.authentication.util.SignerSecretProvider;
|
||||||
@ -186,6 +191,8 @@ private Configuration createHttpFSConf(boolean addDelegationTokenAuthHandler,
|
|||||||
Configuration conf = new Configuration(hdfsConf);
|
Configuration conf = new Configuration(hdfsConf);
|
||||||
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
|
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
|
||||||
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_XATTRS_ENABLED_KEY, true);
|
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_XATTRS_ENABLED_KEY, true);
|
||||||
|
conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
|
||||||
|
StoragePolicySatisfierMode.EXTERNAL.toString());
|
||||||
File hdfsSite = new File(hadoopConfDir, "hdfs-site.xml");
|
File hdfsSite = new File(hadoopConfDir, "hdfs-site.xml");
|
||||||
OutputStream os = new FileOutputStream(hdfsSite);
|
OutputStream os = new FileOutputStream(hdfsSite);
|
||||||
conf.writeXml(os);
|
conf.writeXml(os);
|
||||||
@ -1801,4 +1808,30 @@ public void testErasureCodingPolicy() throws Exception {
|
|||||||
conn4.connect();
|
conn4.connect();
|
||||||
Assert.assertEquals(HttpURLConnection.HTTP_OK, conn4.getResponseCode());
|
Assert.assertEquals(HttpURLConnection.HTTP_OK, conn4.getResponseCode());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@TestDir
|
||||||
|
@TestJetty
|
||||||
|
@TestHdfs
|
||||||
|
public void testStoragePolicySatisfier() throws Exception {
|
||||||
|
createHttpFSServer(false, false);
|
||||||
|
final String dir = "/parent";
|
||||||
|
Path path1 = new Path(dir);
|
||||||
|
String file = "/parent/file";
|
||||||
|
Path filePath = new Path(file);
|
||||||
|
DistributedFileSystem dfs = (DistributedFileSystem) FileSystem
|
||||||
|
.get(path1.toUri(), TestHdfsHelper.getHdfsConf());
|
||||||
|
dfs.mkdirs(path1);
|
||||||
|
dfs.create(filePath).close();
|
||||||
|
dfs.setStoragePolicy(filePath, HdfsConstants.COLD_STORAGE_POLICY_NAME);
|
||||||
|
BlockStoragePolicy storagePolicy =
|
||||||
|
(BlockStoragePolicy) dfs.getStoragePolicy(filePath);
|
||||||
|
assertEquals(HdfsConstants.COLD_STORAGE_POLICY_NAME,
|
||||||
|
storagePolicy.getName());
|
||||||
|
HttpURLConnection conn = putCmdWithReturn(dir, "SATISFYSTORAGEPOLICY", "");
|
||||||
|
Assert.assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
|
||||||
|
Map<String, byte[]> xAttrs = dfs.getXAttrs(path1);
|
||||||
|
assertTrue(
|
||||||
|
xAttrs.containsKey(HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -34,6 +34,7 @@
|
|||||||
import org.apache.hadoop.hdfs.StripedFileTestUtil;
|
import org.apache.hadoop.hdfs.StripedFileTestUtil;
|
||||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.runners.model.FrameworkMethod;
|
import org.junit.runners.model.FrameworkMethod;
|
||||||
import org.junit.runners.model.Statement;
|
import org.junit.runners.model.Statement;
|
||||||
@ -174,6 +175,8 @@ private static synchronized MiniDFSCluster startMiniHdfs(Configuration conf) thr
|
|||||||
conf.set("hadoop.security.authentication", "simple");
|
conf.set("hadoop.security.authentication", "simple");
|
||||||
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
|
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
|
||||||
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_XATTRS_ENABLED_KEY, true);
|
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_XATTRS_ENABLED_KEY, true);
|
||||||
|
conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
|
||||||
|
StoragePolicySatisfierMode.EXTERNAL.toString());
|
||||||
// For BaseTestHttpFSWith#testFileAclsCustomizedUserAndGroupNames
|
// For BaseTestHttpFSWith#testFileAclsCustomizedUserAndGroupNames
|
||||||
conf.set(HdfsClientConfigKeys.DFS_WEBHDFS_USER_PATTERN_KEY,
|
conf.set(HdfsClientConfigKeys.DFS_WEBHDFS_USER_PATTERN_KEY,
|
||||||
"^[A-Za-z0-9_][A-Za-z0-9._-]*[$]?$");
|
"^[A-Za-z0-9_][A-Za-z0-9._-]*[$]?$");
|
||||||
|
Loading…
Reference in New Issue
Block a user