Merge trunk into HA branch
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1171808 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
commit
70a180d0f5
@ -591,6 +591,9 @@ Release 0.23.0 - Unreleased
|
||||
HADOOP-7629. Allow immutable FsPermission objects to be used as IPC
|
||||
parameters. (todd)
|
||||
|
||||
HADOOP-7608. SnappyCodec check for Hadoop native lib is wrong
|
||||
(Alejandro Abdelnur via todd)
|
||||
|
||||
Release 0.22.0 - Unreleased
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -28,6 +28,7 @@
|
||||
import org.apache.hadoop.io.compress.snappy.SnappyCompressor;
|
||||
import org.apache.hadoop.io.compress.snappy.SnappyDecompressor;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.apache.hadoop.util.NativeCodeLoader;
|
||||
|
||||
/**
|
||||
* This class creates snappy compressors/decompressors.
|
||||
@ -63,13 +64,10 @@ public Configuration getConf() {
|
||||
/**
|
||||
* Are the native snappy libraries loaded & initialized?
|
||||
*
|
||||
* @param conf configuration
|
||||
* @return true if loaded & initialized, otherwise false
|
||||
*/
|
||||
public static boolean isNativeSnappyLoaded(Configuration conf) {
|
||||
return LoadSnappy.isLoaded() && conf.getBoolean(
|
||||
CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY,
|
||||
CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_DEFAULT);
|
||||
public static boolean isNativeCodeLoaded() {
|
||||
return LoadSnappy.isLoaded() && NativeCodeLoader.isNativeCodeLoaded();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -99,7 +97,7 @@ public CompressionOutputStream createOutputStream(OutputStream out)
|
||||
public CompressionOutputStream createOutputStream(OutputStream out,
|
||||
Compressor compressor)
|
||||
throws IOException {
|
||||
if (!isNativeSnappyLoaded(conf)) {
|
||||
if (!isNativeCodeLoaded()) {
|
||||
throw new RuntimeException("native snappy library not available");
|
||||
}
|
||||
int bufferSize = conf.getInt(
|
||||
@ -119,7 +117,7 @@ public CompressionOutputStream createOutputStream(OutputStream out,
|
||||
*/
|
||||
@Override
|
||||
public Class<? extends Compressor> getCompressorType() {
|
||||
if (!isNativeSnappyLoaded(conf)) {
|
||||
if (!isNativeCodeLoaded()) {
|
||||
throw new RuntimeException("native snappy library not available");
|
||||
}
|
||||
|
||||
@ -133,7 +131,7 @@ public Class<? extends Compressor> getCompressorType() {
|
||||
*/
|
||||
@Override
|
||||
public Compressor createCompressor() {
|
||||
if (!isNativeSnappyLoaded(conf)) {
|
||||
if (!isNativeCodeLoaded()) {
|
||||
throw new RuntimeException("native snappy library not available");
|
||||
}
|
||||
int bufferSize = conf.getInt(
|
||||
@ -169,7 +167,7 @@ public CompressionInputStream createInputStream(InputStream in)
|
||||
public CompressionInputStream createInputStream(InputStream in,
|
||||
Decompressor decompressor)
|
||||
throws IOException {
|
||||
if (!isNativeSnappyLoaded(conf)) {
|
||||
if (!isNativeCodeLoaded()) {
|
||||
throw new RuntimeException("native snappy library not available");
|
||||
}
|
||||
|
||||
@ -185,7 +183,7 @@ public CompressionInputStream createInputStream(InputStream in,
|
||||
*/
|
||||
@Override
|
||||
public Class<? extends Decompressor> getDecompressorType() {
|
||||
if (!isNativeSnappyLoaded(conf)) {
|
||||
if (!isNativeCodeLoaded()) {
|
||||
throw new RuntimeException("native snappy library not available");
|
||||
}
|
||||
|
||||
@ -199,7 +197,7 @@ public Class<? extends Decompressor> getDecompressorType() {
|
||||
*/
|
||||
@Override
|
||||
public Decompressor createDecompressor() {
|
||||
if (!isNativeSnappyLoaded(conf)) {
|
||||
if (!isNativeCodeLoaded()) {
|
||||
throw new RuntimeException("native snappy library not available");
|
||||
}
|
||||
int bufferSize = conf.getInt(
|
||||
|
@ -10,6 +10,12 @@ Trunk (unreleased changes)
|
||||
|
||||
HDFS-2317. Support read access to HDFS in webhdfs. (szetszwo)
|
||||
|
||||
HDFS-2338. Add configuration option to enable/disable webhdfs.
|
||||
(jitendra via szetszwo)
|
||||
|
||||
HDFS-2318. Provide authentication to webhdfs using SPNEGO and delegation
|
||||
tokens. (szetszwo)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
HADOOP-7524 Change RPC to allow multiple protocols including multuple versions of the same protocol (sanjay Radia)
|
||||
@ -23,6 +29,8 @@ Trunk (unreleased changes)
|
||||
|
||||
HDFS-2223. Untangle depencencies between NN components (todd)
|
||||
|
||||
HDFS-2337. DFSClient shouldn't keep multiple RPC proxy references (atm)
|
||||
|
||||
BUG FIXES
|
||||
HDFS-2287. TestParallelRead has a small off-by-one bug. (todd)
|
||||
|
||||
|
@ -114,7 +114,6 @@ public class DFSClient implements java.io.Closeable {
|
||||
public static final long SERVER_DEFAULTS_VALIDITY_PERIOD = 60 * 60 * 1000L; // 1 hour
|
||||
static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB
|
||||
final ClientProtocol namenode;
|
||||
final ClientProtocol rpcNamenode;
|
||||
final UserGroupInformation ugi;
|
||||
volatile boolean clientRunning = true;
|
||||
private volatile FsServerDefaults serverDefaults;
|
||||
@ -253,11 +252,10 @@ public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf,
|
||||
this.clientName = leaserenewer.getClientName(dfsClientConf.taskId);
|
||||
this.socketCache = new SocketCache(dfsClientConf.socketCacheCapacity);
|
||||
if (nameNodeAddr != null && rpcNamenode == null) {
|
||||
this.rpcNamenode = DFSUtil.createRPCNamenode(nameNodeAddr, conf, ugi);
|
||||
this.namenode = DFSUtil.createNamenode(this.rpcNamenode);
|
||||
this.namenode = DFSUtil.createNamenode(nameNodeAddr, conf);
|
||||
} else if (nameNodeAddr == null && rpcNamenode != null) {
|
||||
//This case is used for testing.
|
||||
this.namenode = this.rpcNamenode = rpcNamenode;
|
||||
this.namenode = rpcNamenode;
|
||||
} else {
|
||||
throw new IllegalArgumentException(
|
||||
"Expecting exactly one of nameNodeAddr and rpcNamenode being null: "
|
||||
@ -340,7 +338,7 @@ void renewLease() throws IOException {
|
||||
void abort() {
|
||||
clientRunning = false;
|
||||
closeAllFilesBeingWritten(true);
|
||||
RPC.stopProxy(rpcNamenode); // close connections to the namenode
|
||||
RPC.stopProxy(namenode); // close connections to the namenode
|
||||
}
|
||||
|
||||
/** Close/abort all files being written. */
|
||||
@ -380,7 +378,7 @@ public synchronized void close() throws IOException {
|
||||
clientRunning = false;
|
||||
leaserenewer.closeClient(this);
|
||||
// close connections to the namenode
|
||||
RPC.stopProxy(rpcNamenode);
|
||||
RPC.stopProxy(namenode);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -101,6 +101,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||
public static final int DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_DEFAULT = -1;
|
||||
public static final String DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY = "dfs.namenode.replication.max-streams";
|
||||
public static final int DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT = 2;
|
||||
public static final String DFS_WEBHDFS_ENABLED_KEY = "dfs.webhdfs.enabled";
|
||||
public static final boolean DFS_WEBHDFS_ENABLED_DEFAULT = false;
|
||||
public static final String DFS_PERMISSIONS_ENABLED_KEY = "dfs.permissions.enabled";
|
||||
public static final boolean DFS_PERMISSIONS_ENABLED_DEFAULT = true;
|
||||
public static final String DFS_PERMISSIONS_SUPERUSERGROUP_KEY = "dfs.permissions.superusergroup";
|
||||
|
@ -21,14 +21,20 @@
|
||||
import java.io.DataInputStream;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.Iterator;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
|
||||
import org.apache.hadoop.security.token.delegation.DelegationKey;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
|
||||
import org.apache.hadoop.security.token.delegation.DelegationKey;
|
||||
|
||||
/**
|
||||
* A HDFS specific delegation token secret manager.
|
||||
@ -278,4 +284,22 @@ protected void logUpdateMasterKey(DelegationKey key)
|
||||
throws IOException {
|
||||
namesystem.logUpdateMasterKey(key);
|
||||
}
|
||||
|
||||
/** A utility method for creating credentials. */
|
||||
public static Credentials createCredentials(final NameNode namenode,
|
||||
final UserGroupInformation ugi, final String renewer) throws IOException {
|
||||
final Token<DelegationTokenIdentifier> token = namenode.getRpcServer(
|
||||
).getDelegationToken(new Text(renewer));
|
||||
if (token == null) {
|
||||
throw new IOException("Failed to get the token for " + renewer
|
||||
+ ", user=" + ugi.getShortUserName());
|
||||
}
|
||||
|
||||
final InetSocketAddress addr = namenode.getNameNodeAddress();
|
||||
final String s = addr.getAddress().getHostAddress() + ":" + addr.getPort();
|
||||
token.setService(new Text(s));
|
||||
final Credentials c = new Credentials();
|
||||
c.addToken(new Text(ugi.getShortUserName()), token);
|
||||
return c;
|
||||
}
|
||||
}
|
||||
|
@ -54,6 +54,7 @@
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNodeHttpServer;
|
||||
import org.apache.hadoop.hdfs.web.resources.UserParam;
|
||||
import org.apache.hadoop.http.HtmlQuoting;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
@ -501,29 +502,38 @@ private static String getNNServiceAddress(ServletContext context,
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* See
|
||||
* {@link JspHelper#getUGI(ServletContext, HttpServletRequest, Configuration)}
|
||||
* , ServletContext is passed as null.
|
||||
*/
|
||||
/** Same as getUGI(null, request, conf). */
|
||||
public static UserGroupInformation getUGI(HttpServletRequest request,
|
||||
Configuration conf) throws IOException {
|
||||
return getUGI(null, request, conf);
|
||||
}
|
||||
|
||||
/** Same as getUGI(context, request, conf, KERBEROS_SSL, true). */
|
||||
public static UserGroupInformation getUGI(ServletContext context,
|
||||
HttpServletRequest request, Configuration conf) throws IOException {
|
||||
return getUGI(context, request, conf, AuthenticationMethod.KERBEROS_SSL, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get {@link UserGroupInformation} and possibly the delegation token out of
|
||||
* the request.
|
||||
* @param context the ServletContext that is serving this request.
|
||||
* @param request the http request
|
||||
* @param conf configuration
|
||||
* @param secureAuthMethod the AuthenticationMethod used in secure mode.
|
||||
* @param tryUgiParameter Should it try the ugi parameter?
|
||||
* @return a new user from the request
|
||||
* @throws AccessControlException if the request has no token
|
||||
*/
|
||||
public static UserGroupInformation getUGI(ServletContext context,
|
||||
HttpServletRequest request, Configuration conf) throws IOException {
|
||||
UserGroupInformation ugi = null;
|
||||
HttpServletRequest request, Configuration conf,
|
||||
final AuthenticationMethod secureAuthMethod,
|
||||
final boolean tryUgiParameter) throws IOException {
|
||||
final UserGroupInformation ugi;
|
||||
final String usernameFromQuery = getUsernameFromQuery(request, tryUgiParameter);
|
||||
|
||||
if(UserGroupInformation.isSecurityEnabled()) {
|
||||
String user = request.getRemoteUser();
|
||||
final String user = request.getRemoteUser();
|
||||
String tokenString = request.getParameter(DELEGATION_PARAMETER_NAME);
|
||||
if (tokenString != null) {
|
||||
Token<DelegationTokenIdentifier> token =
|
||||
@ -541,6 +551,7 @@ public static UserGroupInformation getUGI(ServletContext context,
|
||||
DelegationTokenIdentifier id = new DelegationTokenIdentifier();
|
||||
id.readFields(in);
|
||||
ugi = id.getUser();
|
||||
checkUsername(ugi.getUserName(), user);
|
||||
ugi.addToken(token);
|
||||
ugi.setAuthenticationMethod(AuthenticationMethod.TOKEN);
|
||||
} else {
|
||||
@ -551,16 +562,15 @@ public static UserGroupInformation getUGI(ServletContext context,
|
||||
ugi = UserGroupInformation.createRemoteUser(user);
|
||||
// This is not necessarily true, could have been auth'ed by user-facing
|
||||
// filter
|
||||
ugi.setAuthenticationMethod(AuthenticationMethod.KERBEROS_SSL);
|
||||
ugi.setAuthenticationMethod(secureAuthMethod);
|
||||
}
|
||||
|
||||
checkUsername(user, usernameFromQuery);
|
||||
|
||||
} else { // Security's not on, pull from url
|
||||
String user = request.getParameter("ugi");
|
||||
|
||||
if(user == null) { // not specified in request
|
||||
ugi = getDefaultWebUser(conf);
|
||||
} else {
|
||||
ugi = UserGroupInformation.createRemoteUser(user.split(",")[0]);
|
||||
}
|
||||
ugi = usernameFromQuery == null?
|
||||
getDefaultWebUser(conf) // not specified in request
|
||||
: UserGroupInformation.createRemoteUser(usernameFromQuery);
|
||||
ugi.setAuthenticationMethod(AuthenticationMethod.SIMPLE);
|
||||
}
|
||||
|
||||
@ -568,7 +578,28 @@ public static UserGroupInformation getUGI(ServletContext context,
|
||||
LOG.debug("getUGI is returning: " + ugi.getShortUserName());
|
||||
return ugi;
|
||||
}
|
||||
|
||||
|
||||
private static void checkUsername(final String expected, final String name
|
||||
) throws IOException {
|
||||
if (name != null && !name.equals(expected)) {
|
||||
throw new IOException("Usernames not matched: name=" + name
|
||||
+ " != expected=" + expected);
|
||||
}
|
||||
}
|
||||
|
||||
private static String getUsernameFromQuery(final HttpServletRequest request,
|
||||
final boolean tryUgiParameter) {
|
||||
String username = request.getParameter(UserParam.NAME);
|
||||
if (username == null && tryUgiParameter) {
|
||||
//try ugi parameter
|
||||
final String ugiStr = request.getParameter("ugi");
|
||||
if (ugiStr != null) {
|
||||
username = ugiStr.split(",")[0];
|
||||
}
|
||||
}
|
||||
return username;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the url parameter for the given token string.
|
||||
* @param tokenString
|
||||
|
@ -62,6 +62,8 @@
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_FEDERATION_NAMESERVICES;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEBHDFS_ENABLED_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.server.common.Util.now;
|
||||
|
||||
import java.io.BufferedOutputStream;
|
||||
@ -549,10 +551,11 @@ conf, new AccessControlList(conf.get(DFS_ADMIN, " ")),
|
||||
this.infoServer.addServlet(null, "/blockScannerReport",
|
||||
DataBlockScanner.Servlet.class);
|
||||
|
||||
infoServer.addJerseyResourcePackage(
|
||||
DatanodeWebHdfsMethods.class.getPackage().getName()
|
||||
+ ";" + Param.class.getPackage().getName(),
|
||||
"/" + WebHdfsFileSystem.PATH_PREFIX + "/*");
|
||||
if (conf.getBoolean(DFS_WEBHDFS_ENABLED_KEY, DFS_WEBHDFS_ENABLED_DEFAULT)) {
|
||||
infoServer.addJerseyResourcePackage(DatanodeWebHdfsMethods.class
|
||||
.getPackage().getName() + ";" + Param.class.getPackage().getName(),
|
||||
"/" + WebHdfsFileSystem.PATH_PREFIX + "/*");
|
||||
}
|
||||
this.infoServer.start();
|
||||
}
|
||||
|
||||
|
@ -23,6 +23,7 @@
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.EnumSet;
|
||||
|
||||
import javax.servlet.ServletContext;
|
||||
@ -63,6 +64,7 @@
|
||||
import org.apache.hadoop.hdfs.web.resources.ReplicationParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.UriFsPathParam;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
|
||||
/** Web-hdfs DataNode implementation. */
|
||||
@Path("")
|
||||
@ -78,6 +80,7 @@ public class DatanodeWebHdfsMethods {
|
||||
@Produces({MediaType.APPLICATION_JSON})
|
||||
public Response put(
|
||||
final InputStream in,
|
||||
@Context final UserGroupInformation ugi,
|
||||
@PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
|
||||
@QueryParam(PutOpParam.NAME) @DefaultValue(PutOpParam.DEFAULT)
|
||||
final PutOpParam op,
|
||||
@ -91,14 +94,18 @@ public Response put(
|
||||
final ReplicationParam replication,
|
||||
@QueryParam(BlockSizeParam.NAME) @DefaultValue(BlockSizeParam.DEFAULT)
|
||||
final BlockSizeParam blockSize
|
||||
) throws IOException, URISyntaxException {
|
||||
) throws IOException, URISyntaxException, InterruptedException {
|
||||
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace(op + ": " + path
|
||||
+ Param.toSortedString(", ", permission, overwrite, bufferSize,
|
||||
replication, blockSize));
|
||||
LOG.trace(op + ": " + path + ", ugi=" + ugi
|
||||
+ Param.toSortedString(", ", permission, overwrite, bufferSize,
|
||||
replication, blockSize));
|
||||
}
|
||||
|
||||
return ugi.doAs(new PrivilegedExceptionAction<Response>() {
|
||||
@Override
|
||||
public Response run() throws IOException, URISyntaxException {
|
||||
|
||||
final String fullpath = path.getAbsolutePath();
|
||||
final DataNode datanode = (DataNode)context.getAttribute("datanode");
|
||||
|
||||
@ -108,14 +115,14 @@ public Response put(
|
||||
final Configuration conf = new Configuration(datanode.getConf());
|
||||
final InetSocketAddress nnRpcAddr = NameNode.getAddress(conf);
|
||||
final DFSClient dfsclient = new DFSClient(nnRpcAddr, conf);
|
||||
final int b = bufferSize.getValue(conf);
|
||||
final FSDataOutputStream out = new FSDataOutputStream(dfsclient.create(
|
||||
fullpath, permission.getFsPermission(),
|
||||
overwrite.getValue() ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
|
||||
: EnumSet.of(CreateFlag.CREATE),
|
||||
replication.getValue(), blockSize.getValue(), null,
|
||||
bufferSize.getValue()), null);
|
||||
replication.getValue(), blockSize.getValue(conf), null, b), null);
|
||||
try {
|
||||
IOUtils.copyBytes(in, out, bufferSize.getValue());
|
||||
IOUtils.copyBytes(in, out, b);
|
||||
} finally {
|
||||
out.close();
|
||||
}
|
||||
@ -127,6 +134,8 @@ public Response put(
|
||||
default:
|
||||
throw new UnsupportedOperationException(op + " is not supported");
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/** Handle HTTP POST request. */
|
||||
@ -136,18 +145,23 @@ public Response put(
|
||||
@Produces({MediaType.APPLICATION_JSON})
|
||||
public Response post(
|
||||
final InputStream in,
|
||||
@Context final UserGroupInformation ugi,
|
||||
@PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
|
||||
@QueryParam(PostOpParam.NAME) @DefaultValue(PostOpParam.DEFAULT)
|
||||
final PostOpParam op,
|
||||
@QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
|
||||
final BufferSizeParam bufferSize
|
||||
) throws IOException, URISyntaxException {
|
||||
) throws IOException, URISyntaxException, InterruptedException {
|
||||
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace(op + ": " + path
|
||||
+ Param.toSortedString(", ", bufferSize));
|
||||
LOG.trace(op + ": " + path + ", ugi=" + ugi
|
||||
+ Param.toSortedString(", ", bufferSize));
|
||||
}
|
||||
|
||||
return ugi.doAs(new PrivilegedExceptionAction<Response>() {
|
||||
@Override
|
||||
public Response run() throws IOException {
|
||||
|
||||
final String fullpath = path.getAbsolutePath();
|
||||
final DataNode datanode = (DataNode)context.getAttribute("datanode");
|
||||
|
||||
@ -157,10 +171,10 @@ public Response post(
|
||||
final Configuration conf = new Configuration(datanode.getConf());
|
||||
final InetSocketAddress nnRpcAddr = NameNode.getAddress(conf);
|
||||
final DFSClient dfsclient = new DFSClient(nnRpcAddr, conf);
|
||||
final FSDataOutputStream out = dfsclient.append(fullpath,
|
||||
bufferSize.getValue(), null, null);
|
||||
final int b = bufferSize.getValue(conf);
|
||||
final FSDataOutputStream out = dfsclient.append(fullpath, b, null, null);
|
||||
try {
|
||||
IOUtils.copyBytes(in, out, bufferSize.getValue());
|
||||
IOUtils.copyBytes(in, out, b);
|
||||
} finally {
|
||||
out.close();
|
||||
}
|
||||
@ -169,6 +183,8 @@ public Response post(
|
||||
default:
|
||||
throw new UnsupportedOperationException(op + " is not supported");
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/** Handle HTTP GET request. */
|
||||
@ -176,6 +192,7 @@ public Response post(
|
||||
@Path("{" + UriFsPathParam.NAME + ":.*}")
|
||||
@Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON})
|
||||
public Response get(
|
||||
@Context final UserGroupInformation ugi,
|
||||
@PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
|
||||
@QueryParam(GetOpParam.NAME) @DefaultValue(GetOpParam.DEFAULT)
|
||||
final GetOpParam op,
|
||||
@ -185,13 +202,17 @@ public Response get(
|
||||
final LengthParam length,
|
||||
@QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
|
||||
final BufferSizeParam bufferSize
|
||||
) throws IOException, URISyntaxException {
|
||||
) throws IOException, URISyntaxException, InterruptedException {
|
||||
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace(op + ": " + path
|
||||
+ Param.toSortedString(", ", offset, length, bufferSize));
|
||||
LOG.trace(op + ": " + path + ", ugi=" + ugi
|
||||
+ Param.toSortedString(", ", offset, length, bufferSize));
|
||||
}
|
||||
|
||||
return ugi.doAs(new PrivilegedExceptionAction<Response>() {
|
||||
@Override
|
||||
public Response run() throws IOException {
|
||||
|
||||
final String fullpath = path.getAbsolutePath();
|
||||
final DataNode datanode = (DataNode)context.getAttribute("datanode");
|
||||
|
||||
@ -201,8 +222,9 @@ public Response get(
|
||||
final Configuration conf = new Configuration(datanode.getConf());
|
||||
final InetSocketAddress nnRpcAddr = NameNode.getAddress(conf);
|
||||
final DFSClient dfsclient = new DFSClient(nnRpcAddr, conf);
|
||||
final int b = bufferSize.getValue(conf);
|
||||
final DFSDataInputStream in = new DFSClient.DFSDataInputStream(
|
||||
dfsclient.open(fullpath, bufferSize.getValue(), true));
|
||||
dfsclient.open(fullpath, b, true));
|
||||
in.seek(offset.getValue());
|
||||
|
||||
final StreamingOutput streaming = new StreamingOutput() {
|
||||
@ -210,7 +232,7 @@ public Response get(
|
||||
public void write(final OutputStream out) throws IOException {
|
||||
final Long n = length.getValue();
|
||||
if (n == null) {
|
||||
IOUtils.copyBytes(in, out, bufferSize.getValue());
|
||||
IOUtils.copyBytes(in, out, b);
|
||||
} else {
|
||||
IOUtils.copyBytes(in, out, n, false);
|
||||
}
|
||||
@ -221,5 +243,7 @@ public void write(final OutputStream out) throws IOException {
|
||||
default:
|
||||
throw new UnsupportedOperationException(op + " is not supported");
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
@ -28,11 +28,9 @@
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
|
||||
/**
|
||||
* Serve delegation tokens over http for use in hftp.
|
||||
@ -70,18 +68,9 @@ protected void doGet(final HttpServletRequest req, final HttpServletResponse res
|
||||
final DataOutputStream dosFinal = dos; // for doAs block
|
||||
ugi.doAs(new PrivilegedExceptionAction<Void>() {
|
||||
@Override
|
||||
public Void run() throws Exception {
|
||||
String s = NameNode.getAddress(conf).getAddress().getHostAddress()
|
||||
+ ":" + NameNode.getAddress(conf).getPort();
|
||||
|
||||
Token<DelegationTokenIdentifier> token =
|
||||
nn.getRpcServer().getDelegationToken(new Text(renewerFinal));
|
||||
if(token == null) {
|
||||
throw new Exception("couldn't get the token for " +s);
|
||||
}
|
||||
token.setService(new Text(s));
|
||||
Credentials ts = new Credentials();
|
||||
ts.addToken(new Text(ugi.getShortUserName()), token);
|
||||
public Void run() throws IOException {
|
||||
final Credentials ts = DelegationTokenSecretManager.createCredentials(
|
||||
nn, ugi, renewerFinal);
|
||||
ts.write(dosFinal);
|
||||
return null;
|
||||
}
|
||||
|
@ -30,6 +30,7 @@
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.server.common.JspHelper;
|
||||
import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
|
||||
import org.apache.hadoop.hdfs.web.AuthFilter;
|
||||
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
|
||||
import org.apache.hadoop.hdfs.web.resources.Param;
|
||||
import org.apache.hadoop.http.HttpServer;
|
||||
@ -99,7 +100,25 @@ public HttpServer run() throws IOException, InterruptedException {
|
||||
int infoPort = bindAddress.getPort();
|
||||
httpServer = new HttpServer("hdfs", infoHost, infoPort,
|
||||
infoPort == 0, conf,
|
||||
new AccessControlList(conf.get(DFSConfigKeys.DFS_ADMIN, " ")));
|
||||
new AccessControlList(conf.get(DFSConfigKeys.DFS_ADMIN, " "))) {
|
||||
{
|
||||
if (conf.getBoolean(DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY,
|
||||
DFSConfigKeys.DFS_WEBHDFS_ENABLED_DEFAULT)) {
|
||||
//add SPNEGO authentication filter for webhdfs
|
||||
final String name = "SPNEGO";
|
||||
final String classname = AuthFilter.class.getName();
|
||||
final String pathSpec = "/" + WebHdfsFileSystem.PATH_PREFIX + "/*";
|
||||
defineFilter(webAppContext, name, classname, null,
|
||||
new String[]{pathSpec});
|
||||
LOG.info("Added filter '" + name + "' (class=" + classname + ")");
|
||||
|
||||
// add webhdfs packages
|
||||
addJerseyResourcePackage(
|
||||
NamenodeWebHdfsMethods.class.getPackage().getName()
|
||||
+ ";" + Param.class.getPackage().getName(), pathSpec);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
boolean certSSL = conf.getBoolean("dfs.https.enable", false);
|
||||
boolean useKrb = UserGroupInformation.isSecurityEnabled();
|
||||
@ -128,7 +147,7 @@ public HttpServer run() throws IOException, InterruptedException {
|
||||
nn.getNameNodeAddress());
|
||||
httpServer.setAttribute(FSIMAGE_ATTRIBUTE_KEY, nn.getFSImage());
|
||||
httpServer.setAttribute(JspHelper.CURRENT_CONF, conf);
|
||||
setupServlets(httpServer);
|
||||
setupServlets(httpServer, conf);
|
||||
httpServer.start();
|
||||
|
||||
// The web-server port can be ephemeral... ensure we have the correct
|
||||
@ -159,7 +178,7 @@ public InetSocketAddress getHttpAddress() {
|
||||
return httpAddress;
|
||||
}
|
||||
|
||||
private static void setupServlets(HttpServer httpServer) {
|
||||
private static void setupServlets(HttpServer httpServer, Configuration conf) {
|
||||
httpServer.addInternalServlet("getDelegationToken",
|
||||
GetDelegationTokenServlet.PATH_SPEC,
|
||||
GetDelegationTokenServlet.class, true);
|
||||
@ -181,11 +200,6 @@ private static void setupServlets(HttpServer httpServer) {
|
||||
FileChecksumServlets.RedirectServlet.class, false);
|
||||
httpServer.addInternalServlet("contentSummary", "/contentSummary/*",
|
||||
ContentSummaryServlet.class, false);
|
||||
|
||||
httpServer.addJerseyResourcePackage(
|
||||
NamenodeWebHdfsMethods.class.getPackage().getName()
|
||||
+ ";" + Param.class.getPackage().getName(),
|
||||
"/" + WebHdfsFileSystem.PATH_PREFIX + "/*");
|
||||
}
|
||||
|
||||
public static FSImage getFsImageFromContext(ServletContext context) {
|
||||
|
@ -24,9 +24,11 @@
|
||||
import java.io.PrintStream;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.EnumSet;
|
||||
|
||||
import javax.servlet.ServletContext;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.ws.rs.Consumes;
|
||||
import javax.ws.rs.DELETE;
|
||||
import javax.ws.rs.DefaultValue;
|
||||
@ -49,6 +51,7 @@
|
||||
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
|
||||
import org.apache.hadoop.hdfs.server.common.JspHelper;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
@ -58,6 +61,7 @@
|
||||
import org.apache.hadoop.hdfs.web.resources.AccessTimeParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.BlockSizeParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.BufferSizeParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.DelegationParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.DeleteOpParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.DstPathParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.GetOpParam;
|
||||
@ -76,7 +80,14 @@
|
||||
import org.apache.hadoop.hdfs.web.resources.RenameOptionSetParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.ReplicationParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.UriFsPathParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.UserParam;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.net.NodeBase;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
|
||||
/** Web-hdfs NameNode implementation. */
|
||||
@Path("")
|
||||
@ -84,6 +95,7 @@ public class NamenodeWebHdfsMethods {
|
||||
private static final Log LOG = LogFactory.getLog(NamenodeWebHdfsMethods.class);
|
||||
|
||||
private @Context ServletContext context;
|
||||
private @Context HttpServletRequest request;
|
||||
|
||||
private static DatanodeInfo chooseDatanode(final NameNode namenode,
|
||||
final String path, final HttpOpParam.Op op, final long openOffset
|
||||
@ -112,11 +124,40 @@ private static DatanodeInfo chooseDatanode(final NameNode namenode,
|
||||
NodeBase.ROOT);
|
||||
}
|
||||
|
||||
private static URI redirectURI(final NameNode namenode,
|
||||
private Token<? extends TokenIdentifier> generateDelegationToken(
|
||||
final NameNode namenode, final UserGroupInformation ugi,
|
||||
final String renewer) throws IOException {
|
||||
final Credentials c = DelegationTokenSecretManager.createCredentials(
|
||||
namenode, ugi, request.getUserPrincipal().getName());
|
||||
final Token<? extends TokenIdentifier> t = c.getAllTokens().iterator().next();
|
||||
t.setService(new Text(SecurityUtil.buildDTServiceName(
|
||||
NameNode.getUri(namenode.getNameNodeAddress()),
|
||||
NameNode.DEFAULT_PORT)));
|
||||
return t;
|
||||
}
|
||||
|
||||
private URI redirectURI(final NameNode namenode,
|
||||
final UserGroupInformation ugi, final DelegationParam delegation,
|
||||
final String path, final HttpOpParam.Op op, final long openOffset,
|
||||
final Param<?, ?>... parameters) throws URISyntaxException, IOException {
|
||||
final DatanodeInfo dn = chooseDatanode(namenode, path, op, openOffset);
|
||||
final String query = op.toQueryString() + Param.toSortedString("&", parameters);
|
||||
|
||||
final String delegationQuery;
|
||||
if (!UserGroupInformation.isSecurityEnabled()) {
|
||||
//security disabled
|
||||
delegationQuery = "";
|
||||
} else if (delegation.getValue() != null) {
|
||||
//client has provided a token
|
||||
delegationQuery = "&" + delegation;
|
||||
} else {
|
||||
//generate a token
|
||||
final Token<? extends TokenIdentifier> t = generateDelegationToken(
|
||||
namenode, ugi, request.getUserPrincipal().getName());
|
||||
delegationQuery = "&" + new DelegationParam(t.encodeToUrlString());
|
||||
}
|
||||
final String query = op.toQueryString()
|
||||
+ '&' + new UserParam(ugi) + delegationQuery
|
||||
+ Param.toSortedString("&", parameters);
|
||||
final String uripath = "/" + WebHdfsFileSystem.PATH_PREFIX + path;
|
||||
|
||||
final URI uri = new URI("http", null, dn.getHostName(), dn.getInfoPort(),
|
||||
@ -134,6 +175,9 @@ private static URI redirectURI(final NameNode namenode,
|
||||
@Produces({MediaType.APPLICATION_JSON})
|
||||
public Response put(
|
||||
final InputStream in,
|
||||
@Context final UserGroupInformation ugi,
|
||||
@QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
|
||||
final DelegationParam delegation,
|
||||
@PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
|
||||
@QueryParam(PutOpParam.NAME) @DefaultValue(PutOpParam.DEFAULT)
|
||||
final PutOpParam op,
|
||||
@ -159,15 +203,19 @@ public Response put(
|
||||
final AccessTimeParam accessTime,
|
||||
@QueryParam(RenameOptionSetParam.NAME) @DefaultValue(RenameOptionSetParam.DEFAULT)
|
||||
final RenameOptionSetParam renameOptions
|
||||
) throws IOException, URISyntaxException {
|
||||
) throws IOException, URISyntaxException, InterruptedException {
|
||||
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace(op + ": " + path
|
||||
LOG.trace(op + ": " + path + ", ugi=" + ugi
|
||||
+ Param.toSortedString(", ", dstPath, owner, group, permission,
|
||||
overwrite, bufferSize, replication, blockSize,
|
||||
modificationTime, accessTime, renameOptions));
|
||||
}
|
||||
|
||||
return ugi.doAs(new PrivilegedExceptionAction<Response>() {
|
||||
@Override
|
||||
public Response run() throws IOException, URISyntaxException {
|
||||
|
||||
final String fullpath = path.getAbsolutePath();
|
||||
final NameNode namenode = (NameNode)context.getAttribute("name.node");
|
||||
final NamenodeProtocols np = namenode.getRpcServer();
|
||||
@ -175,7 +223,8 @@ public Response put(
|
||||
switch(op.getValue()) {
|
||||
case CREATE:
|
||||
{
|
||||
final URI uri = redirectURI(namenode, fullpath, op.getValue(), -1L,
|
||||
final URI uri = redirectURI(namenode, ugi, delegation, fullpath,
|
||||
op.getValue(), -1L,
|
||||
permission, overwrite, bufferSize, replication, blockSize);
|
||||
return Response.temporaryRedirect(uri).build();
|
||||
}
|
||||
@ -223,6 +272,8 @@ public Response put(
|
||||
default:
|
||||
throw new UnsupportedOperationException(op + " is not supported");
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/** Handle HTTP POST request. */
|
||||
@ -232,31 +283,40 @@ public Response put(
|
||||
@Produces({MediaType.APPLICATION_JSON})
|
||||
public Response post(
|
||||
final InputStream in,
|
||||
@Context final UserGroupInformation ugi,
|
||||
@QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
|
||||
final DelegationParam delegation,
|
||||
@PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
|
||||
@QueryParam(PostOpParam.NAME) @DefaultValue(PostOpParam.DEFAULT)
|
||||
final PostOpParam op,
|
||||
@QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
|
||||
final BufferSizeParam bufferSize
|
||||
) throws IOException, URISyntaxException {
|
||||
) throws IOException, URISyntaxException, InterruptedException {
|
||||
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace(op + ": " + path
|
||||
+ Param.toSortedString(", ", bufferSize));
|
||||
LOG.trace(op + ": " + path + ", ugi=" + ugi
|
||||
+ Param.toSortedString(", ", bufferSize));
|
||||
}
|
||||
|
||||
return ugi.doAs(new PrivilegedExceptionAction<Response>() {
|
||||
@Override
|
||||
public Response run() throws IOException, URISyntaxException {
|
||||
|
||||
final String fullpath = path.getAbsolutePath();
|
||||
final NameNode namenode = (NameNode)context.getAttribute("name.node");
|
||||
|
||||
switch(op.getValue()) {
|
||||
case APPEND:
|
||||
{
|
||||
final URI uri = redirectURI(namenode, fullpath, op.getValue(), -1L,
|
||||
bufferSize);
|
||||
final URI uri = redirectURI(namenode, ugi, delegation, fullpath,
|
||||
op.getValue(), -1L, bufferSize);
|
||||
return Response.temporaryRedirect(uri).build();
|
||||
}
|
||||
default:
|
||||
throw new UnsupportedOperationException(op + " is not supported");
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private static final UriFsPathParam ROOT = new UriFsPathParam("");
|
||||
@ -266,6 +326,9 @@ public Response post(
|
||||
@Path("/")
|
||||
@Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON})
|
||||
public Response root(
|
||||
@Context final UserGroupInformation ugi,
|
||||
@QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
|
||||
final DelegationParam delegation,
|
||||
@QueryParam(GetOpParam.NAME) @DefaultValue(GetOpParam.DEFAULT)
|
||||
final GetOpParam op,
|
||||
@QueryParam(OffsetParam.NAME) @DefaultValue(OffsetParam.DEFAULT)
|
||||
@ -274,8 +337,8 @@ public Response root(
|
||||
final LengthParam length,
|
||||
@QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
|
||||
final BufferSizeParam bufferSize
|
||||
) throws IOException, URISyntaxException {
|
||||
return get(ROOT, op, offset, length, bufferSize);
|
||||
) throws IOException, URISyntaxException, InterruptedException {
|
||||
return get(ugi, delegation, ROOT, op, offset, length, bufferSize);
|
||||
}
|
||||
|
||||
/** Handle HTTP GET request. */
|
||||
@ -283,6 +346,9 @@ public Response root(
|
||||
@Path("{" + UriFsPathParam.NAME + ":.*}")
|
||||
@Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON})
|
||||
public Response get(
|
||||
@Context final UserGroupInformation ugi,
|
||||
@QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
|
||||
final DelegationParam delegation,
|
||||
@PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
|
||||
@QueryParam(GetOpParam.NAME) @DefaultValue(GetOpParam.DEFAULT)
|
||||
final GetOpParam op,
|
||||
@ -292,13 +358,18 @@ public Response get(
|
||||
final LengthParam length,
|
||||
@QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
|
||||
final BufferSizeParam bufferSize
|
||||
) throws IOException, URISyntaxException {
|
||||
) throws IOException, URISyntaxException, InterruptedException {
|
||||
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace(op + ", " + path
|
||||
LOG.trace(op + ": " + path + ", ugi=" + ugi
|
||||
+ Param.toSortedString(", ", offset, length, bufferSize));
|
||||
}
|
||||
|
||||
|
||||
return ugi.doAs(new PrivilegedExceptionAction<Response>() {
|
||||
@Override
|
||||
public Response run() throws IOException, URISyntaxException {
|
||||
|
||||
final NameNode namenode = (NameNode)context.getAttribute("name.node");
|
||||
final String fullpath = path.getAbsolutePath();
|
||||
final NamenodeProtocols np = namenode.getRpcServer();
|
||||
@ -306,8 +377,8 @@ public Response get(
|
||||
switch(op.getValue()) {
|
||||
case OPEN:
|
||||
{
|
||||
final URI uri = redirectURI(namenode, fullpath, op.getValue(),
|
||||
offset.getValue(), offset, length, bufferSize);
|
||||
final URI uri = redirectURI(namenode, ugi, delegation, fullpath,
|
||||
op.getValue(), offset.getValue(), offset, length, bufferSize);
|
||||
return Response.temporaryRedirect(uri).build();
|
||||
}
|
||||
case GETFILESTATUS:
|
||||
@ -324,6 +395,8 @@ public Response get(
|
||||
default:
|
||||
throw new UnsupportedOperationException(op + " is not supported");
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private static DirectoryListing getDirectoryListing(final NamenodeProtocols np,
|
||||
@ -373,28 +446,36 @@ public void write(final OutputStream outstream) throws IOException {
|
||||
@Path("{path:.*}")
|
||||
@Produces(MediaType.APPLICATION_JSON)
|
||||
public Response delete(
|
||||
@Context final UserGroupInformation ugi,
|
||||
@PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
|
||||
@QueryParam(DeleteOpParam.NAME) @DefaultValue(DeleteOpParam.DEFAULT)
|
||||
final DeleteOpParam op,
|
||||
@QueryParam(RecursiveParam.NAME) @DefaultValue(RecursiveParam.DEFAULT)
|
||||
final RecursiveParam recursive
|
||||
) throws IOException {
|
||||
) throws IOException, InterruptedException {
|
||||
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace(op + ", " + path
|
||||
+ Param.toSortedString(", ", recursive));
|
||||
LOG.trace(op + ": " + path + ", ugi=" + ugi
|
||||
+ Param.toSortedString(", ", recursive));
|
||||
}
|
||||
|
||||
switch(op.getValue()) {
|
||||
case DELETE:
|
||||
final NameNode namenode = (NameNode)context.getAttribute("name.node");
|
||||
final String fullpath = path.getAbsolutePath();
|
||||
final boolean b = namenode.getRpcServer().delete(fullpath, recursive.getValue());
|
||||
final String js = JsonUtil.toJsonString(DeleteOpParam.Op.DELETE, b);
|
||||
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
|
||||
return ugi.doAs(new PrivilegedExceptionAction<Response>() {
|
||||
@Override
|
||||
public Response run() throws IOException {
|
||||
final NameNode namenode = (NameNode)context.getAttribute("name.node");
|
||||
final String fullpath = path.getAbsolutePath();
|
||||
|
||||
default:
|
||||
throw new UnsupportedOperationException(op + " is not supported");
|
||||
}
|
||||
switch(op.getValue()) {
|
||||
case DELETE:
|
||||
{
|
||||
final boolean b = namenode.getRpcServer().delete(fullpath, recursive.getValue());
|
||||
final String js = JsonUtil.toJsonString(DeleteOpParam.Op.DELETE, b);
|
||||
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
|
||||
}
|
||||
default:
|
||||
throw new UnsupportedOperationException(op + " is not supported");
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,70 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.web;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
import javax.servlet.FilterConfig;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authentication.server.AuthenticationFilter;
|
||||
import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
|
||||
import org.apache.hadoop.security.authentication.server.PseudoAuthenticationHandler;
|
||||
|
||||
/**
|
||||
* Subclass of {@link AuthenticationFilter} that
|
||||
* obtains Hadoop-Auth configuration for webhdfs.
|
||||
*/
|
||||
public class AuthFilter extends AuthenticationFilter {
|
||||
private static final String CONF_PREFIX = "dfs.web.authentication.";
|
||||
|
||||
/**
|
||||
* Returns the filter configuration properties,
|
||||
* including the ones prefixed with {@link #CONF_PREFIX}.
|
||||
* The prefix is removed from the returned property names.
|
||||
*
|
||||
* @param prefix parameter not used.
|
||||
* @param config parameter not used.
|
||||
* @return Hadoop-Auth configuration properties.
|
||||
*/
|
||||
@Override
|
||||
protected Properties getConfiguration(String prefix, FilterConfig config) {
|
||||
final Configuration conf = new Configuration();
|
||||
final Properties p = new Properties();
|
||||
|
||||
//set authentication type
|
||||
p.setProperty(AUTH_TYPE, UserGroupInformation.isSecurityEnabled()?
|
||||
KerberosAuthenticationHandler.TYPE: PseudoAuthenticationHandler.TYPE);
|
||||
//For Pseudo Authentication, allow anonymous.
|
||||
p.setProperty(PseudoAuthenticationHandler.ANONYMOUS_ALLOWED, "true");
|
||||
//set cookie path
|
||||
p.setProperty(COOKIE_PATH, "/");
|
||||
|
||||
//set other configurations with CONF_PREFIX
|
||||
for (Map.Entry<String, String> entry : conf) {
|
||||
final String key = entry.getKey();
|
||||
if (key.startsWith(CONF_PREFIX)) {
|
||||
//remove prefix from the key and set property
|
||||
p.setProperty(key.substring(CONF_PREFIX.length()), conf.get(key));
|
||||
}
|
||||
}
|
||||
return p;
|
||||
}
|
||||
}
|
@ -0,0 +1,45 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.web;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authentication.client.Authenticator;
|
||||
import org.apache.hadoop.security.authentication.client.KerberosAuthenticator;
|
||||
import org.apache.hadoop.security.authentication.client.PseudoAuthenticator;
|
||||
|
||||
/**
|
||||
* Use UserGroupInformation as a fallback authenticator
|
||||
* if the server does not use Kerberos SPNEGO HTTP authentication.
|
||||
*/
|
||||
public class KerberosUgiAuthenticator extends KerberosAuthenticator {
|
||||
@Override
|
||||
protected Authenticator getFallBackAuthenticator() {
|
||||
return new PseudoAuthenticator() {
|
||||
@Override
|
||||
protected String getUserName() {
|
||||
try {
|
||||
return UserGroupInformation.getLoginUser().getUserName();
|
||||
} catch (IOException e) {
|
||||
throw new SecurityException("Failed to obtain current username", e);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
@ -64,9 +64,12 @@
|
||||
import org.apache.hadoop.hdfs.web.resources.RecursiveParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.RenameOptionSetParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.ReplicationParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.UserParam;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
|
||||
import org.apache.hadoop.security.authentication.client.AuthenticationException;
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
import org.mortbay.util.ajax.JSON;
|
||||
|
||||
@ -77,11 +80,15 @@ public class WebHdfsFileSystem extends HftpFileSystem {
|
||||
/** Http URI: http://namenode:port/{PATH_PREFIX}/path/to/file */
|
||||
public static final String PATH_PREFIX = SCHEME;
|
||||
|
||||
private static final KerberosUgiAuthenticator AUTH = new KerberosUgiAuthenticator();
|
||||
|
||||
private UserGroupInformation ugi;
|
||||
private final AuthenticatedURL.Token authToken = new AuthenticatedURL.Token();
|
||||
protected Path workingDir;
|
||||
|
||||
@Override
|
||||
public void initialize(URI uri, Configuration conf) throws IOException {
|
||||
public synchronized void initialize(URI uri, Configuration conf
|
||||
) throws IOException {
|
||||
super.initialize(uri, conf);
|
||||
setConf(conf);
|
||||
|
||||
@ -162,8 +169,9 @@ private URL toUrl(final HttpOpParam.Op op, final Path fspath,
|
||||
final String path = "/" + PATH_PREFIX
|
||||
+ makeQualified(fspath).toUri().getPath();
|
||||
final String query = op.toQueryString()
|
||||
+ '&' + new UserParam(ugi)
|
||||
+ Param.toSortedString("&", parameters);
|
||||
final URL url = getNamenodeURL(path, query);
|
||||
final URL url = getNamenodeURL(path, addDelegationTokenParam(query));
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("url=" + url);
|
||||
}
|
||||
@ -175,7 +183,12 @@ private HttpURLConnection httpConnect(final HttpOpParam.Op op, final Path fspath
|
||||
final URL url = toUrl(op, fspath, parameters);
|
||||
|
||||
//connect and get response
|
||||
final HttpURLConnection conn = (HttpURLConnection)url.openConnection();
|
||||
final HttpURLConnection conn;
|
||||
try {
|
||||
conn = new AuthenticatedURL(AUTH).openConnection(url, authToken);
|
||||
} catch(AuthenticationException e) {
|
||||
throw new IOException("Authentication failed, url=" + url, e);
|
||||
}
|
||||
try {
|
||||
conn.setRequestMethod(op.getType().toString());
|
||||
conn.setDoOutput(op.getDoOutput());
|
||||
|
@ -17,6 +17,11 @@
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.web.resources;
|
||||
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
||||
/** Block size parameter. */
|
||||
public class BlockSizeParam extends LongParam {
|
||||
/** Parameter name. */
|
||||
@ -46,4 +51,10 @@ public BlockSizeParam(final String str) {
|
||||
public String getName() {
|
||||
return NAME;
|
||||
}
|
||||
|
||||
/** @return the value or, if it is null, return the default from conf. */
|
||||
public long getValue(final Configuration conf) {
|
||||
return getValue() != null? getValue()
|
||||
: conf.getLong(DFS_BLOCK_SIZE_KEY, DFS_BLOCK_SIZE_DEFAULT);
|
||||
}
|
||||
}
|
@ -17,6 +17,9 @@
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.web.resources;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
|
||||
/** Buffer size parameter. */
|
||||
public class BufferSizeParam extends IntegerParam {
|
||||
/** Parameter name. */
|
||||
@ -46,4 +49,12 @@ public BufferSizeParam(final String str) {
|
||||
public String getName() {
|
||||
return NAME;
|
||||
}
|
||||
|
||||
/** @return the value or, if it is null, return the default from conf. */
|
||||
public int getValue(final Configuration conf) {
|
||||
return getValue() != null? getValue()
|
||||
: conf.getInt(
|
||||
CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
|
||||
CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
|
||||
}
|
||||
}
|
@ -0,0 +1,45 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.web.resources;
|
||||
|
||||
import org.apache.hadoop.hdfs.server.common.JspHelper;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
|
||||
/** Delegation token parameter. */
|
||||
public class DelegationParam extends StringParam {
|
||||
/** Parameter name. */
|
||||
public static final String NAME = JspHelper.DELEGATION_PARAMETER_NAME;
|
||||
/** Default parameter value. */
|
||||
public static final String DEFAULT = "";
|
||||
|
||||
private static final Domain DOMAIN = new Domain(NAME, null);
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
* @param str a string representation of the parameter value.
|
||||
*/
|
||||
public DelegationParam(final String str) {
|
||||
super(DOMAIN, UserGroupInformation.isSecurityEnabled()
|
||||
&& str != null && !str.equals(DEFAULT)? str: null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return NAME;
|
||||
}
|
||||
}
|
@ -18,7 +18,8 @@
|
||||
package org.apache.hadoop.hdfs.web.resources;
|
||||
|
||||
/** Http operation parameter. */
|
||||
public abstract class HttpOpParam<E extends Enum<E> & HttpOpParam.Op> extends EnumParam<E> {
|
||||
public abstract class HttpOpParam<E extends Enum<E> & HttpOpParam.Op>
|
||||
extends EnumParam<E> {
|
||||
/** Default parameter value. */
|
||||
public static final String DEFAULT = NULL;
|
||||
|
||||
@ -32,20 +33,16 @@ public static interface Op {
|
||||
/** @return the Http operation type. */
|
||||
public Type getType();
|
||||
|
||||
/** @return true if the operation has output. */
|
||||
/** @return true if the operation will do output. */
|
||||
public boolean getDoOutput();
|
||||
|
||||
/** @return true if the operation has output. */
|
||||
/** @return true the expected http response code. */
|
||||
public int getExpectedHttpResponseCode();
|
||||
|
||||
/** @return a URI query string. */
|
||||
public String toQueryString();
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
* @param str a string representation of the parameter value.
|
||||
*/
|
||||
HttpOpParam(final Domain<E> domain, final E value) {
|
||||
super(domain, value);
|
||||
}
|
||||
|
@ -17,6 +17,8 @@
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.web.resources;
|
||||
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
|
||||
/** User parameter. */
|
||||
public class UserParam extends StringParam {
|
||||
/** Parameter name. */
|
||||
@ -34,6 +36,13 @@ public UserParam(final String str) {
|
||||
super(DOMAIN, str == null || str.equals(DEFAULT)? null: str);
|
||||
}
|
||||
|
||||
/**
|
||||
* Construct an object from a UGI.
|
||||
*/
|
||||
public UserParam(final UserGroupInformation ugi) {
|
||||
this(ugi.getShortUserName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return NAME;
|
||||
|
@ -17,12 +17,18 @@
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.web.resources;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Type;
|
||||
import java.security.Principal;
|
||||
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.ws.rs.core.Context;
|
||||
import javax.ws.rs.ext.Provider;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.server.common.JspHelper;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
|
||||
|
||||
import com.sun.jersey.api.core.HttpContext;
|
||||
import com.sun.jersey.core.spi.component.ComponentContext;
|
||||
import com.sun.jersey.core.spi.component.ComponentScope;
|
||||
@ -30,33 +36,23 @@
|
||||
import com.sun.jersey.spi.inject.Injectable;
|
||||
import com.sun.jersey.spi.inject.InjectableProvider;
|
||||
|
||||
/** Inject user information to http operations. */
|
||||
@Provider
|
||||
public class UserProvider extends AbstractHttpContextInjectable<Principal>
|
||||
public class UserProvider
|
||||
extends AbstractHttpContextInjectable<UserGroupInformation>
|
||||
implements InjectableProvider<Context, Type> {
|
||||
@Context HttpServletRequest request;
|
||||
|
||||
@Override
|
||||
public Principal getValue(final HttpContext context) {
|
||||
//get principal from the request
|
||||
final Principal principal = context.getRequest().getUserPrincipal();
|
||||
if (principal != null) {
|
||||
return principal;
|
||||
public UserGroupInformation getValue(final HttpContext context) {
|
||||
final Configuration conf = (Configuration)context.getProperties().get(
|
||||
JspHelper.CURRENT_CONF);
|
||||
try {
|
||||
return JspHelper.getUGI(null, request, conf,
|
||||
AuthenticationMethod.KERBEROS, false);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
//get username from the parameter
|
||||
final String username = context.getRequest().getQueryParameters().getFirst(
|
||||
UserParam.NAME);
|
||||
if (username != null) {
|
||||
final UserParam userparam = new UserParam(username);
|
||||
return new Principal() {
|
||||
@Override
|
||||
public String getName() {
|
||||
return userparam.getValue();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
//user not found
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -65,9 +61,9 @@ public ComponentScope getScope() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Injectable<Principal> getInjectable(
|
||||
public Injectable<UserGroupInformation> getInjectable(
|
||||
final ComponentContext componentContext, final Context context,
|
||||
final Type type) {
|
||||
return type.equals(Principal.class)? this : null;
|
||||
return type.equals(UserGroupInformation.class)? this : null;
|
||||
}
|
||||
}
|
@ -683,4 +683,24 @@ creations/deletions), or "all".</description>
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.web.authentication.kerberos.principal</name>
|
||||
<value>HTTP/${dfs.web.hostname}@${kerberos.realm}</value>
|
||||
<description>
|
||||
The HTTP Kerberos principal used by Hadoop-Auth in the HTTP endpoint.
|
||||
|
||||
The HTTP Kerberos principal MUST start with 'HTTP/' per Kerberos
|
||||
HTTP SPENGO specification.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.web.authentication.kerberos.keytab</name>
|
||||
<value>${user.home}/dfs.web.keytab</value>
|
||||
<description>
|
||||
The Kerberos keytab file with the credentials for the
|
||||
HTTP Kerberos principal used by Hadoop-Auth in the HTTP endpoint.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
</configuration>
|
||||
|
@ -17,19 +17,28 @@
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.web;
|
||||
|
||||
import static org.apache.hadoop.fs.FileSystemTestHelper.exists;
|
||||
import static org.apache.hadoop.fs.FileSystemTestHelper.getTestRootPath;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
|
||||
import org.apache.commons.logging.impl.Log4JLogger;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSMainOperationsBaseTest;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.server.datanode.web.resources.DatanodeWebHdfsMethods;
|
||||
import org.apache.hadoop.hdfs.web.resources.ExceptionHandler;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.log4j.Level;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
@ -44,14 +53,30 @@ public class TestFSMainOperationsWebHdfs extends FSMainOperationsBaseTest {
|
||||
|
||||
@BeforeClass
|
||||
public static void setupCluster() {
|
||||
Configuration conf = new Configuration();
|
||||
final Configuration conf = new Configuration();
|
||||
conf.setBoolean(DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY, true);
|
||||
try {
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
|
||||
cluster.waitActive();
|
||||
|
||||
//change root permission to 777
|
||||
cluster.getFileSystem().setPermission(
|
||||
new Path("/"), new FsPermission((short)0777));
|
||||
|
||||
final String uri = WebHdfsFileSystem.SCHEME + "://"
|
||||
+ conf.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
|
||||
fSys = FileSystem.get(new URI(uri), conf);
|
||||
|
||||
//get file system as a non-superuser
|
||||
final UserGroupInformation current = UserGroupInformation.getCurrentUser();
|
||||
final UserGroupInformation ugi = UserGroupInformation.createUserForTesting(
|
||||
current.getShortUserName() + "x", new String[]{"user"});
|
||||
fSys = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
|
||||
@Override
|
||||
public FileSystem run() throws Exception {
|
||||
return FileSystem.get(new URI(uri), conf);
|
||||
}
|
||||
});
|
||||
|
||||
defaultWorkingDirectory = fSys.getWorkingDirectory();
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
@ -70,10 +95,40 @@ public static void shutdownCluster() {
|
||||
protected Path getDefaultWorkingDirectory() {
|
||||
return defaultWorkingDirectory;
|
||||
}
|
||||
|
||||
//The following test failed since WebHdfsFileSystem did not support
|
||||
//authentication.
|
||||
//Disable it.
|
||||
|
||||
@Test
|
||||
public void testListStatusThrowsExceptionForUnreadableDir() {}
|
||||
public void testMkdirsFailsForSubdirectoryOfExistingFile() throws Exception {
|
||||
Path testDir = getTestRootPath(fSys, "test/hadoop");
|
||||
Assert.assertFalse(exists(fSys, testDir));
|
||||
fSys.mkdirs(testDir);
|
||||
Assert.assertTrue(exists(fSys, testDir));
|
||||
|
||||
createFile(getTestRootPath(fSys, "test/hadoop/file"));
|
||||
|
||||
Path testSubDir = getTestRootPath(fSys, "test/hadoop/file/subdir");
|
||||
try {
|
||||
fSys.mkdirs(testSubDir);
|
||||
Assert.fail("Should throw IOException.");
|
||||
} catch (IOException e) {
|
||||
// expected
|
||||
}
|
||||
try {
|
||||
Assert.assertFalse(exists(fSys, testSubDir));
|
||||
} catch(AccessControlException e) {
|
||||
// also okay for HDFS.
|
||||
}
|
||||
|
||||
Path testDeepSubDir = getTestRootPath(fSys, "test/hadoop/file/deep/sub/dir");
|
||||
try {
|
||||
fSys.mkdirs(testDeepSubDir);
|
||||
Assert.fail("Should throw IOException.");
|
||||
} catch (IOException e) {
|
||||
// expected
|
||||
}
|
||||
try {
|
||||
Assert.assertFalse(exists(fSys, testDeepSubDir));
|
||||
} catch(AccessControlException e) {
|
||||
// also okay for HDFS.
|
||||
}
|
||||
}
|
||||
}
|
@ -20,12 +20,17 @@
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FileSystemContractBaseTest;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
|
||||
public class TestWebHdfsFileSystemContract extends FileSystemContractBaseTest {
|
||||
private static final Configuration conf = new Configuration();
|
||||
@ -33,9 +38,14 @@ public class TestWebHdfsFileSystemContract extends FileSystemContractBaseTest {
|
||||
private String defaultWorkingDirectory;
|
||||
|
||||
static {
|
||||
conf.setBoolean(DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY, true);
|
||||
try {
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
|
||||
cluster.waitActive();
|
||||
|
||||
//change root permission to 777
|
||||
cluster.getFileSystem().setPermission(
|
||||
new Path("/"), new FsPermission((short)0777));
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
@ -45,7 +55,18 @@ public class TestWebHdfsFileSystemContract extends FileSystemContractBaseTest {
|
||||
protected void setUp() throws Exception {
|
||||
final String uri = WebHdfsFileSystem.SCHEME + "://"
|
||||
+ conf.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
|
||||
fs = FileSystem.get(new URI(uri), conf);
|
||||
|
||||
//get file system as a non-superuser
|
||||
final UserGroupInformation current = UserGroupInformation.getCurrentUser();
|
||||
final UserGroupInformation ugi = UserGroupInformation.createUserForTesting(
|
||||
current.getShortUserName() + "x", new String[]{"user"});
|
||||
fs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
|
||||
@Override
|
||||
public FileSystem run() throws Exception {
|
||||
return FileSystem.get(new URI(uri), conf);
|
||||
}
|
||||
});
|
||||
|
||||
defaultWorkingDirectory = fs.getWorkingDirectory().toUri().getPath();
|
||||
}
|
||||
|
||||
@ -53,4 +74,44 @@ protected void setUp() throws Exception {
|
||||
protected String getDefaultWorkingDirectory() {
|
||||
return defaultWorkingDirectory;
|
||||
}
|
||||
|
||||
/** HDFS throws AccessControlException
|
||||
* when calling exist(..) on a path /foo/bar/file
|
||||
* but /foo/bar is indeed a file in HDFS.
|
||||
*/
|
||||
@Override
|
||||
public void testMkdirsFailsForSubdirectoryOfExistingFile() throws Exception {
|
||||
Path testDir = path("/test/hadoop");
|
||||
assertFalse(fs.exists(testDir));
|
||||
assertTrue(fs.mkdirs(testDir));
|
||||
assertTrue(fs.exists(testDir));
|
||||
|
||||
createFile(path("/test/hadoop/file"));
|
||||
|
||||
Path testSubDir = path("/test/hadoop/file/subdir");
|
||||
try {
|
||||
fs.mkdirs(testSubDir);
|
||||
fail("Should throw IOException.");
|
||||
} catch (IOException e) {
|
||||
// expected
|
||||
}
|
||||
try {
|
||||
assertFalse(fs.exists(testSubDir));
|
||||
} catch(AccessControlException e) {
|
||||
// also okay for HDFS.
|
||||
}
|
||||
|
||||
Path testDeepSubDir = path("/test/hadoop/file/deep/sub/dir");
|
||||
try {
|
||||
fs.mkdirs(testDeepSubDir);
|
||||
fail("Should throw IOException.");
|
||||
} catch (IOException e) {
|
||||
// expected
|
||||
}
|
||||
try {
|
||||
assertFalse(fs.exists(testDeepSubDir));
|
||||
} catch(AccessControlException e) {
|
||||
// also okay for HDFS.
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1343,6 +1343,9 @@ Release 0.22.0 - Unreleased
|
||||
MAPREDUCE-1664. Changes the behaviour of the combination of job-acls
|
||||
when they function together with queue-acls. (Ravi Gummadi via vinodkv)
|
||||
|
||||
MAPREDUCE-2994. Fixed a bug in ApplicationID parsing that affects RM
|
||||
UI. (Devaraj K via vinodkv)
|
||||
|
||||
NEW FEATURES
|
||||
|
||||
MAPREDUCE-1804. Stress-test tool for HDFS introduced in HDFS-708.
|
||||
|
@ -30,7 +30,7 @@
|
||||
* Yarn application related utilities
|
||||
*/
|
||||
public class Apps {
|
||||
public static final String APP = "app";
|
||||
public static final String APP = "application";
|
||||
public static final String ID = "ID";
|
||||
|
||||
public static ApplicationId toAppID(String aid) {
|
||||
|
Loading…
Reference in New Issue
Block a user