HDFS-5570. Deprecate hftp / hsftp and replace them with webhdfs / swebhdfs. Contributed by Haohui Mai.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1584100 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
64c50d9dfb
commit
fb1d7fb596
@ -162,7 +162,7 @@ public synchronized Text getKind() {
|
||||
|
||||
/**
|
||||
* Set the token kind. This is only intended to be used by services that
|
||||
* wrap another service's token, such as HFTP wrapping HDFS.
|
||||
* wrap another service's token.
|
||||
* @param newKind
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
|
@ -9,6 +9,9 @@ Trunk (Unreleased)
|
||||
HDFS-5079. Cleaning up NNHAStatusHeartbeat.State from
|
||||
DatanodeProtocolProtos. (Tao Luo via shv)
|
||||
|
||||
HDFS-5570. Deprecate hftp / hsftp and replace them with webhdfs / swebhdfs.
|
||||
(wheat9)
|
||||
|
||||
NEW FEATURES
|
||||
|
||||
HDFS-3125. Add JournalService to enable Journal Daemon. (suresh)
|
||||
|
@ -41,7 +41,7 @@ public class DelegationTokenSelector
|
||||
* Select the delegation token for hdfs. The port will be rewritten to
|
||||
* the port of hdfs.service.host_$nnAddr, or the default rpc namenode port.
|
||||
* This method should only be called by non-hdfs filesystems that do not
|
||||
* use the rpc port to acquire tokens. Ex. webhdfs, hftp
|
||||
* use the rpc port to acquire tokens. Ex. webhdfs
|
||||
* @param nnUri of the remote namenode
|
||||
* @param tokens as a collection
|
||||
* @param conf hadoop configuration
|
||||
|
@ -58,8 +58,6 @@
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
|
||||
import org.apache.hadoop.hdfs.server.datanode.web.resources.DatanodeWebHdfsMethods;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FileChecksumServlets;
|
||||
import org.apache.hadoop.hdfs.server.namenode.StreamFile;
|
||||
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
|
||||
import org.apache.hadoop.hdfs.server.protocol.*;
|
||||
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
|
||||
@ -361,10 +359,6 @@ private void startInfoServer(Configuration conf) throws IOException {
|
||||
|
||||
this.infoServer = builder.build();
|
||||
|
||||
this.infoServer.addInternalServlet(null, "/streamFile/*", StreamFile.class);
|
||||
this.infoServer.addInternalServlet(null, "/getFileChecksum/*",
|
||||
FileChecksumServlets.GetServlet.class);
|
||||
|
||||
this.infoServer.setAttribute("datanode", this);
|
||||
this.infoServer.setAttribute(JspHelper.CURRENT_CONF, conf);
|
||||
this.infoServer.addServlet(null, "/blockScannerReport",
|
||||
|
@ -781,7 +781,7 @@ public InetSocketAddress getServiceRpcAddress() {
|
||||
|
||||
/**
|
||||
* @return NameNode HTTP address, used by the Web UI, image transfer,
|
||||
* and HTTP-based file system clients like Hftp and WebHDFS
|
||||
* and HTTP-based file system clients like WebHDFS
|
||||
*/
|
||||
public InetSocketAddress getHttpAddress() {
|
||||
return httpServer.getHttpAddress();
|
||||
@ -789,7 +789,7 @@ public InetSocketAddress getHttpAddress() {
|
||||
|
||||
/**
|
||||
* @return NameNode HTTPS address, used by the Web UI, image transfer,
|
||||
* and HTTP-based file system clients like Hftp and WebHDFS
|
||||
* and HTTP-based file system clients like WebHDFS
|
||||
*/
|
||||
public InetSocketAddress getHttpsAddress() {
|
||||
return httpServer.getHttpsAddress();
|
||||
|
@ -229,27 +229,10 @@ void setStartupProgress(StartupProgress prog) {
|
||||
private static void setupServlets(HttpServer2 httpServer, Configuration conf) {
|
||||
httpServer.addInternalServlet("startupProgress",
|
||||
StartupProgressServlet.PATH_SPEC, StartupProgressServlet.class);
|
||||
httpServer.addInternalServlet("getDelegationToken",
|
||||
GetDelegationTokenServlet.PATH_SPEC,
|
||||
GetDelegationTokenServlet.class, true);
|
||||
httpServer.addInternalServlet("renewDelegationToken",
|
||||
RenewDelegationTokenServlet.PATH_SPEC,
|
||||
RenewDelegationTokenServlet.class, true);
|
||||
httpServer.addInternalServlet("cancelDelegationToken",
|
||||
CancelDelegationTokenServlet.PATH_SPEC,
|
||||
CancelDelegationTokenServlet.class, true);
|
||||
httpServer.addInternalServlet("fsck", "/fsck", FsckServlet.class,
|
||||
true);
|
||||
httpServer.addInternalServlet("imagetransfer", ImageServlet.PATH_SPEC,
|
||||
ImageServlet.class, true);
|
||||
httpServer.addInternalServlet("listPaths", "/listPaths/*",
|
||||
ListPathsServlet.class, false);
|
||||
httpServer.addInternalServlet("data", "/data/*",
|
||||
FileDataServlet.class, false);
|
||||
httpServer.addInternalServlet("checksum", "/fileChecksum/*",
|
||||
FileChecksumServlets.RedirectServlet.class, false);
|
||||
httpServer.addInternalServlet("contentSummary", "/contentSummary/*",
|
||||
ContentSummaryServlet.class, false);
|
||||
}
|
||||
|
||||
static FSImage getFsImageFromContext(ServletContext context) {
|
||||
|
@ -17,17 +17,11 @@
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.tools;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.DataInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.PrintStream;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URI;
|
||||
import java.net.URL;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.Collection;
|
||||
import java.util.Date;
|
||||
@ -43,23 +37,16 @@
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
|
||||
import org.apache.hadoop.hdfs.server.namenode.CancelDelegationTokenServlet;
|
||||
import org.apache.hadoop.hdfs.server.namenode.GetDelegationTokenServlet;
|
||||
import org.apache.hadoop.hdfs.server.namenode.RenewDelegationTokenServlet;
|
||||
import org.apache.hadoop.hdfs.web.HftpFileSystem;
|
||||
import org.apache.hadoop.hdfs.web.HsftpFileSystem;
|
||||
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
|
||||
import org.apache.hadoop.hdfs.web.SWebHdfsFileSystem;
|
||||
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authentication.client.AuthenticationException;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.util.ExitUtil;
|
||||
import org.apache.hadoop.util.GenericOptionsParser;
|
||||
|
||||
import com.google.common.base.Charsets;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/**
|
||||
* Fetch a DelegationToken from the current Namenode and store it in the
|
||||
@ -67,37 +54,15 @@
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class DelegationTokenFetcher {
|
||||
private static final Log LOG =
|
||||
LogFactory.getLog(DelegationTokenFetcher.class);
|
||||
private static final String WEBSERVICE = "webservice";
|
||||
private static final String RENEWER = "renewer";
|
||||
private static final String CANCEL = "cancel";
|
||||
private static final String RENEW = "renew";
|
||||
private static final String PRINT = "print";
|
||||
private static final String HELP = "help";
|
||||
private static final String HELP_SHORT = "h";
|
||||
|
||||
private static void printUsage(PrintStream err) {
|
||||
err.println("fetchdt retrieves delegation tokens from the NameNode");
|
||||
err.println();
|
||||
err.println("fetchdt <opts> <token file>");
|
||||
err.println("Options:");
|
||||
err.println(" --webservice <url> Url to contact NN on");
|
||||
err.println(" --renewer <name> Name of the delegation token renewer");
|
||||
err.println(" --cancel Cancel the delegation token");
|
||||
err.println(" --renew Renew the delegation token. Delegation "
|
||||
+ "token must have been fetched using the --renewer <name> option.");
|
||||
err.println(" --print Print the delegation token");
|
||||
err.println();
|
||||
GenericOptionsParser.printGenericCommandUsage(err);
|
||||
ExitUtil.terminate(1);
|
||||
}
|
||||
|
||||
private static Collection<Token<?>> readTokens(Path file, Configuration conf)
|
||||
throws IOException {
|
||||
Credentials creds = Credentials.readTokenStorageFile(file, conf);
|
||||
return creds.getAllTokens();
|
||||
}
|
||||
private static final Log LOG = LogFactory
|
||||
.getLog(DelegationTokenFetcher.class);
|
||||
private static final String PRINT = "print";
|
||||
private static final String RENEW = "renew";
|
||||
private static final String RENEWER = "renewer";
|
||||
|
||||
/**
|
||||
* Command-line interface
|
||||
@ -105,23 +70,22 @@ private static Collection<Token<?>> readTokens(Path file, Configuration conf)
|
||||
public static void main(final String[] args) throws Exception {
|
||||
final Configuration conf = new HdfsConfiguration();
|
||||
Options fetcherOptions = new Options();
|
||||
fetcherOptions.addOption(WEBSERVICE, true,
|
||||
"HTTP url to reach the NameNode at");
|
||||
fetcherOptions.addOption(RENEWER, true,
|
||||
"Name of the delegation token renewer");
|
||||
fetcherOptions.addOption(CANCEL, false, "cancel the token");
|
||||
fetcherOptions.addOption(RENEW, false, "renew the token");
|
||||
fetcherOptions.addOption(PRINT, false, "print the token");
|
||||
fetcherOptions.addOption(HELP_SHORT, HELP, false, "print out help information");
|
||||
fetcherOptions
|
||||
.addOption(WEBSERVICE, true, "HTTP url to reach the NameNode at")
|
||||
.addOption(RENEWER, true, "Name of the delegation token renewer")
|
||||
.addOption(CANCEL, false, "cancel the token")
|
||||
.addOption(RENEW, false, "renew the token")
|
||||
.addOption(PRINT, false, "print the token")
|
||||
.addOption(HELP_SHORT, HELP, false, "print out help information");
|
||||
|
||||
GenericOptionsParser parser = new GenericOptionsParser(conf,
|
||||
fetcherOptions, args);
|
||||
CommandLine cmd = parser.getCommandLine();
|
||||
|
||||
// get options
|
||||
final String webUrl = cmd.hasOption(WEBSERVICE) ? cmd
|
||||
.getOptionValue(WEBSERVICE) : null;
|
||||
final String renewer = cmd.hasOption(RENEWER) ?
|
||||
cmd.getOptionValue(RENEWER) : null;
|
||||
final String renewer = cmd.hasOption(RENEWER) ? cmd.getOptionValue
|
||||
(RENEWER) : null;
|
||||
final boolean cancel = cmd.hasOption(CANCEL);
|
||||
final boolean renew = cmd.hasOption(RENEW);
|
||||
final boolean print = cmd.hasOption(PRINT);
|
||||
@ -133,8 +97,9 @@ public static void main(final String[] args) throws Exception {
|
||||
printUsage(System.out);
|
||||
System.exit(0);
|
||||
}
|
||||
if (cancel && renew || cancel && print || renew && print || cancel && renew
|
||||
&& print) {
|
||||
|
||||
int commandCount = (cancel ? 1 : 0) + (renew ? 1 : 0) + (print ? 1 : 0);
|
||||
if (commandCount > 1) {
|
||||
System.err.println("ERROR: Only specify cancel, renew or print.");
|
||||
printUsage(System.err);
|
||||
}
|
||||
@ -145,26 +110,45 @@ public static void main(final String[] args) throws Exception {
|
||||
// default to using the local file system
|
||||
FileSystem local = FileSystem.getLocal(conf);
|
||||
final Path tokenFile = new Path(local.getWorkingDirectory(), remaining[0]);
|
||||
final URLConnectionFactory connectionFactory = URLConnectionFactory.DEFAULT_SYSTEM_CONNECTION_FACTORY;
|
||||
|
||||
// Login the current user
|
||||
UserGroupInformation.getCurrentUser().doAs(
|
||||
new PrivilegedExceptionAction<Object>() {
|
||||
UserGroupInformation.getCurrentUser().doAs(new PrivilegedExceptionAction<Object>() {
|
||||
@Override
|
||||
public Object run() throws Exception {
|
||||
|
||||
if (print) {
|
||||
DelegationTokenIdentifier id = new DelegationTokenSecretManager(
|
||||
0, 0, 0, 0, null).createIdentifier();
|
||||
for (Token<?> token : readTokens(tokenFile, conf)) {
|
||||
DataInputStream in = new DataInputStream(
|
||||
new ByteArrayInputStream(token.getIdentifier()));
|
||||
id.readFields(in);
|
||||
System.out.println("Token (" + id + ") for " +
|
||||
token.getService());
|
||||
}
|
||||
printTokens(conf, tokenFile);
|
||||
} else if (cancel) {
|
||||
for(Token<?> token: readTokens(tokenFile, conf)) {
|
||||
cancelTokens(conf, tokenFile);
|
||||
} else if (renew) {
|
||||
renewTokens(conf, tokenFile);
|
||||
} else {
|
||||
// otherwise we are fetching
|
||||
FileSystem fs = getFileSystem(conf, webUrl);
|
||||
saveDelegationToken(conf, fs, renewer, tokenFile);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private static FileSystem getFileSystem(Configuration conf, String url)
|
||||
throws IOException {
|
||||
if (url == null) {
|
||||
return FileSystem.get(conf);
|
||||
}
|
||||
|
||||
// For backward compatibility
|
||||
URI fsUri = URI.create(
|
||||
url.replaceFirst("^http://", WebHdfsFileSystem.SCHEME + "://")
|
||||
.replaceFirst("^https://", SWebHdfsFileSystem.SCHEME + "://"));
|
||||
|
||||
return FileSystem.get(fsUri, conf);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static void cancelTokens(final Configuration conf, final Path tokenFile)
|
||||
throws IOException, InterruptedException {
|
||||
for (Token<?> token : readTokens(tokenFile, conf)) {
|
||||
if (token.isManaged()) {
|
||||
token.cancel(conf);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
@ -172,221 +156,72 @@ public Object run() throws Exception {
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if (renew) {
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static void renewTokens(final Configuration conf, final Path tokenFile)
|
||||
throws IOException, InterruptedException {
|
||||
for (Token<?> token : readTokens(tokenFile, conf)) {
|
||||
if (token.isManaged()) {
|
||||
long result = token.renew(conf);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Renewed token for " + token.getService()
|
||||
+ " until: " + new Date(result));
|
||||
LOG.debug("Renewed token for " + token.getService() + " until: " +
|
||||
new Date(result));
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// otherwise we are fetching
|
||||
if (webUrl != null) {
|
||||
Credentials creds = getDTfromRemote(connectionFactory, new URI(
|
||||
webUrl), renewer, null);
|
||||
creds.writeTokenStorageFile(tokenFile, conf);
|
||||
for (Token<?> token : creds.getAllTokens()) {
|
||||
if(LOG.isDebugEnabled()) {
|
||||
LOG.debug("Fetched token via " + webUrl + " for "
|
||||
+ token.getService() + " into " + tokenFile);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
FileSystem fs = FileSystem.get(conf);
|
||||
|
||||
@VisibleForTesting
|
||||
static void saveDelegationToken(Configuration conf, FileSystem fs,
|
||||
final String renewer, final Path tokenFile)
|
||||
throws IOException {
|
||||
Token<?> token = fs.getDelegationToken(renewer);
|
||||
|
||||
Credentials cred = new Credentials();
|
||||
Token<?> tokens[] = fs.addDelegationTokens(renewer, cred);
|
||||
cred.addToken(token.getKind(), token);
|
||||
cred.writeTokenStorageFile(tokenFile, conf);
|
||||
if(LOG.isDebugEnabled()) {
|
||||
for (Token<?> token : tokens) {
|
||||
LOG.debug("Fetched token for " + token.getService()
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Fetched token " + fs.getUri() + " for " + token.getService()
|
||||
+ " into " + tokenFile);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
static public Credentials getDTfromRemote(URLConnectionFactory factory,
|
||||
URI nnUri, String renewer, String proxyUser) throws IOException {
|
||||
StringBuilder buf = new StringBuilder(nnUri.toString())
|
||||
.append(GetDelegationTokenServlet.PATH_SPEC);
|
||||
String separator = "?";
|
||||
if (renewer != null) {
|
||||
buf.append("?").append(GetDelegationTokenServlet.RENEWER).append("=")
|
||||
.append(renewer);
|
||||
separator = "&";
|
||||
}
|
||||
if (proxyUser != null) {
|
||||
buf.append(separator).append("doas=").append(proxyUser);
|
||||
}
|
||||
|
||||
boolean isHttps = nnUri.getScheme().equals("https");
|
||||
|
||||
HttpURLConnection conn = null;
|
||||
DataInputStream dis = null;
|
||||
InetSocketAddress serviceAddr = NetUtils.createSocketAddr(nnUri
|
||||
.getAuthority());
|
||||
|
||||
try {
|
||||
if(LOG.isDebugEnabled()) {
|
||||
LOG.debug("Retrieving token from: " + buf);
|
||||
}
|
||||
|
||||
conn = run(factory, new URL(buf.toString()));
|
||||
InputStream in = conn.getInputStream();
|
||||
Credentials ts = new Credentials();
|
||||
dis = new DataInputStream(in);
|
||||
ts.readFields(dis);
|
||||
for (Token<?> token : ts.getAllTokens()) {
|
||||
token.setKind(isHttps ? HsftpFileSystem.TOKEN_KIND : HftpFileSystem.TOKEN_KIND);
|
||||
SecurityUtil.setTokenService(token, serviceAddr);
|
||||
}
|
||||
return ts;
|
||||
} catch (Exception e) {
|
||||
throw new IOException("Unable to obtain remote token", e);
|
||||
} finally {
|
||||
IOUtils.cleanup(LOG, dis);
|
||||
if (conn != null) {
|
||||
conn.disconnect();
|
||||
}
|
||||
private static void printTokens(final Configuration conf,
|
||||
final Path tokenFile)
|
||||
throws IOException {
|
||||
DelegationTokenIdentifier id = new DelegationTokenSecretManager(0, 0, 0,
|
||||
0, null).createIdentifier();
|
||||
for (Token<?> token : readTokens(tokenFile, conf)) {
|
||||
DataInputStream in = new DataInputStream(new ByteArrayInputStream(token
|
||||
.getIdentifier()));
|
||||
id.readFields(in);
|
||||
System.out.println("Token (" + id + ") for " + token.getService());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Cancel a Delegation Token.
|
||||
* @param nnAddr the NameNode's address
|
||||
* @param tok the token to cancel
|
||||
* @throws IOException
|
||||
* @throws AuthenticationException
|
||||
*/
|
||||
static public void cancelDelegationToken(URLConnectionFactory factory,
|
||||
URI nnAddr, Token<DelegationTokenIdentifier> tok) throws IOException,
|
||||
AuthenticationException {
|
||||
StringBuilder buf = new StringBuilder(nnAddr.toString())
|
||||
.append(CancelDelegationTokenServlet.PATH_SPEC).append("?")
|
||||
.append(CancelDelegationTokenServlet.TOKEN).append("=")
|
||||
.append(tok.encodeToUrlString());
|
||||
HttpURLConnection conn = run(factory, new URL(buf.toString()));
|
||||
conn.disconnect();
|
||||
private static void printUsage(PrintStream err) {
|
||||
err.println("fetchdt retrieves delegation tokens from the NameNode");
|
||||
err.println();
|
||||
err.println("fetchdt <opts> <token file>");
|
||||
err.println("Options:");
|
||||
err.println(" --webservice <url> Url to contact NN on (starts with " +
|
||||
"http:// or https://)");
|
||||
err.println(" --renewer <name> Name of the delegation token renewer");
|
||||
err.println(" --cancel Cancel the delegation token");
|
||||
err.println(" --renew Renew the delegation token. " +
|
||||
"Delegation " + "token must have been fetched using the --renewer" +
|
||||
" <name> option.");
|
||||
err.println(" --print Print the delegation token");
|
||||
err.println();
|
||||
GenericOptionsParser.printGenericCommandUsage(err);
|
||||
ExitUtil.terminate(1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Renew a Delegation Token.
|
||||
* @param nnAddr the NameNode's address
|
||||
* @param tok the token to renew
|
||||
* @return the Date that the token will expire next.
|
||||
* @throws IOException
|
||||
* @throws AuthenticationException
|
||||
*/
|
||||
static public long renewDelegationToken(URLConnectionFactory factory,
|
||||
URI nnAddr, Token<DelegationTokenIdentifier> tok) throws IOException,
|
||||
AuthenticationException {
|
||||
StringBuilder buf = new StringBuilder(nnAddr.toString())
|
||||
.append(RenewDelegationTokenServlet.PATH_SPEC).append("?")
|
||||
.append(RenewDelegationTokenServlet.TOKEN).append("=")
|
||||
.append(tok.encodeToUrlString());
|
||||
|
||||
HttpURLConnection connection = null;
|
||||
BufferedReader in = null;
|
||||
try {
|
||||
connection = run(factory, new URL(buf.toString()));
|
||||
in = new BufferedReader(new InputStreamReader(
|
||||
connection.getInputStream(), Charsets.UTF_8));
|
||||
long result = Long.parseLong(in.readLine());
|
||||
return result;
|
||||
} catch (IOException ie) {
|
||||
LOG.info("error in renew over HTTP", ie);
|
||||
IOException e = getExceptionFromResponse(connection);
|
||||
|
||||
if (e != null) {
|
||||
LOG.info("rethrowing exception from HTTP request: "
|
||||
+ e.getLocalizedMessage());
|
||||
throw e;
|
||||
}
|
||||
throw ie;
|
||||
} finally {
|
||||
IOUtils.cleanup(LOG, in);
|
||||
if (connection != null) {
|
||||
connection.disconnect();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// parse the message and extract the name of the exception and the message
|
||||
static private IOException getExceptionFromResponse(HttpURLConnection con) {
|
||||
IOException e = null;
|
||||
String resp;
|
||||
if(con == null)
|
||||
return null;
|
||||
|
||||
try {
|
||||
resp = con.getResponseMessage();
|
||||
} catch (IOException ie) { return null; }
|
||||
if(resp == null || resp.isEmpty())
|
||||
return null;
|
||||
|
||||
String exceptionClass = "", exceptionMsg = "";
|
||||
String[] rs = resp.split(";");
|
||||
if(rs.length < 2)
|
||||
return null;
|
||||
exceptionClass = rs[0];
|
||||
exceptionMsg = rs[1];
|
||||
LOG.info("Error response from HTTP request=" + resp +
|
||||
";ec=" + exceptionClass + ";em="+exceptionMsg);
|
||||
|
||||
if(exceptionClass == null || exceptionClass.isEmpty())
|
||||
return null;
|
||||
|
||||
// recreate exception objects
|
||||
try {
|
||||
Class<? extends Exception> ec =
|
||||
Class.forName(exceptionClass).asSubclass(Exception.class);
|
||||
// we are interested in constructor with String arguments
|
||||
java.lang.reflect.Constructor<? extends Exception> constructor =
|
||||
ec.getConstructor (new Class[] {String.class});
|
||||
|
||||
// create an instance
|
||||
e = (IOException) constructor.newInstance (exceptionMsg);
|
||||
|
||||
} catch (Exception ee) {
|
||||
LOG.warn("failed to create object of this class", ee);
|
||||
}
|
||||
if(e == null)
|
||||
return null;
|
||||
|
||||
e.setStackTrace(new StackTraceElement[0]); // local stack is not relevant
|
||||
LOG.info("Exception from HTTP response=" + e.getLocalizedMessage());
|
||||
return e;
|
||||
}
|
||||
|
||||
private static HttpURLConnection run(URLConnectionFactory factory, URL url)
|
||||
throws IOException, AuthenticationException {
|
||||
HttpURLConnection conn = null;
|
||||
|
||||
try {
|
||||
conn = (HttpURLConnection) factory.openConnection(url, true);
|
||||
if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
|
||||
String msg = conn.getResponseMessage();
|
||||
|
||||
throw new IOException("Error when dealing remote token: " + msg);
|
||||
}
|
||||
} catch (IOException ie) {
|
||||
LOG.info("Error when dealing remote token:", ie);
|
||||
IOException e = getExceptionFromResponse(conn);
|
||||
|
||||
if (e != null) {
|
||||
LOG.info("rethrowing exception from HTTP request: "
|
||||
+ e.getLocalizedMessage());
|
||||
throw e;
|
||||
}
|
||||
throw ie;
|
||||
}
|
||||
return conn;
|
||||
private static Collection<Token<?>> readTokens(Path file, Configuration conf)
|
||||
throws IOException {
|
||||
Credentials creds = Credentials.readTokenStorageFile(file, conf);
|
||||
return creds.getAllTokens();
|
||||
}
|
||||
}
|
||||
|
@ -57,9 +57,7 @@ public void cancel(Token<?> token, Configuration conf) throws IOException {
|
||||
|
||||
@Override
|
||||
public boolean handleKind(Text kind) {
|
||||
return kind.equals(HftpFileSystem.TOKEN_KIND)
|
||||
|| kind.equals(HsftpFileSystem.TOKEN_KIND)
|
||||
|| kind.equals(WebHdfsFileSystem.TOKEN_KIND)
|
||||
return kind.equals(WebHdfsFileSystem.TOKEN_KIND)
|
||||
|| kind.equals(SWebHdfsFileSystem.TOKEN_KIND);
|
||||
}
|
||||
|
||||
@ -89,11 +87,7 @@ private TokenManagementDelegator getInstance(Token<?> token,
|
||||
}
|
||||
|
||||
private static String getSchemeByKind(Text kind) {
|
||||
if (kind.equals(HftpFileSystem.TOKEN_KIND)) {
|
||||
return HftpFileSystem.SCHEME;
|
||||
} else if (kind.equals(HsftpFileSystem.TOKEN_KIND)) {
|
||||
return HsftpFileSystem.SCHEME;
|
||||
} else if (kind.equals(WebHdfsFileSystem.TOKEN_KIND)) {
|
||||
if (kind.equals(WebHdfsFileSystem.TOKEN_KIND)) {
|
||||
return WebHdfsFileSystem.SCHEME;
|
||||
} else if (kind.equals(SWebHdfsFileSystem.TOKEN_KIND)) {
|
||||
return SWebHdfsFileSystem.SCHEME;
|
||||
|
@ -14,7 +14,5 @@
|
||||
# limitations under the License.
|
||||
|
||||
org.apache.hadoop.hdfs.DistributedFileSystem
|
||||
org.apache.hadoop.hdfs.web.HftpFileSystem
|
||||
org.apache.hadoop.hdfs.web.HsftpFileSystem
|
||||
org.apache.hadoop.hdfs.web.WebHdfsFileSystem
|
||||
org.apache.hadoop.hdfs.web.SWebHdfsFileSystem
|
||||
|
@ -93,7 +93,6 @@
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
||||
import org.apache.hadoop.hdfs.tools.DFSAdmin;
|
||||
import org.apache.hadoop.hdfs.web.HftpFileSystem;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
import org.apache.hadoop.net.DNSToSwitchMapping;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
@ -1890,36 +1889,6 @@ public String getHttpUri(int nnIndex) {
|
||||
.get(DFS_NAMENODE_HTTP_ADDRESS_KEY);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return a {@link HftpFileSystem} object.
|
||||
*/
|
||||
public HftpFileSystem getHftpFileSystem(int nnIndex) throws IOException {
|
||||
String uri = "hftp://"
|
||||
+ nameNodes[nnIndex].conf
|
||||
.get(DFS_NAMENODE_HTTP_ADDRESS_KEY);
|
||||
try {
|
||||
return (HftpFileSystem)FileSystem.get(new URI(uri), conf);
|
||||
} catch (URISyntaxException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return a {@link HftpFileSystem} object as specified user.
|
||||
*/
|
||||
public HftpFileSystem getHftpFileSystemAs(final String username,
|
||||
final Configuration conf, final int nnIndex, final String... groups)
|
||||
throws IOException, InterruptedException {
|
||||
final UserGroupInformation ugi = UserGroupInformation.createUserForTesting(
|
||||
username, groups);
|
||||
return ugi.doAs(new PrivilegedExceptionAction<HftpFileSystem>() {
|
||||
@Override
|
||||
public HftpFileSystem run() throws Exception {
|
||||
return getHftpFileSystem(nnIndex);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the directories where the namenode stores its image.
|
||||
*/
|
||||
|
@ -63,7 +63,6 @@
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
|
||||
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
|
||||
import org.apache.hadoop.hdfs.web.HftpFileSystem;
|
||||
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
|
||||
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
@ -495,8 +494,6 @@ private void checkStatistics(FileSystem fs, int readOps, int writeOps, int large
|
||||
|
||||
@Test
|
||||
public void testFileChecksum() throws Exception {
|
||||
((Log4JLogger)HftpFileSystem.LOG).getLogger().setLevel(Level.ALL);
|
||||
|
||||
final long seed = RAN.nextLong();
|
||||
System.out.println("seed=" + seed);
|
||||
RAN.setSeed(seed);
|
||||
@ -531,17 +528,6 @@ public void testFileChecksum() throws Exception {
|
||||
.contains("Path is not a file: /test/TestExistingDir"));
|
||||
}
|
||||
|
||||
//hftp
|
||||
final String hftpuri = "hftp://" + nnAddr;
|
||||
System.out.println("hftpuri=" + hftpuri);
|
||||
final FileSystem hftp = ugi.doAs(
|
||||
new PrivilegedExceptionAction<FileSystem>() {
|
||||
@Override
|
||||
public FileSystem run() throws Exception {
|
||||
return new Path(hftpuri).getFileSystem(conf);
|
||||
}
|
||||
});
|
||||
|
||||
//webhdfs
|
||||
final String webhdfsuri = WebHdfsFileSystem.SCHEME + "://" + nnAddr;
|
||||
System.out.println("webhdfsuri=" + webhdfsuri);
|
||||
@ -578,14 +564,6 @@ public FileSystem run() throws Exception {
|
||||
final FileChecksum hdfsfoocs = hdfs.getFileChecksum(foo);
|
||||
System.out.println("hdfsfoocs=" + hdfsfoocs);
|
||||
|
||||
//hftp
|
||||
final FileChecksum hftpfoocs = hftp.getFileChecksum(foo);
|
||||
System.out.println("hftpfoocs=" + hftpfoocs);
|
||||
|
||||
final Path qualified = new Path(hftpuri + dir, "foo" + n);
|
||||
final FileChecksum qfoocs = hftp.getFileChecksum(qualified);
|
||||
System.out.println("qfoocs=" + qfoocs);
|
||||
|
||||
//webhdfs
|
||||
final FileChecksum webhdfsfoocs = webhdfs.getFileChecksum(foo);
|
||||
System.out.println("webhdfsfoocs=" + webhdfsfoocs);
|
||||
@ -624,13 +602,6 @@ public FileSystem run() throws Exception {
|
||||
assertEquals(hdfsfoocs.hashCode(), barhashcode);
|
||||
assertEquals(hdfsfoocs, barcs);
|
||||
|
||||
//hftp
|
||||
assertEquals(hftpfoocs.hashCode(), barhashcode);
|
||||
assertEquals(hftpfoocs, barcs);
|
||||
|
||||
assertEquals(qfoocs.hashCode(), barhashcode);
|
||||
assertEquals(qfoocs, barcs);
|
||||
|
||||
//webhdfs
|
||||
assertEquals(webhdfsfoocs.hashCode(), barhashcode);
|
||||
assertEquals(webhdfsfoocs, barcs);
|
||||
@ -640,14 +611,6 @@ public FileSystem run() throws Exception {
|
||||
}
|
||||
|
||||
hdfs.setPermission(dir, new FsPermission((short)0));
|
||||
{ //test permission error on hftp
|
||||
try {
|
||||
hftp.getFileChecksum(qualified);
|
||||
fail();
|
||||
} catch(IOException ioe) {
|
||||
FileSystem.LOG.info("GOOD: getting an exception", ioe);
|
||||
}
|
||||
}
|
||||
|
||||
{ //test permission error on webhdfs
|
||||
try {
|
||||
|
@ -39,7 +39,6 @@
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.hdfs.web.HftpFileSystem;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.log4j.Level;
|
||||
@ -64,7 +63,6 @@ public class TestFileStatus {
|
||||
private static MiniDFSCluster cluster;
|
||||
private static FileSystem fs;
|
||||
private static FileContext fc;
|
||||
private static HftpFileSystem hftpfs;
|
||||
private static DFSClient dfsClient;
|
||||
private static Path file1;
|
||||
|
||||
@ -75,7 +73,6 @@ public static void testSetUp() throws Exception {
|
||||
cluster = new MiniDFSCluster.Builder(conf).build();
|
||||
fs = cluster.getFileSystem();
|
||||
fc = FileContext.getFileContext(cluster.getURI(0), conf);
|
||||
hftpfs = cluster.getHftpFileSystem(0);
|
||||
dfsClient = new DFSClient(NameNode.getAddress(conf), conf);
|
||||
file1 = new Path("filestatus.dat");
|
||||
DFSTestUtil.createFile(fs, file1, fileSize, fileSize, blockSize, (short) 1,
|
||||
@ -208,8 +205,6 @@ public void testGetFileStatusOnDir() throws Exception {
|
||||
assertEquals(dir + " should be empty", 0, stats.length);
|
||||
assertEquals(dir + " should be zero size ",
|
||||
0, fs.getContentSummary(dir).getLength());
|
||||
assertEquals(dir + " should be zero size using hftp",
|
||||
0, hftpfs.getContentSummary(dir).getLength());
|
||||
|
||||
RemoteIterator<FileStatus> itor = fc.listStatus(dir);
|
||||
assertFalse(dir + " should be empty", itor.hasNext());
|
||||
@ -239,8 +234,6 @@ public void testGetFileStatusOnDir() throws Exception {
|
||||
final int expected = blockSize/2;
|
||||
assertEquals(dir + " size should be " + expected,
|
||||
expected, fs.getContentSummary(dir).getLength());
|
||||
assertEquals(dir + " size should be " + expected + " using hftp",
|
||||
expected, hftpfs.getContentSummary(dir).getLength());
|
||||
|
||||
// Test listStatus on a non-empty directory
|
||||
stats = fs.listStatus(dir);
|
||||
@ -290,19 +283,9 @@ public void testGetFileStatusOnDir() throws Exception {
|
||||
assertEquals(dir5.toString(), itor.next().getPath().toString());
|
||||
assertEquals(file2.toString(), itor.next().getPath().toString());
|
||||
assertEquals(file3.toString(), itor.next().getPath().toString());
|
||||
|
||||
assertFalse(itor.hasNext());
|
||||
|
||||
{ //test permission error on hftp
|
||||
fs.setPermission(dir, new FsPermission((short)0));
|
||||
try {
|
||||
final String username = UserGroupInformation.getCurrentUser().getShortUserName() + "1";
|
||||
final HftpFileSystem hftp2 = cluster.getHftpFileSystemAs(username, conf, 0, "somegroup");
|
||||
hftp2.getContentSummary(dir);
|
||||
fail();
|
||||
} catch(IOException ioe) {
|
||||
FileSystem.LOG.info("GOOD: getting an exception", ioe);
|
||||
}
|
||||
}
|
||||
fs.delete(dir, true);
|
||||
}
|
||||
}
|
||||
|
@ -42,7 +42,6 @@
|
||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.web.HftpFileSystem;
|
||||
import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
|
||||
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
@ -221,30 +220,6 @@ public void testAuditWebHdfsStat() throws Exception {
|
||||
assertTrue("failed to stat file", st != null && st.isFile());
|
||||
}
|
||||
|
||||
/** test that access via Hftp puts proper entry in audit log */
|
||||
@Test
|
||||
public void testAuditHftp() throws Exception {
|
||||
final Path file = new Path(fnames[0]);
|
||||
|
||||
final String hftpUri =
|
||||
"hftp://" + conf.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
|
||||
|
||||
HftpFileSystem hftpFs = null;
|
||||
|
||||
setupAuditLogs();
|
||||
try {
|
||||
hftpFs = (HftpFileSystem) new Path(hftpUri).getFileSystem(conf);
|
||||
InputStream istream = hftpFs.open(file);
|
||||
@SuppressWarnings("unused")
|
||||
int val = istream.read();
|
||||
istream.close();
|
||||
|
||||
verifyAuditLogs(true);
|
||||
} finally {
|
||||
if (hftpFs != null) hftpFs.close();
|
||||
}
|
||||
}
|
||||
|
||||
/** test that denied access via webhdfs puts proper entry in audit log */
|
||||
@Test
|
||||
public void testAuditWebHdfsDenied() throws Exception {
|
||||
|
@ -82,16 +82,6 @@ public static void tearDown() throws Exception {
|
||||
KeyStoreTestUtil.cleanupSSLConfig(keystoresDir, sslConfDir);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHsftpFileSystem() throws Exception {
|
||||
FileSystem fs = FileSystem.get(new URI("hsftp://" + nnAddr), conf);
|
||||
Assert.assertTrue(fs.exists(new Path("/test")));
|
||||
InputStream is = fs.open(new Path("/test"));
|
||||
Assert.assertEquals(23, is.read());
|
||||
is.close();
|
||||
fs.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSWebHdfsFileSystem() throws Exception {
|
||||
FileSystem fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf, "swebhdfs");
|
||||
|
@ -27,7 +27,7 @@
|
||||
public class FakeRenewer extends TokenRenewer {
|
||||
static Token<?> lastRenewed = null;
|
||||
static Token<?> lastCanceled = null;
|
||||
static final Text KIND = new Text("TESTING-TOKEN-KIND");
|
||||
public static final Text KIND = new Text("TESTING-TOKEN-KIND");
|
||||
|
||||
@Override
|
||||
public boolean handleKind(Text kind) {
|
||||
@ -54,4 +54,12 @@ public static void reset() {
|
||||
lastRenewed = null;
|
||||
lastCanceled = null;
|
||||
}
|
||||
|
||||
public static Token<?> getLastRenewed() {
|
||||
return lastRenewed;
|
||||
}
|
||||
|
||||
public static Token<?> getLastCanceled() {
|
||||
return lastCanceled;
|
||||
}
|
||||
}
|
||||
|
@ -182,7 +182,6 @@
|
||||
<property><!--Loaded from job.xml--><name>file.stream-buffer-size</name><value>4096</value></property>
|
||||
<property><!--Loaded from job.xml--><name>dfs.namenode.fs-limits.max-directory-items</name><value>0</value></property>
|
||||
<property><!--Loaded from job.xml--><name>io.mapfile.bloom.size</name><value>1048576</value></property>
|
||||
<property><!--Loaded from job.xml--><name>fs.hsftp.impl</name><value>org.apache.hadoop.hdfs.HsftpFileSystem</value></property>
|
||||
<property><!--Loaded from job.xml--><name>yarn.nodemanager.container-executor.class</name><value>org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor</value></property>
|
||||
<property><!--Loaded from job.xml--><name>mapreduce.map.maxattempts</name><value>4</value></property>
|
||||
<property><!--Loaded from job.xml--><name>mapreduce.jobtracker.jobhistory.block.size</name><value>3145728</value></property>
|
||||
@ -224,7 +223,6 @@
|
||||
<property><!--Loaded from job.xml--><name>hadoop.http.authentication.simple.anonymous.allowed</name><value>true</value></property>
|
||||
<property><!--Loaded from job.xml--><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
|
||||
<property><!--Loaded from job.xml--><name>mapreduce.job.submithostname</name><value>localhost</value></property>
|
||||
<property><!--Loaded from job.xml--><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
|
||||
<property><!--Loaded from job.xml--><name>dfs.namenode.handler.count</name><value>10</value></property>
|
||||
<property><!--Loaded from job.xml--><name>fs.automatic.close</name><value>true</value></property>
|
||||
<property><!--Loaded from job.xml--><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
|
||||
@ -392,6 +390,7 @@
|
||||
<property><!--Loaded from job.xml--><name>mapreduce.job.counters.limit</name><value>120</value></property>
|
||||
<property><!--Loaded from job.xml--><name>dfs.datanode.ipc.address</name><value>0.0.0.0:50020</value></property>
|
||||
<property><!--Loaded from job.xml--><name>fs.webhdfs.impl</name><value>org.apache.hadoop.hdfs.web.WebHdfsFileSystem</value></property>
|
||||
<property><!--Loaded from job.xml--><name>fs.swebhdfs.impl</name><value>org.apache.hadoop.hdfs.web.SWebHdfsFileSystem</value></property>
|
||||
<property><!--Loaded from job.xml--><name>yarn.nodemanager.delete.debug-delay-sec</name><value>0</value></property>
|
||||
<property><!--Loaded from job.xml--><name>dfs.datanode.max.transfer.threads</name><value>4096</value></property>
|
||||
</configuration>
|
||||
|
@ -565,18 +565,6 @@ public void testFsClose() throws Exception {
|
||||
new Path("file:///").getFileSystem(conf);
|
||||
FileSystem.closeAll();
|
||||
}
|
||||
|
||||
{
|
||||
Configuration conf = new Configuration();
|
||||
new Path("hftp://localhost:12345/").getFileSystem(conf);
|
||||
FileSystem.closeAll();
|
||||
}
|
||||
|
||||
{
|
||||
Configuration conf = new Configuration();
|
||||
FileSystem fs = new Path("hftp://localhost:12345/").getFileSystem(conf);
|
||||
FileSystem.closeAll();
|
||||
}
|
||||
}
|
||||
|
||||
public void testFsShutdownHook() throws Exception {
|
||||
@ -617,12 +605,12 @@ public void testCacheKeysAreCaseInsensitive()
|
||||
Configuration conf = new Configuration();
|
||||
|
||||
// check basic equality
|
||||
FileSystem.Cache.Key lowercaseCachekey1 = new FileSystem.Cache.Key(new URI("hftp://localhost:12345/"), conf);
|
||||
FileSystem.Cache.Key lowercaseCachekey2 = new FileSystem.Cache.Key(new URI("hftp://localhost:12345/"), conf);
|
||||
FileSystem.Cache.Key lowercaseCachekey1 = new FileSystem.Cache.Key(new URI("hdfs://localhost:12345/"), conf);
|
||||
FileSystem.Cache.Key lowercaseCachekey2 = new FileSystem.Cache.Key(new URI("hdfs://localhost:12345/"), conf);
|
||||
assertEquals( lowercaseCachekey1, lowercaseCachekey2 );
|
||||
|
||||
// check insensitive equality
|
||||
FileSystem.Cache.Key uppercaseCachekey = new FileSystem.Cache.Key(new URI("HFTP://Localhost:12345/"), conf);
|
||||
FileSystem.Cache.Key uppercaseCachekey = new FileSystem.Cache.Key(new URI("HDFS://Localhost:12345/"), conf);
|
||||
assertEquals( lowercaseCachekey2, uppercaseCachekey );
|
||||
|
||||
// check behaviour with collections
|
||||
|
@ -313,7 +313,7 @@ public void testMakeDirFailure() {
|
||||
= stubContext.getContext();
|
||||
|
||||
Configuration configuration = context.getConfiguration();
|
||||
String workPath = new Path("hftp://localhost:1234/*/*/*/?/")
|
||||
String workPath = new Path("webhdfs://localhost:1234/*/*/*/?/")
|
||||
.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString();
|
||||
configuration.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH,
|
||||
workPath);
|
||||
|
@ -891,64 +891,6 @@ static Path createHomeDirectory(FileSystem fs, UserGroupInformation ugi
|
||||
return home;
|
||||
}
|
||||
|
||||
public void testHftpAccessControl() throws Exception {
|
||||
MiniDFSCluster cluster = null;
|
||||
try {
|
||||
final UserGroupInformation DFS_UGI = createUGI("dfs", true);
|
||||
final UserGroupInformation USER_UGI = createUGI("user", false);
|
||||
|
||||
//start cluster by DFS_UGI
|
||||
final Configuration dfsConf = new Configuration();
|
||||
cluster = new MiniDFSCluster.Builder(dfsConf).numDataNodes(2).build();
|
||||
cluster.waitActive();
|
||||
|
||||
final String httpAdd = dfsConf.get("dfs.http.address");
|
||||
final URI nnURI = FileSystem.getDefaultUri(dfsConf);
|
||||
final String nnUri = nnURI.toString();
|
||||
FileSystem fs1 = DFS_UGI.doAs(new PrivilegedExceptionAction<FileSystem>() {
|
||||
public FileSystem run() throws IOException {
|
||||
return FileSystem.get(nnURI, dfsConf);
|
||||
}
|
||||
});
|
||||
final Path home =
|
||||
createHomeDirectory(fs1, USER_UGI);
|
||||
|
||||
//now, login as USER_UGI
|
||||
final Configuration userConf = new Configuration();
|
||||
final FileSystem fs =
|
||||
USER_UGI.doAs(new PrivilegedExceptionAction<FileSystem>() {
|
||||
public FileSystem run() throws IOException {
|
||||
return FileSystem.get(nnURI, userConf);
|
||||
}
|
||||
});
|
||||
|
||||
final Path srcrootpath = new Path(home, "src_root");
|
||||
final String srcrootdir = srcrootpath.toString();
|
||||
final Path dstrootpath = new Path(home, "dst_root");
|
||||
final String dstrootdir = dstrootpath.toString();
|
||||
final DistCpV1 distcp = USER_UGI.doAs(new PrivilegedExceptionAction<DistCpV1>() {
|
||||
public DistCpV1 run() {
|
||||
return new DistCpV1(userConf);
|
||||
}
|
||||
});
|
||||
|
||||
FileSystem.mkdirs(fs, srcrootpath, new FsPermission((short)0700));
|
||||
final String[] args = {"hftp://"+httpAdd+srcrootdir, nnUri+dstrootdir};
|
||||
|
||||
{ //copy with permission 000, should fail
|
||||
fs.setPermission(srcrootpath, new FsPermission((short)0));
|
||||
USER_UGI.doAs(new PrivilegedExceptionAction<Void>() {
|
||||
public Void run() throws Exception {
|
||||
assertEquals(-3, ToolRunner.run(distcp, args));
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
} finally {
|
||||
if (cluster != null) { cluster.shutdown(); }
|
||||
}
|
||||
}
|
||||
|
||||
/** test -delete */
|
||||
public void testDelete() throws Exception {
|
||||
final Configuration conf = new Configuration();
|
||||
|
Loading…
Reference in New Issue
Block a user