HDFS-14064. WEBHDFS: Support Enable/Disable EC Policy. Contributed by Ayush Saxena.
This commit is contained in:
parent
c4d0ef6946
commit
892b33e054
@ -1310,6 +1310,16 @@ public void allowSnapshot(final Path p) throws IOException {
|
||||
new FsPathRunner(op, p).run();
|
||||
}
|
||||
|
||||
public void enableECPolicy(String policyName) throws IOException {
|
||||
final HttpOpParam.Op op = PutOpParam.Op.ENABLEECPOLICY;
|
||||
new FsPathRunner(op, null, new ECPolicyParam(policyName)).run();
|
||||
}
|
||||
|
||||
public void disableECPolicy(String policyName) throws IOException {
|
||||
final HttpOpParam.Op op = PutOpParam.Op.DISABLEECPOLICY;
|
||||
new FsPathRunner(op, null, new ECPolicyParam(policyName)).run();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Path createSnapshot(final Path path, final String snapshotName)
|
||||
throws IOException {
|
||||
|
@ -0,0 +1,42 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.web.resources;
|
||||
|
||||
/** policy parameter. */
|
||||
public class ECPolicyParam extends StringParam {
|
||||
/** Parameter name. */
|
||||
public static final String NAME = "ecpolicy";
|
||||
/** Default parameter value. */
|
||||
public static final String DEFAULT = "";
|
||||
|
||||
private static final Domain DOMAIN = new Domain(NAME, null);
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
*
|
||||
* @param str a string representation of the parameter value.
|
||||
*/
|
||||
public ECPolicyParam(final String str) {
|
||||
super(DOMAIN, str == null || str.equals(DEFAULT) ? null : str);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return NAME;
|
||||
}
|
||||
}
|
@ -46,6 +46,9 @@ public enum Op implements HttpOpParam.Op {
|
||||
SETXATTR(false, HttpURLConnection.HTTP_OK),
|
||||
REMOVEXATTR(false, HttpURLConnection.HTTP_OK),
|
||||
|
||||
ENABLEECPOLICY(false, HttpURLConnection.HTTP_OK),
|
||||
DISABLEECPOLICY(false, HttpURLConnection.HTTP_OK),
|
||||
|
||||
ALLOWSNAPSHOT(false, HttpURLConnection.HTTP_OK),
|
||||
DISALLOWSNAPSHOT(false, HttpURLConnection.HTTP_OK),
|
||||
CREATESNAPSHOT(false, HttpURLConnection.HTTP_OK),
|
||||
|
@ -54,6 +54,7 @@
|
||||
import org.apache.hadoop.hdfs.web.resources.DelegationParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.DestinationParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.DoAsParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.ECPolicyParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.ExcludeDatanodesParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.FsActionParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.GetOpParam;
|
||||
@ -215,7 +216,8 @@ protected Response put(
|
||||
final ExcludeDatanodesParam exclDatanodes,
|
||||
final CreateFlagParam createFlagParam,
|
||||
final NoRedirectParam noredirectParam,
|
||||
final StoragePolicyParam policyName
|
||||
final StoragePolicyParam policyName,
|
||||
final ECPolicyParam ecpolicy
|
||||
) throws IOException, URISyntaxException {
|
||||
|
||||
switch(op.getValue()) {
|
||||
@ -252,6 +254,8 @@ protected Response put(
|
||||
case RENAMESNAPSHOT:
|
||||
case DISALLOWSNAPSHOT:
|
||||
case SETSTORAGEPOLICY:
|
||||
case ENABLEECPOLICY:
|
||||
case DISABLEECPOLICY:
|
||||
{
|
||||
// Whitelist operations that can handled by NamenodeWebHdfsMethods
|
||||
return super.put(ugi, delegation, username, doAsUser, fullpath, op,
|
||||
@ -260,7 +264,7 @@ protected Response put(
|
||||
accessTime, renameOptions, createParent, delegationTokenArgument,
|
||||
aclPermission, xattrName, xattrValue, xattrSetFlag, snapshotName,
|
||||
oldSnapshotName, exclDatanodes, createFlagParam, noredirectParam,
|
||||
policyName);
|
||||
policyName, ecpolicy);
|
||||
}
|
||||
default:
|
||||
throw new UnsupportedOperationException(op + " is not supported");
|
||||
|
@ -492,14 +492,16 @@ public Response putRoot(
|
||||
@QueryParam(NoRedirectParam.NAME) @DefaultValue(NoRedirectParam.DEFAULT)
|
||||
final NoRedirectParam noredirect,
|
||||
@QueryParam(StoragePolicyParam.NAME) @DefaultValue(StoragePolicyParam
|
||||
.DEFAULT) final StoragePolicyParam policyName
|
||||
.DEFAULT) final StoragePolicyParam policyName,
|
||||
@QueryParam(ECPolicyParam.NAME) @DefaultValue(ECPolicyParam
|
||||
.DEFAULT) final ECPolicyParam ecpolicy
|
||||
) throws IOException, InterruptedException {
|
||||
return put(ugi, delegation, username, doAsUser, ROOT, op, destination,
|
||||
owner, group, permission, unmaskedPermission, overwrite, bufferSize,
|
||||
replication, blockSize, modificationTime, accessTime, renameOptions,
|
||||
createParent, delegationTokenArgument, aclPermission, xattrName,
|
||||
xattrValue, xattrSetFlag, snapshotName, oldSnapshotName,
|
||||
excludeDatanodes, createFlagParam, noredirect, policyName);
|
||||
excludeDatanodes, createFlagParam, noredirect, policyName, ecpolicy);
|
||||
}
|
||||
|
||||
/** Validate all required params. */
|
||||
@ -579,7 +581,9 @@ public Response put(
|
||||
@QueryParam(NoRedirectParam.NAME) @DefaultValue(NoRedirectParam.DEFAULT)
|
||||
final NoRedirectParam noredirect,
|
||||
@QueryParam(StoragePolicyParam.NAME) @DefaultValue(StoragePolicyParam
|
||||
.DEFAULT) final StoragePolicyParam policyName
|
||||
.DEFAULT) final StoragePolicyParam policyName,
|
||||
@QueryParam(ECPolicyParam.NAME) @DefaultValue(ECPolicyParam.DEFAULT)
|
||||
final ECPolicyParam ecpolicy
|
||||
) throws IOException, InterruptedException {
|
||||
|
||||
init(ugi, delegation, username, doAsUser, path, op, destination, owner,
|
||||
@ -599,7 +603,7 @@ public Response run() throws IOException, URISyntaxException {
|
||||
renameOptions, createParent, delegationTokenArgument,
|
||||
aclPermission, xattrName, xattrValue, xattrSetFlag,
|
||||
snapshotName, oldSnapshotName, excludeDatanodes,
|
||||
createFlagParam, noredirect, policyName);
|
||||
createFlagParam, noredirect, policyName, ecpolicy);
|
||||
}
|
||||
});
|
||||
}
|
||||
@ -634,7 +638,8 @@ protected Response put(
|
||||
final ExcludeDatanodesParam exclDatanodes,
|
||||
final CreateFlagParam createFlagParam,
|
||||
final NoRedirectParam noredirectParam,
|
||||
final StoragePolicyParam policyName
|
||||
final StoragePolicyParam policyName,
|
||||
final ECPolicyParam ecpolicy
|
||||
) throws IOException, URISyntaxException {
|
||||
final Configuration conf = (Configuration)context.getAttribute(JspHelper.CURRENT_CONF);
|
||||
final ClientProtocol cp = getRpcClientProtocol();
|
||||
@ -795,6 +800,16 @@ protected Response put(
|
||||
cp.setStoragePolicy(fullpath, policyName.getValue());
|
||||
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
|
||||
}
|
||||
case ENABLEECPOLICY:
|
||||
validateOpParams(op, ecpolicy);
|
||||
cp.enableErasureCodingPolicy(ecpolicy.getValue());
|
||||
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
|
||||
|
||||
case DISABLEECPOLICY:
|
||||
validateOpParams(op, ecpolicy);
|
||||
cp.disableErasureCodingPolicy(ecpolicy.getValue());
|
||||
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
|
||||
|
||||
default:
|
||||
throw new UnsupportedOperationException(op + " is not supported");
|
||||
}
|
||||
|
@ -69,6 +69,8 @@ The HTTP REST API supports the complete [FileSystem](../../api/org/apache/hadoop
|
||||
* [`SETXATTR`](#Set_XAttr) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).setXAttr)
|
||||
* [`REMOVEXATTR`](#Remove_XAttr) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).removeXAttr)
|
||||
* [`SETSTORAGEPOLICY`](#Set_Storage_Policy) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).setStoragePolicy)
|
||||
* [`ENABLEECPOLICY`](#Enable_EC_Policy) (see [HDFSErasureCoding](./HDFSErasureCoding.html#Administrative_commands).enablePolicy)
|
||||
* [`DISABLEECPOLICY`](#Disable_EC_Policy) (see [HDFSErasureCoding](./HDFSErasureCoding.html#Administrative_commands).disablePolicy)
|
||||
* HTTP POST
|
||||
* [`APPEND`](#Append_to_a_File) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).append)
|
||||
* [`CONCAT`](#Concat_Files) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).concat)
|
||||
@ -1266,6 +1268,37 @@ See also: [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getXAttrs
|
||||
|
||||
See also: [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).listXAttrs
|
||||
|
||||
Erasure Coding Operations
|
||||
-------------------------
|
||||
|
||||
### Enable EC Policy
|
||||
|
||||
* Submit a HTTP PUT request.
|
||||
|
||||
curl -i -X PUT "http://<HOST>:<PORT>/webhdfs/v1/?op=ENABLEECPOLICY
|
||||
&ecpolicy=<policy>"
|
||||
|
||||
The client receives a response with zero content length:
|
||||
|
||||
HTTP/1.1 200 OK
|
||||
Content-Length: 0
|
||||
|
||||
See also: [HDFSErasureCoding](./HDFSErasureCoding.html#Administrative_commands).enablePolicy)
|
||||
|
||||
### Disable EC Policy
|
||||
|
||||
* Submit a HTTP PUT request.
|
||||
|
||||
curl -i -X PUT "http://<HOST>:<PORT>/webhdfs/v1/?op=DISABLEECPOLICY
|
||||
&ecpolicy=<policy>"
|
||||
|
||||
The client receives a response with zero content length:
|
||||
|
||||
HTTP/1.1 200 OK
|
||||
Content-Length: 0
|
||||
|
||||
See also: [HDFSErasureCoding](./HDFSErasureCoding.html#Administrative_commands).disablePolicy)
|
||||
|
||||
Snapshot Operations
|
||||
-------------------
|
||||
|
||||
@ -2822,6 +2855,18 @@ See also: [Create and Write to a File](#Create_and_Write_to_a_File)
|
||||
|
||||
See also: [`SETSTORAGEPOLICY`](#Set_Storage_Policy)
|
||||
|
||||
### Erasure Coding Policy
|
||||
|
||||
| Name | `ecpolicy` |
|
||||
|:---- |:---- |
|
||||
| Description | The name of the erasure coding policy. |
|
||||
| Type | String |
|
||||
| Default Value | \<empty\> |
|
||||
| Valid Values | Any valid erasure coding policy name; |
|
||||
| Syntax | Any string. |
|
||||
|
||||
See also: [`ENABLEECPOLICY`](#Enable_EC_Policy) or [`DISABLEECPOLICY`](#Disable_EC_Policy)
|
||||
|
||||
### Start After
|
||||
|
||||
| Name | `startAfter` |
|
||||
|
@ -47,6 +47,8 @@
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
|
||||
@ -83,6 +85,7 @@
|
||||
import org.apache.hadoop.hdfs.TestFileCreation;
|
||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
|
||||
@ -1599,6 +1602,49 @@ public void testSetStoragePolicyWhenPolicyDisabled() throws Exception {
|
||||
}
|
||||
}
|
||||
|
||||
private void checkECPolicyState(Collection<ErasureCodingPolicyInfo> policies,
|
||||
String ecpolicy, String state) {
|
||||
Iterator<ErasureCodingPolicyInfo> itr = policies.iterator();
|
||||
boolean found = false;
|
||||
while (policies.iterator().hasNext()) {
|
||||
ErasureCodingPolicyInfo policy = itr.next();
|
||||
if (policy.getPolicy().getName().equals(ecpolicy)) {
|
||||
found = true;
|
||||
if (state.equals("disable")) {
|
||||
Assert.assertTrue(policy.isDisabled());
|
||||
} else if (state.equals("enable")) {
|
||||
Assert.assertTrue(policy.isEnabled());
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
Assert.assertTrue(found);
|
||||
}
|
||||
|
||||
// Test For Enable/Disable EC Policy in DFS.
|
||||
@Test
|
||||
public void testEnableDisableECPolicy() throws Exception {
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
try (MiniDFSCluster cluster =
|
||||
new MiniDFSCluster.Builder(conf).numDataNodes(0).build()) {
|
||||
cluster.waitActive();
|
||||
final DistributedFileSystem dfs = cluster.getFileSystem();
|
||||
final WebHdfsFileSystem webHdfs = WebHdfsTestUtil
|
||||
.getWebHdfsFileSystem(conf, WebHdfsConstants.WEBHDFS_SCHEME);
|
||||
String policy = "RS-10-4-1024k";
|
||||
|
||||
// Check for Enable EC policy via WEBHDFS.
|
||||
dfs.disableErasureCodingPolicy(policy);
|
||||
checkECPolicyState(dfs.getAllErasureCodingPolicies(), policy, "disable");
|
||||
webHdfs.enableECPolicy("RS-10-4-1024k");
|
||||
checkECPolicyState(dfs.getAllErasureCodingPolicies(), policy, "enable");
|
||||
|
||||
// Check for Disable EC policy via WEBHDFS.
|
||||
webHdfs.disableECPolicy(policy);
|
||||
checkECPolicyState(dfs.getAllErasureCodingPolicies(), policy, "disable");
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWebHdfsAppend() throws Exception {
|
||||
MiniDFSCluster cluster = null;
|
||||
|
@ -505,6 +505,14 @@ public void testStoragePolicyParam() {
|
||||
Assert.assertEquals("COLD", p.getValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testECPolicyParam() {
|
||||
ECPolicyParam p = new ECPolicyParam(ECPolicyParam.DEFAULT);
|
||||
Assert.assertEquals(null, p.getValue());
|
||||
p = new ECPolicyParam("RS-6-3-1024k");
|
||||
Assert.assertEquals("RS-6-3-1024k", p.getValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHttpOpParams() {
|
||||
try {
|
||||
|
Loading…
Reference in New Issue
Block a user