HDFS-14124. EC : Support EC Commands (set/get/unset EcPolicy) via WebHdfs. Contributed by Ayush Saxena.

This commit is contained in:
Vinayakumar B 2018-12-11 17:59:04 +05:30
parent 3ff8580f22
commit 39dc7345b8
8 changed files with 164 additions and 7 deletions

View File

@ -675,6 +675,17 @@ public static BlockStoragePolicy toBlockStoragePolicy(Map<?, ?> m) {
replicationFallbacks, copyOnCreateFile.booleanValue()); replicationFallbacks, copyOnCreateFile.booleanValue());
} }
public static ErasureCodingPolicy toECPolicy(Map<?, ?> m) {
byte id = ((Number) m.get("id")).byteValue();
String name = (String) m.get("name");
String codec = (String) m.get("codecName");
int cellsize = ((Number) m.get("cellSize")).intValue();
int dataunits = ((Number) m.get("numDataUnits")).intValue();
int parityunits = ((Number) m.get("numParityUnits")).intValue();
ECSchema ecs = new ECSchema(codec, dataunits, parityunits);
return new ErasureCodingPolicy(name, ecs, cellsize, id);
}
private static StorageType[] toStorageTypes(List<?> list) { private static StorageType[] toStorageTypes(List<?> list) {
if (list == null) { if (list == null) {
return null; return null;

View File

@ -94,6 +94,7 @@
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.DirectoryListing; import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
@ -1311,15 +1312,47 @@ public void allowSnapshot(final Path p) throws IOException {
} }
public void enableECPolicy(String policyName) throws IOException { public void enableECPolicy(String policyName) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.ENABLE_EC_POLICY);
final HttpOpParam.Op op = PutOpParam.Op.ENABLEECPOLICY; final HttpOpParam.Op op = PutOpParam.Op.ENABLEECPOLICY;
new FsPathRunner(op, null, new ECPolicyParam(policyName)).run(); new FsPathRunner(op, null, new ECPolicyParam(policyName)).run();
} }
public void disableECPolicy(String policyName) throws IOException { public void disableECPolicy(String policyName) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.DISABLE_EC_POLICY);
final HttpOpParam.Op op = PutOpParam.Op.DISABLEECPOLICY; final HttpOpParam.Op op = PutOpParam.Op.DISABLEECPOLICY;
new FsPathRunner(op, null, new ECPolicyParam(policyName)).run(); new FsPathRunner(op, null, new ECPolicyParam(policyName)).run();
} }
public void setErasureCodingPolicy(Path p, String policyName)
throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.SET_EC_POLICY);
final HttpOpParam.Op op = PutOpParam.Op.SETECPOLICY;
new FsPathRunner(op, p, new ECPolicyParam(policyName)).run();
}
public void unsetErasureCodingPolicy(Path p) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.UNSET_EC_POLICY);
final HttpOpParam.Op op = PostOpParam.Op.UNSETECPOLICY;
new FsPathRunner(op, p).run();
}
public ErasureCodingPolicy getErasureCodingPolicy(Path p)
throws IOException {
statistics.incrementReadOps(1);
storageStatistics.incrementOpCounter(OpType.GET_EC_POLICY);
final HttpOpParam.Op op =GetOpParam.Op.GETECPOLICY;
return new FsPathResponseRunner<ErasureCodingPolicy>(op, p) {
@Override
ErasureCodingPolicy decodeResponse(Map<?, ?> json) throws IOException {
return JsonUtilClient.toECPolicy((Map<?, ?>) json);
}
}.run();
}
@Override @Override
public Path createSnapshot(final Path path, final String snapshotName) public Path createSnapshot(final Path path, final String snapshotName)
throws IOException { throws IOException {

View File

@ -53,6 +53,8 @@ public enum Op implements HttpOpParam.Op {
GETALLSTORAGEPOLICY(false, HttpURLConnection.HTTP_OK), GETALLSTORAGEPOLICY(false, HttpURLConnection.HTTP_OK),
GETSTORAGEPOLICY(false, HttpURLConnection.HTTP_OK), GETSTORAGEPOLICY(false, HttpURLConnection.HTTP_OK),
GETECPOLICY(false, HttpURLConnection.HTTP_OK),
NULL(false, HttpURLConnection.HTTP_NOT_IMPLEMENTED), NULL(false, HttpURLConnection.HTTP_NOT_IMPLEMENTED),
CHECKACCESS(false, HttpURLConnection.HTTP_OK), CHECKACCESS(false, HttpURLConnection.HTTP_OK),

View File

@ -29,6 +29,8 @@ public enum Op implements HttpOpParam.Op {
TRUNCATE(false, HttpURLConnection.HTTP_OK), TRUNCATE(false, HttpURLConnection.HTTP_OK),
UNSETECPOLICY(false, HttpURLConnection.HTTP_OK),
UNSETSTORAGEPOLICY(false, HttpURLConnection.HTTP_OK), UNSETSTORAGEPOLICY(false, HttpURLConnection.HTTP_OK),
NULL(false, HttpURLConnection.HTTP_NOT_IMPLEMENTED); NULL(false, HttpURLConnection.HTTP_NOT_IMPLEMENTED);

View File

@ -48,6 +48,7 @@ public enum Op implements HttpOpParam.Op {
ENABLEECPOLICY(false, HttpURLConnection.HTTP_OK), ENABLEECPOLICY(false, HttpURLConnection.HTTP_OK),
DISABLEECPOLICY(false, HttpURLConnection.HTTP_OK), DISABLEECPOLICY(false, HttpURLConnection.HTTP_OK),
SETECPOLICY(false, HttpURLConnection.HTTP_OK),
ALLOWSNAPSHOT(false, HttpURLConnection.HTTP_OK), ALLOWSNAPSHOT(false, HttpURLConnection.HTTP_OK),
DISALLOWSNAPSHOT(false, HttpURLConnection.HTTP_OK), DISALLOWSNAPSHOT(false, HttpURLConnection.HTTP_OK),

View File

@ -79,6 +79,7 @@
import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing; import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
@ -804,12 +805,14 @@ protected Response put(
validateOpParams(op, ecpolicy); validateOpParams(op, ecpolicy);
cp.enableErasureCodingPolicy(ecpolicy.getValue()); cp.enableErasureCodingPolicy(ecpolicy.getValue());
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build(); return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
case DISABLEECPOLICY: case DISABLEECPOLICY:
validateOpParams(op, ecpolicy); validateOpParams(op, ecpolicy);
cp.disableErasureCodingPolicy(ecpolicy.getValue()); cp.disableErasureCodingPolicy(ecpolicy.getValue());
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build(); return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
case SETECPOLICY:
validateOpParams(op, ecpolicy);
cp.setErasureCodingPolicy(fullpath, ecpolicy.getValue());
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
default: default:
throw new UnsupportedOperationException(op + " is not supported"); throw new UnsupportedOperationException(op + " is not supported");
} }
@ -937,6 +940,9 @@ protected Response post(
cp.unsetStoragePolicy(fullpath); cp.unsetStoragePolicy(fullpath);
return Response.ok().build(); return Response.ok().build();
} }
case UNSETECPOLICY:
cp.unsetErasureCodingPolicy(fullpath);
return Response.ok().build();
default: default:
throw new UnsupportedOperationException(op + " is not supported"); throw new UnsupportedOperationException(op + " is not supported");
} }
@ -1247,6 +1253,11 @@ protected Response get(
final String js = JsonUtil.toJsonString(storagePolicy); final String js = JsonUtil.toJsonString(storagePolicy);
return Response.ok(js).type(MediaType.APPLICATION_JSON).build(); return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
} }
case GETECPOLICY: {
ErasureCodingPolicy ecpolicy = cp.getErasureCodingPolicy(fullpath);
final String js = JsonUtil.toJsonString(ecpolicy);
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
}
case GETSERVERDEFAULTS: { case GETSERVERDEFAULTS: {
// Since none of the server defaults values are hot reloaded, we can // Since none of the server defaults values are hot reloaded, we can
// cache the output of serverDefaults. // cache the output of serverDefaults.

View File

@ -53,6 +53,7 @@ The HTTP REST API supports the complete [FileSystem](../../api/org/apache/hadoop
* [`GETSNAPSHOTDIFF`](#Get_Snapshot_Diff) * [`GETSNAPSHOTDIFF`](#Get_Snapshot_Diff)
* [`GETSNAPSHOTTABLEDIRECTORYLIST`](#Get_Snapshottable_Directory_List) * [`GETSNAPSHOTTABLEDIRECTORYLIST`](#Get_Snapshottable_Directory_List)
* [`GETFILEBLOCKLOCATIONS`](#Get_File_Block_Locations) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getFileBlockLocations) * [`GETFILEBLOCKLOCATIONS`](#Get_File_Block_Locations) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getFileBlockLocations)
* [`GETECPOLICY`](#Get_EC_Policy) (see [HDFSErasureCoding](./HDFSErasureCoding.html#Administrative_commands).getErasureCodingPolicy)
* HTTP PUT * HTTP PUT
* [`CREATE`](#Create_and_Write_to_a_File) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).create) * [`CREATE`](#Create_and_Write_to_a_File) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).create)
* [`MKDIRS`](#Make_a_Directory) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).mkdirs) * [`MKDIRS`](#Make_a_Directory) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).mkdirs)
@ -73,11 +74,13 @@ The HTTP REST API supports the complete [FileSystem](../../api/org/apache/hadoop
* [`SETSTORAGEPOLICY`](#Set_Storage_Policy) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).setStoragePolicy) * [`SETSTORAGEPOLICY`](#Set_Storage_Policy) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).setStoragePolicy)
* [`ENABLEECPOLICY`](#Enable_EC_Policy) (see [HDFSErasureCoding](./HDFSErasureCoding.html#Administrative_commands).enablePolicy) * [`ENABLEECPOLICY`](#Enable_EC_Policy) (see [HDFSErasureCoding](./HDFSErasureCoding.html#Administrative_commands).enablePolicy)
* [`DISABLEECPOLICY`](#Disable_EC_Policy) (see [HDFSErasureCoding](./HDFSErasureCoding.html#Administrative_commands).disablePolicy) * [`DISABLEECPOLICY`](#Disable_EC_Policy) (see [HDFSErasureCoding](./HDFSErasureCoding.html#Administrative_commands).disablePolicy)
* [`SETECPOLICY`](#Set_EC_Policy) (see [HDFSErasureCoding](./HDFSErasureCoding.html#Administrative_commands).setErasureCodingPolicy)
* HTTP POST * HTTP POST
* [`APPEND`](#Append_to_a_File) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).append) * [`APPEND`](#Append_to_a_File) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).append)
* [`CONCAT`](#Concat_Files) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).concat) * [`CONCAT`](#Concat_Files) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).concat)
* [`TRUNCATE`](#Truncate_a_File) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).truncate) * [`TRUNCATE`](#Truncate_a_File) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).truncate)
* [`UNSETSTORAGEPOLICY`](#Unset_Storage_Policy) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).unsetStoragePolicy) * [`UNSETSTORAGEPOLICY`](#Unset_Storage_Policy) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).unsetStoragePolicy)
* [`UNSETECPOLICY`](#Unset_EC_Policy) (see [HDFSErasureCoding](./HDFSErasureCoding.html#Administrative_commands).unsetErasureCodingPolicy)
* HTTP DELETE * HTTP DELETE
* [`DELETE`](#Delete_a_FileDirectory) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).delete) * [`DELETE`](#Delete_a_FileDirectory) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).delete)
* [`DELETESNAPSHOT`](#Delete_Snapshot) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).deleteSnapshot) * [`DELETESNAPSHOT`](#Delete_Snapshot) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).deleteSnapshot)
@ -1285,7 +1288,7 @@ Erasure Coding Operations
HTTP/1.1 200 OK HTTP/1.1 200 OK
Content-Length: 0 Content-Length: 0
See also: [HDFSErasureCoding](./HDFSErasureCoding.html#Administrative_commands).enablePolicy) See also: [HDFSErasureCoding](./HDFSErasureCoding.html#Administrative_commands).enablePolicy
### Disable EC Policy ### Disable EC Policy
@ -1299,7 +1302,68 @@ See also: [HDFSErasureCoding](./HDFSErasureCoding.html#Administrative_commands).
HTTP/1.1 200 OK HTTP/1.1 200 OK
Content-Length: 0 Content-Length: 0
See also: [HDFSErasureCoding](./HDFSErasureCoding.html#Administrative_commands).disablePolicy) See also: [HDFSErasureCoding](./HDFSErasureCoding.html#Administrative_commands).disablePolicy
### Set EC Policy
* Submit a HTTP PUT request.
curl -i -X PUT "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETECPOLICY
&ecpolicy=<policy>"
The client receives a response with zero content length:
HTTP/1.1 200 OK
Content-Length: 0
See also: [HDFSErasureCoding](./HDFSErasureCoding.html#Administrative_commands).setErasureCodingPolicy
### Get EC Policy
* Submit a HTTP GET request.
curl -i -X GET "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETECPOLICY
"
The client receives a response with a [`ECPolicy` JSON object](#ECPolicy_JSON_Schema):
{
"name": "RS-10-4-1024k",
"schema":
{
"codecName": "rs",
"numDataUnits": 10,
"numParityUnits": 4,
"extraOptions": {}
}
"cellSize": 1048576,
"id":5,
"codecname":"rs",
"numDataUnits": 10,
"numParityUnits": 4,
"replicationpolicy":false,
"systemPolicy":true
}
See also: [HDFSErasureCoding](./HDFSErasureCoding.html#Administrative_commands).getErasureCodingPolicy
### Unset EC Policy
* Submit a HTTP POST request.
curl -i -X POST "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=UNSETECPOLICY
"
The client receives a response with zero content length:
HTTP/1.1 200 OK
Content-Length: 0
See also: [HDFSErasureCoding](./HDFSErasureCoding.html#Administrative_commands).unsetErasureCodingPolicy
Snapshot Operations Snapshot Operations
------------------- -------------------
@ -2173,6 +2237,26 @@ var blockStoragePolicyProperties =
} }
}; };
``` ```
### ECPolicy JSON Schema
```json
{
"name": "RS-10-4-1024k",
schema {
"codecName": "rs",
"numDataUnits": 10,
"numParityUnits": 4,
"extraOptions": {}
}
"cellSize": 1048576,
"id":5,
"codecname":"rs",
"numDataUnits": 10,
"numParityUnits": 4,
"replicationpolicy":false,
"systemPolicy":true
}
```
### BlockStoragePolicies JSON Schema ### BlockStoragePolicies JSON Schema
@ -2244,6 +2328,7 @@ A `BlockStoragePolicies` JSON object represents an array of `BlockStoragePolicy`
} }
``` ```
#### DiffReport Entries #### DiffReport Entries
JavaScript syntax is used to define `diffReportEntries` so that it can be referred in `SnapshotDiffReport` JSON schema. JavaScript syntax is used to define `diffReportEntries` so that it can be referred in `SnapshotDiffReport` JSON schema.

View File

@ -1623,7 +1623,7 @@ private void checkECPolicyState(Collection<ErasureCodingPolicyInfo> policies,
// Test For Enable/Disable EC Policy in DFS. // Test For Enable/Disable EC Policy in DFS.
@Test @Test
public void testEnableDisableECPolicy() throws Exception { public void testECPolicyCommands() throws Exception {
Configuration conf = new HdfsConfiguration(); Configuration conf = new HdfsConfiguration();
try (MiniDFSCluster cluster = try (MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(0).build()) { new MiniDFSCluster.Builder(conf).numDataNodes(0).build()) {
@ -1632,12 +1632,24 @@ public void testEnableDisableECPolicy() throws Exception {
final WebHdfsFileSystem webHdfs = WebHdfsTestUtil final WebHdfsFileSystem webHdfs = WebHdfsTestUtil
.getWebHdfsFileSystem(conf, WebHdfsConstants.WEBHDFS_SCHEME); .getWebHdfsFileSystem(conf, WebHdfsConstants.WEBHDFS_SCHEME);
String policy = "RS-10-4-1024k"; String policy = "RS-10-4-1024k";
// Check for Enable EC policy via WEBHDFS. // Check for Enable EC policy via WEBHDFS.
dfs.disableErasureCodingPolicy(policy); dfs.disableErasureCodingPolicy(policy);
checkECPolicyState(dfs.getAllErasureCodingPolicies(), policy, "disable"); checkECPolicyState(dfs.getAllErasureCodingPolicies(), policy, "disable");
webHdfs.enableECPolicy("RS-10-4-1024k"); webHdfs.enableECPolicy(policy);
checkECPolicyState(dfs.getAllErasureCodingPolicies(), policy, "enable"); checkECPolicyState(dfs.getAllErasureCodingPolicies(), policy, "enable");
Path dir = new Path("/tmp");
dfs.mkdirs(dir);
// Check for Set EC policy via WEBHDFS
assertNull(dfs.getErasureCodingPolicy(dir));
webHdfs.setErasureCodingPolicy(dir, policy);
assertEquals(policy, dfs.getErasureCodingPolicy(dir).getName());
// Check for Get EC policy via WEBHDFS
assertEquals(policy, webHdfs.getErasureCodingPolicy(dir).getName());
// Check for Unset EC policy via WEBHDFS
webHdfs.unsetErasureCodingPolicy(dir);
assertNull(dfs.getErasureCodingPolicy(dir));
// Check for Disable EC policy via WEBHDFS. // Check for Disable EC policy via WEBHDFS.
webHdfs.disableECPolicy(policy); webHdfs.disableECPolicy(policy);