HDFS-2501. Add version prefix and root methods to webhdfs.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1189028 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
2adc51c618
commit
8335995630
@ -80,6 +80,8 @@ Trunk (unreleased changes)
|
||||
|
||||
HDFS-2488. Separate datatypes for InterDatanodeProtocol. (suresh)
|
||||
|
||||
HDFS-2501. Add version prefix and root methods to webhdfs. (szetszwo)
|
||||
|
||||
BUG FIXES
|
||||
HDFS-2287. TestParallelRead has a small off-by-one bug. (todd)
|
||||
|
||||
|
@ -559,7 +559,7 @@ conf, new AccessControlList(conf.get(DFS_ADMIN, " ")),
|
||||
if (conf.getBoolean(DFS_WEBHDFS_ENABLED_KEY, DFS_WEBHDFS_ENABLED_DEFAULT)) {
|
||||
infoServer.addJerseyResourcePackage(DatanodeWebHdfsMethods.class
|
||||
.getPackage().getName() + ";" + Param.class.getPackage().getName(),
|
||||
"/" + WebHdfsFileSystem.PATH_PREFIX + "/*");
|
||||
WebHdfsFileSystem.PATH_PREFIX + "/*");
|
||||
}
|
||||
this.infoServer.start();
|
||||
}
|
||||
|
@ -78,9 +78,36 @@
|
||||
public class DatanodeWebHdfsMethods {
|
||||
public static final Log LOG = LogFactory.getLog(DatanodeWebHdfsMethods.class);
|
||||
|
||||
private static final UriFsPathParam ROOT = new UriFsPathParam("");
|
||||
|
||||
private @Context ServletContext context;
|
||||
private @Context HttpServletResponse response;
|
||||
|
||||
/** Handle HTTP PUT request for the root. */
|
||||
@PUT
|
||||
@Path("/")
|
||||
@Consumes({"*/*"})
|
||||
@Produces({MediaType.APPLICATION_JSON})
|
||||
public Response putRoot(
|
||||
final InputStream in,
|
||||
@Context final UserGroupInformation ugi,
|
||||
@QueryParam(PutOpParam.NAME) @DefaultValue(PutOpParam.DEFAULT)
|
||||
final PutOpParam op,
|
||||
@QueryParam(PermissionParam.NAME) @DefaultValue(PermissionParam.DEFAULT)
|
||||
final PermissionParam permission,
|
||||
@QueryParam(OverwriteParam.NAME) @DefaultValue(OverwriteParam.DEFAULT)
|
||||
final OverwriteParam overwrite,
|
||||
@QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
|
||||
final BufferSizeParam bufferSize,
|
||||
@QueryParam(ReplicationParam.NAME) @DefaultValue(ReplicationParam.DEFAULT)
|
||||
final ReplicationParam replication,
|
||||
@QueryParam(BlockSizeParam.NAME) @DefaultValue(BlockSizeParam.DEFAULT)
|
||||
final BlockSizeParam blockSize
|
||||
) throws IOException, InterruptedException {
|
||||
return put(in, ugi, ROOT, op, permission, overwrite, bufferSize,
|
||||
replication, blockSize);
|
||||
}
|
||||
|
||||
/** Handle HTTP PUT request. */
|
||||
@PUT
|
||||
@Path("{" + UriFsPathParam.NAME + ":.*}")
|
||||
@ -149,6 +176,22 @@ public Response run() throws IOException, URISyntaxException {
|
||||
});
|
||||
}
|
||||
|
||||
/** Handle HTTP POST request for the root for the root. */
|
||||
@POST
|
||||
@Path("/")
|
||||
@Consumes({"*/*"})
|
||||
@Produces({MediaType.APPLICATION_JSON})
|
||||
public Response postRoot(
|
||||
final InputStream in,
|
||||
@Context final UserGroupInformation ugi,
|
||||
@QueryParam(PostOpParam.NAME) @DefaultValue(PostOpParam.DEFAULT)
|
||||
final PostOpParam op,
|
||||
@QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
|
||||
final BufferSizeParam bufferSize
|
||||
) throws IOException, InterruptedException {
|
||||
return post(in, ugi, ROOT, op, bufferSize);
|
||||
}
|
||||
|
||||
/** Handle HTTP POST request. */
|
||||
@POST
|
||||
@Path("{" + UriFsPathParam.NAME + ":.*}")
|
||||
@ -201,6 +244,24 @@ public Response run() throws IOException {
|
||||
});
|
||||
}
|
||||
|
||||
/** Handle HTTP GET request for the root. */
|
||||
@GET
|
||||
@Path("/")
|
||||
@Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON})
|
||||
public Response getRoot(
|
||||
@Context final UserGroupInformation ugi,
|
||||
@QueryParam(GetOpParam.NAME) @DefaultValue(GetOpParam.DEFAULT)
|
||||
final GetOpParam op,
|
||||
@QueryParam(OffsetParam.NAME) @DefaultValue(OffsetParam.DEFAULT)
|
||||
final OffsetParam offset,
|
||||
@QueryParam(LengthParam.NAME) @DefaultValue(LengthParam.DEFAULT)
|
||||
final LengthParam length,
|
||||
@QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
|
||||
final BufferSizeParam bufferSize
|
||||
) throws IOException, InterruptedException {
|
||||
return get(ugi, ROOT, op, offset, length, bufferSize);
|
||||
}
|
||||
|
||||
/** Handle HTTP GET request. */
|
||||
@GET
|
||||
@Path("{" + UriFsPathParam.NAME + ":.*}")
|
||||
|
@ -109,7 +109,7 @@ public HttpServer run() throws IOException, InterruptedException {
|
||||
//add SPNEGO authentication filter for webhdfs
|
||||
final String name = "SPNEGO";
|
||||
final String classname = AuthFilter.class.getName();
|
||||
final String pathSpec = "/" + WebHdfsFileSystem.PATH_PREFIX + "/*";
|
||||
final String pathSpec = WebHdfsFileSystem.PATH_PREFIX + "/*";
|
||||
Map<String, String> params = getAuthFilterParams(conf);
|
||||
defineFilter(webAppContext, name, classname, params,
|
||||
new String[]{pathSpec});
|
||||
|
@ -101,6 +101,8 @@
|
||||
public class NamenodeWebHdfsMethods {
|
||||
public static final Log LOG = LogFactory.getLog(NamenodeWebHdfsMethods.class);
|
||||
|
||||
private static final UriFsPathParam ROOT = new UriFsPathParam("");
|
||||
|
||||
private static final ThreadLocal<String> REMOTE_ADDRESS = new ThreadLocal<String>();
|
||||
|
||||
/** @return the remote client address. */
|
||||
@ -179,7 +181,7 @@ private URI redirectURI(final NameNode namenode,
|
||||
final String query = op.toQueryString()
|
||||
+ '&' + new UserParam(ugi) + delegationQuery
|
||||
+ Param.toSortedString("&", parameters);
|
||||
final String uripath = "/" + WebHdfsFileSystem.PATH_PREFIX + path;
|
||||
final String uripath = WebHdfsFileSystem.PATH_PREFIX + path;
|
||||
|
||||
final URI uri = new URI("http", null, dn.getHostName(), dn.getInfoPort(),
|
||||
uripath, query, null);
|
||||
@ -189,6 +191,45 @@ private URI redirectURI(final NameNode namenode,
|
||||
return uri;
|
||||
}
|
||||
|
||||
/** Handle HTTP PUT request for the root. */
|
||||
@PUT
|
||||
@Path("/")
|
||||
@Consumes({"*/*"})
|
||||
@Produces({MediaType.APPLICATION_JSON})
|
||||
public Response putRoot(
|
||||
@Context final UserGroupInformation ugi,
|
||||
@QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
|
||||
final DelegationParam delegation,
|
||||
@QueryParam(PutOpParam.NAME) @DefaultValue(PutOpParam.DEFAULT)
|
||||
final PutOpParam op,
|
||||
@QueryParam(DestinationParam.NAME) @DefaultValue(DestinationParam.DEFAULT)
|
||||
final DestinationParam destination,
|
||||
@QueryParam(OwnerParam.NAME) @DefaultValue(OwnerParam.DEFAULT)
|
||||
final OwnerParam owner,
|
||||
@QueryParam(GroupParam.NAME) @DefaultValue(GroupParam.DEFAULT)
|
||||
final GroupParam group,
|
||||
@QueryParam(PermissionParam.NAME) @DefaultValue(PermissionParam.DEFAULT)
|
||||
final PermissionParam permission,
|
||||
@QueryParam(OverwriteParam.NAME) @DefaultValue(OverwriteParam.DEFAULT)
|
||||
final OverwriteParam overwrite,
|
||||
@QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
|
||||
final BufferSizeParam bufferSize,
|
||||
@QueryParam(ReplicationParam.NAME) @DefaultValue(ReplicationParam.DEFAULT)
|
||||
final ReplicationParam replication,
|
||||
@QueryParam(BlockSizeParam.NAME) @DefaultValue(BlockSizeParam.DEFAULT)
|
||||
final BlockSizeParam blockSize,
|
||||
@QueryParam(ModificationTimeParam.NAME) @DefaultValue(ModificationTimeParam.DEFAULT)
|
||||
final ModificationTimeParam modificationTime,
|
||||
@QueryParam(AccessTimeParam.NAME) @DefaultValue(AccessTimeParam.DEFAULT)
|
||||
final AccessTimeParam accessTime,
|
||||
@QueryParam(RenameOptionSetParam.NAME) @DefaultValue(RenameOptionSetParam.DEFAULT)
|
||||
final RenameOptionSetParam renameOptions
|
||||
) throws IOException, InterruptedException {
|
||||
return put(ugi, delegation, ROOT, op, destination, owner, group,
|
||||
permission, overwrite, bufferSize, replication, blockSize,
|
||||
modificationTime, accessTime, renameOptions);
|
||||
}
|
||||
|
||||
/** Handle HTTP PUT request. */
|
||||
@PUT
|
||||
@Path("{" + UriFsPathParam.NAME + ":.*}")
|
||||
@ -305,6 +346,23 @@ public Response run() throws IOException, URISyntaxException {
|
||||
});
|
||||
}
|
||||
|
||||
/** Handle HTTP POST request for the root. */
|
||||
@POST
|
||||
@Path("/")
|
||||
@Consumes({"*/*"})
|
||||
@Produces({MediaType.APPLICATION_JSON})
|
||||
public Response postRoot(
|
||||
@Context final UserGroupInformation ugi,
|
||||
@QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
|
||||
final DelegationParam delegation,
|
||||
@QueryParam(PostOpParam.NAME) @DefaultValue(PostOpParam.DEFAULT)
|
||||
final PostOpParam op,
|
||||
@QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
|
||||
final BufferSizeParam bufferSize
|
||||
) throws IOException, InterruptedException {
|
||||
return post(ugi, delegation, ROOT, op, bufferSize);
|
||||
}
|
||||
|
||||
/** Handle HTTP POST request. */
|
||||
@POST
|
||||
@Path("{" + UriFsPathParam.NAME + ":.*}")
|
||||
@ -356,13 +414,11 @@ public Response run() throws IOException, URISyntaxException {
|
||||
});
|
||||
}
|
||||
|
||||
private static final UriFsPathParam ROOT = new UriFsPathParam("");
|
||||
|
||||
/** Handle HTTP GET request for the root. */
|
||||
@GET
|
||||
@Path("/")
|
||||
@Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON})
|
||||
public Response root(
|
||||
public Response getRoot(
|
||||
@Context final UserGroupInformation ugi,
|
||||
@QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
|
||||
final DelegationParam delegation,
|
||||
@ -520,9 +576,23 @@ public void write(final OutputStream outstream) throws IOException {
|
||||
};
|
||||
}
|
||||
|
||||
/** Handle HTTP DELETE request for the root. */
|
||||
@DELETE
|
||||
@Path("/")
|
||||
@Produces(MediaType.APPLICATION_JSON)
|
||||
public Response deleteRoot(
|
||||
@Context final UserGroupInformation ugi,
|
||||
@QueryParam(DeleteOpParam.NAME) @DefaultValue(DeleteOpParam.DEFAULT)
|
||||
final DeleteOpParam op,
|
||||
@QueryParam(RecursiveParam.NAME) @DefaultValue(RecursiveParam.DEFAULT)
|
||||
final RecursiveParam recursive
|
||||
) throws IOException, InterruptedException {
|
||||
return delete(ugi, ROOT, op, recursive);
|
||||
}
|
||||
|
||||
/** Handle HTTP DELETE request. */
|
||||
@DELETE
|
||||
@Path("{path:.*}")
|
||||
@Path("{" + UriFsPathParam.NAME + ":.*}")
|
||||
@Produces(MediaType.APPLICATION_JSON)
|
||||
public Response delete(
|
||||
@Context final UserGroupInformation ugi,
|
||||
|
@ -92,8 +92,10 @@ public class WebHdfsFileSystem extends HftpFileSystem {
|
||||
public static final Log LOG = LogFactory.getLog(WebHdfsFileSystem.class);
|
||||
/** File System URI: {SCHEME}://namenode:port/path/to/file */
|
||||
public static final String SCHEME = "webhdfs";
|
||||
/** WebHdfs version. */
|
||||
public static final int VERSION = 1;
|
||||
/** Http URI: http://namenode:port/{PATH_PREFIX}/path/to/file */
|
||||
public static final String PATH_PREFIX = SCHEME;
|
||||
public static final String PATH_PREFIX = "/" + SCHEME + "/v" + VERSION;
|
||||
|
||||
private static final KerberosUgiAuthenticator AUTH = new KerberosUgiAuthenticator();
|
||||
|
||||
@ -188,7 +190,7 @@ private static void validateResponse(final HttpOpParam.Op op,
|
||||
URL toUrl(final HttpOpParam.Op op, final Path fspath,
|
||||
final Param<?,?>... parameters) throws IOException {
|
||||
//initialize URI path and query
|
||||
final String path = "/" + PATH_PREFIX
|
||||
final String path = PATH_PREFIX
|
||||
+ (fspath == null? "/": makeQualified(fspath).toUri().getPath());
|
||||
final String query = op.toQueryString()
|
||||
+ '&' + new UserParam(ugi)
|
||||
|
@ -30,15 +30,19 @@
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.BlockLocation;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FileSystemContractBaseTest;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.web.resources.GetOpParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.PutOpParam;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.junit.Assert;
|
||||
|
||||
public class TestWebHdfsFileSystemContract extends FileSystemContractBaseTest {
|
||||
private static final Configuration conf = new Configuration();
|
||||
@ -215,4 +219,42 @@ public void testSeek() throws IOException {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public void testRootDir() throws IOException {
|
||||
final Path root = new Path("/");
|
||||
|
||||
final WebHdfsFileSystem webhdfs = (WebHdfsFileSystem)fs;
|
||||
final URL url = webhdfs.toUrl(GetOpParam.Op.NULL, root);
|
||||
WebHdfsFileSystem.LOG.info("null url=" + url);
|
||||
Assert.assertTrue(url.toString().contains("v1"));
|
||||
|
||||
//test root permission
|
||||
final FileStatus status = fs.getFileStatus(root);
|
||||
assertTrue(status != null);
|
||||
assertEquals(0777, status.getPermission().toShort());
|
||||
|
||||
//delete root - disabled due to a sticky bit bug
|
||||
//assertFalse(fs.delete(root, true));
|
||||
|
||||
//create file using root path
|
||||
try {
|
||||
final FSDataOutputStream out = fs.create(root);
|
||||
out.write(1);
|
||||
out.close();
|
||||
fail();
|
||||
} catch(IOException e) {
|
||||
WebHdfsFileSystem.LOG.info("This is expected.", e);
|
||||
}
|
||||
|
||||
//open file using root path
|
||||
try {
|
||||
final FSDataInputStream in = fs.open(root);
|
||||
in.read();
|
||||
fail();
|
||||
fail();
|
||||
} catch(IOException e) {
|
||||
WebHdfsFileSystem.LOG.info("This is expected.", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user