HDFS-7656. Expose truncate API for HDFS httpfs. (yliu)
This commit is contained in:
parent
64a8375635
commit
2fd02afeca
@ -109,12 +109,15 @@ public class HttpFSFileSystem extends FileSystem
|
|||||||
public static final String XATTR_VALUE_PARAM = "xattr.value";
|
public static final String XATTR_VALUE_PARAM = "xattr.value";
|
||||||
public static final String XATTR_SET_FLAG_PARAM = "flag";
|
public static final String XATTR_SET_FLAG_PARAM = "flag";
|
||||||
public static final String XATTR_ENCODING_PARAM = "encoding";
|
public static final String XATTR_ENCODING_PARAM = "encoding";
|
||||||
|
public static final String NEW_LENGTH_PARAM = "newlength";
|
||||||
|
|
||||||
public static final Short DEFAULT_PERMISSION = 0755;
|
public static final Short DEFAULT_PERMISSION = 0755;
|
||||||
public static final String ACLSPEC_DEFAULT = "";
|
public static final String ACLSPEC_DEFAULT = "";
|
||||||
|
|
||||||
public static final String RENAME_JSON = "boolean";
|
public static final String RENAME_JSON = "boolean";
|
||||||
|
|
||||||
|
public static final String TRUNCATE_JSON = "boolean";
|
||||||
|
|
||||||
public static final String DELETE_JSON = "boolean";
|
public static final String DELETE_JSON = "boolean";
|
||||||
|
|
||||||
public static final String MKDIRS_JSON = "boolean";
|
public static final String MKDIRS_JSON = "boolean";
|
||||||
@ -191,7 +194,7 @@ public static enum Operation {
|
|||||||
GETHOMEDIRECTORY(HTTP_GET), GETCONTENTSUMMARY(HTTP_GET),
|
GETHOMEDIRECTORY(HTTP_GET), GETCONTENTSUMMARY(HTTP_GET),
|
||||||
GETFILECHECKSUM(HTTP_GET), GETFILEBLOCKLOCATIONS(HTTP_GET),
|
GETFILECHECKSUM(HTTP_GET), GETFILEBLOCKLOCATIONS(HTTP_GET),
|
||||||
INSTRUMENTATION(HTTP_GET), GETACLSTATUS(HTTP_GET),
|
INSTRUMENTATION(HTTP_GET), GETACLSTATUS(HTTP_GET),
|
||||||
APPEND(HTTP_POST), CONCAT(HTTP_POST),
|
APPEND(HTTP_POST), CONCAT(HTTP_POST), TRUNCATE(HTTP_POST),
|
||||||
CREATE(HTTP_PUT), MKDIRS(HTTP_PUT), RENAME(HTTP_PUT), SETOWNER(HTTP_PUT),
|
CREATE(HTTP_PUT), MKDIRS(HTTP_PUT), RENAME(HTTP_PUT), SETOWNER(HTTP_PUT),
|
||||||
SETPERMISSION(HTTP_PUT), SETREPLICATION(HTTP_PUT), SETTIMES(HTTP_PUT),
|
SETPERMISSION(HTTP_PUT), SETREPLICATION(HTTP_PUT), SETTIMES(HTTP_PUT),
|
||||||
MODIFYACLENTRIES(HTTP_PUT), REMOVEACLENTRIES(HTTP_PUT),
|
MODIFYACLENTRIES(HTTP_PUT), REMOVEACLENTRIES(HTTP_PUT),
|
||||||
@ -567,6 +570,25 @@ public FSDataOutputStream append(Path f, int bufferSize,
|
|||||||
HttpURLConnection.HTTP_OK);
|
HttpURLConnection.HTTP_OK);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Truncate a file.
|
||||||
|
*
|
||||||
|
* @param f the file to be truncated.
|
||||||
|
* @param newLength The size the file is to be truncated to.
|
||||||
|
*
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public boolean truncate(Path f, long newLength) throws IOException {
|
||||||
|
Map<String, String> params = new HashMap<String, String>();
|
||||||
|
params.put(OP_PARAM, Operation.TRUNCATE.toString());
|
||||||
|
params.put(NEW_LENGTH_PARAM, Long.toString(newLength));
|
||||||
|
HttpURLConnection conn = getConnection(Operation.TRUNCATE.getMethod(),
|
||||||
|
params, f, true);
|
||||||
|
JSONObject json = (JSONObject) HttpFSUtils.jsonParse(conn);
|
||||||
|
return (Boolean) json.get(TRUNCATE_JSON);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Concat existing files together.
|
* Concat existing files together.
|
||||||
* @param f the path to the target destination.
|
* @param f the path to the target destination.
|
||||||
|
@ -364,7 +364,7 @@ public Void execute(FileSystem fs) throws IOException {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Executor that performs an append FileSystemAccess files system operation.
|
* Executor that performs a concat FileSystemAccess files system operation.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public static class FSConcat implements FileSystemAccess.FileSystemExecutor<Void> {
|
public static class FSConcat implements FileSystemAccess.FileSystemExecutor<Void> {
|
||||||
@ -404,6 +404,47 @@ public Void execute(FileSystem fs) throws IOException {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Executor that performs a truncate FileSystemAccess files system operation.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public static class FSTruncate implements
|
||||||
|
FileSystemAccess.FileSystemExecutor<JSONObject> {
|
||||||
|
private Path path;
|
||||||
|
private long newLength;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a Truncate executor.
|
||||||
|
*
|
||||||
|
* @param path target path to truncate to.
|
||||||
|
* @param newLength The size the file is to be truncated to.
|
||||||
|
*/
|
||||||
|
public FSTruncate(String path, long newLength) {
|
||||||
|
this.path = new Path(path);
|
||||||
|
this.newLength = newLength;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Executes the filesystem operation.
|
||||||
|
*
|
||||||
|
* @param fs filesystem instance to use.
|
||||||
|
*
|
||||||
|
* @return <code>true</code> if the file has been truncated to the desired,
|
||||||
|
* <code>false</code> if a background process of adjusting the
|
||||||
|
* length of the last block has been started, and clients should
|
||||||
|
* wait for it to complete before proceeding with further file
|
||||||
|
* updates.
|
||||||
|
*
|
||||||
|
* @throws IOException thrown if an IO error occured.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public JSONObject execute(FileSystem fs) throws IOException {
|
||||||
|
boolean result = fs.truncate(path, newLength);
|
||||||
|
return toJSON(HttpFSFileSystem.TRUNCATE_JSON.toLowerCase(), result);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Executor that performs a content-summary FileSystemAccess files system operation.
|
* Executor that performs a content-summary FileSystemAccess files system operation.
|
||||||
*/
|
*/
|
||||||
|
@ -63,6 +63,7 @@ public class HttpFSParametersProvider extends ParametersProvider {
|
|||||||
PARAMS_DEF.put(Operation.INSTRUMENTATION, new Class[]{});
|
PARAMS_DEF.put(Operation.INSTRUMENTATION, new Class[]{});
|
||||||
PARAMS_DEF.put(Operation.APPEND, new Class[]{DataParam.class});
|
PARAMS_DEF.put(Operation.APPEND, new Class[]{DataParam.class});
|
||||||
PARAMS_DEF.put(Operation.CONCAT, new Class[]{SourcesParam.class});
|
PARAMS_DEF.put(Operation.CONCAT, new Class[]{SourcesParam.class});
|
||||||
|
PARAMS_DEF.put(Operation.TRUNCATE, new Class[]{NewLengthParam.class});
|
||||||
PARAMS_DEF.put(Operation.CREATE,
|
PARAMS_DEF.put(Operation.CREATE,
|
||||||
new Class[]{PermissionParam.class, OverwriteParam.class,
|
new Class[]{PermissionParam.class, OverwriteParam.class,
|
||||||
ReplicationParam.class, BlockSizeParam.class, DataParam.class});
|
ReplicationParam.class, BlockSizeParam.class, DataParam.class});
|
||||||
@ -289,6 +290,25 @@ public OffsetParam() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Class for newlength parameter.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public static class NewLengthParam extends LongParam {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Parameter name.
|
||||||
|
*/
|
||||||
|
public static final String NAME = HttpFSFileSystem.NEW_LENGTH_PARAM;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor.
|
||||||
|
*/
|
||||||
|
public NewLengthParam() {
|
||||||
|
super(NAME, 0l);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Class for overwrite parameter.
|
* Class for overwrite parameter.
|
||||||
*/
|
*/
|
||||||
|
@ -33,6 +33,7 @@
|
|||||||
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.GroupParam;
|
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.GroupParam;
|
||||||
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.LenParam;
|
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.LenParam;
|
||||||
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.ModifiedTimeParam;
|
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.ModifiedTimeParam;
|
||||||
|
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.NewLengthParam;
|
||||||
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.OffsetParam;
|
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.OffsetParam;
|
||||||
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.OperationParam;
|
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.OperationParam;
|
||||||
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.OverwriteParam;
|
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.OverwriteParam;
|
||||||
@ -427,6 +428,15 @@ public Response post(InputStream is,
|
|||||||
response = Response.ok().build();
|
response = Response.ok().build();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
case TRUNCATE: {
|
||||||
|
Long newLength = params.get(NewLengthParam.NAME, NewLengthParam.class);
|
||||||
|
FSOperations.FSTruncate command =
|
||||||
|
new FSOperations.FSTruncate(path, newLength);
|
||||||
|
JSONObject json = fsExecute(user, command);
|
||||||
|
AUDIT_LOG.info("Truncate [{}] to length [{}]", path, newLength);
|
||||||
|
response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
|
||||||
|
break;
|
||||||
|
}
|
||||||
default: {
|
default: {
|
||||||
throw new IOException(
|
throw new IOException(
|
||||||
MessageFormat.format("Invalid HTTP POST operation [{0}]",
|
MessageFormat.format("Invalid HTTP POST operation [{0}]",
|
||||||
|
@ -24,12 +24,14 @@
|
|||||||
import org.apache.hadoop.fs.FileChecksum;
|
import org.apache.hadoop.fs.FileChecksum;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.FileSystemTestHelper;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.http.server.HttpFSServerWebApp;
|
import org.apache.hadoop.fs.http.server.HttpFSServerWebApp;
|
||||||
import org.apache.hadoop.fs.permission.AclEntry;
|
import org.apache.hadoop.fs.permission.AclEntry;
|
||||||
import org.apache.hadoop.fs.permission.AclStatus;
|
import org.apache.hadoop.fs.permission.AclStatus;
|
||||||
import org.apache.hadoop.fs.permission.FsAction;
|
import org.apache.hadoop.fs.permission.FsAction;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
import org.apache.hadoop.hdfs.AppendTestUtil;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
@ -222,6 +224,31 @@ private void testAppend() throws Exception {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void testTruncate() throws Exception {
|
||||||
|
if (!isLocalFS()) {
|
||||||
|
final short repl = 3;
|
||||||
|
final int blockSize = 1024;
|
||||||
|
final int numOfBlocks = 2;
|
||||||
|
FileSystem fs = FileSystem.get(getProxiedFSConf());
|
||||||
|
fs.mkdirs(getProxiedFSTestDir());
|
||||||
|
Path file = new Path(getProxiedFSTestDir(), "foo.txt");
|
||||||
|
final byte[] data = FileSystemTestHelper.getFileData(
|
||||||
|
numOfBlocks, blockSize);
|
||||||
|
FileSystemTestHelper.createFile(fs, file, data, blockSize, repl);
|
||||||
|
|
||||||
|
final int newLength = blockSize;
|
||||||
|
|
||||||
|
boolean isReady = fs.truncate(file, newLength);
|
||||||
|
Assert.assertTrue("Recovery is not expected.", isReady);
|
||||||
|
|
||||||
|
FileStatus fileStatus = fs.getFileStatus(file);
|
||||||
|
Assert.assertEquals(fileStatus.getLen(), newLength);
|
||||||
|
AppendTestUtil.checkFullFile(fs, file, newLength, data, file.toString());
|
||||||
|
|
||||||
|
fs.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void testConcat() throws Exception {
|
private void testConcat() throws Exception {
|
||||||
Configuration config = getProxiedFSConf();
|
Configuration config = getProxiedFSConf();
|
||||||
config.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024);
|
config.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024);
|
||||||
@ -784,9 +811,10 @@ private void testDirAcls() throws Exception {
|
|||||||
}
|
}
|
||||||
|
|
||||||
protected enum Operation {
|
protected enum Operation {
|
||||||
GET, OPEN, CREATE, APPEND, CONCAT, RENAME, DELETE, LIST_STATUS, WORKING_DIRECTORY, MKDIRS,
|
GET, OPEN, CREATE, APPEND, TRUNCATE, CONCAT, RENAME, DELETE, LIST_STATUS,
|
||||||
SET_TIMES, SET_PERMISSION, SET_OWNER, SET_REPLICATION, CHECKSUM, CONTENT_SUMMARY,
|
WORKING_DIRECTORY, MKDIRS, SET_TIMES, SET_PERMISSION, SET_OWNER,
|
||||||
FILEACLS, DIRACLS, SET_XATTR, GET_XATTRS, REMOVE_XATTR, LIST_XATTRS
|
SET_REPLICATION, CHECKSUM, CONTENT_SUMMARY, FILEACLS, DIRACLS, SET_XATTR,
|
||||||
|
GET_XATTRS, REMOVE_XATTR, LIST_XATTRS
|
||||||
}
|
}
|
||||||
|
|
||||||
private void operation(Operation op) throws Exception {
|
private void operation(Operation op) throws Exception {
|
||||||
@ -803,8 +831,12 @@ private void operation(Operation op) throws Exception {
|
|||||||
case APPEND:
|
case APPEND:
|
||||||
testAppend();
|
testAppend();
|
||||||
break;
|
break;
|
||||||
|
case TRUNCATE:
|
||||||
|
testTruncate();
|
||||||
|
break;
|
||||||
case CONCAT:
|
case CONCAT:
|
||||||
testConcat();
|
testConcat();
|
||||||
|
break;
|
||||||
case RENAME:
|
case RENAME:
|
||||||
testRename();
|
testRename();
|
||||||
break;
|
break;
|
||||||
|
@ -337,6 +337,8 @@ Release 2.7.0 - UNRELEASED
|
|||||||
HDFS-7584. Enable Quota Support for Storage Types (See breakdown of
|
HDFS-7584. Enable Quota Support for Storage Types (See breakdown of
|
||||||
tasks below)
|
tasks below)
|
||||||
|
|
||||||
|
HDFS-7656. Expose truncate API for HDFS httpfs. (yliu)
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
|
||||||
HDFS-7055. Add tracing to DFSInputStream (cmccabe)
|
HDFS-7055. Add tracing to DFSInputStream (cmccabe)
|
||||||
|
Loading…
Reference in New Issue
Block a user