HDFS-5436. Move HsFtpFileSystem and HFtpFileSystem into org.apache.hdfs.web. (Contributed by Haohui Mai)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1536921 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
7dd201c541
commit
68a79b0d3f
@ -447,6 +447,9 @@ Release 2.3.0 - UNRELEASED
|
||||
HDFS-5363. Refactor WebHdfsFileSystem: move SPENGO-authenticated connection
|
||||
creation to URLConnectionFactory. (Haohui Mai via jing9)
|
||||
|
||||
HDFS-5436. Move HsFtpFileSystem and HFtpFileSystem into org.apache.hdfs.web
|
||||
(Haohui Mai via Arpit Agarwal)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HDFS-5239. Allow FSNamesystem lock fairness to be configurable (daryn)
|
||||
|
@ -38,7 +38,7 @@
|
||||
import org.apache.hadoop.util.ServletUtil;
|
||||
|
||||
/** Redirect queries about the hosted filesystem to an appropriate datanode.
|
||||
* @see org.apache.hadoop.hdfs.HftpFileSystem
|
||||
* @see org.apache.hadoop.hdfs.web.HftpFileSystem
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class FileDataServlet extends DfsServlet {
|
||||
|
@ -20,14 +20,13 @@
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.HftpFileSystem;
|
||||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
|
||||
import org.apache.hadoop.hdfs.server.common.JspHelper;
|
||||
import org.apache.hadoop.hdfs.web.HftpFileSystem;
|
||||
import org.apache.hadoop.util.ServletUtil;
|
||||
import org.apache.hadoop.util.VersionInfo;
|
||||
|
||||
import org.znerd.xmlenc.*;
|
||||
|
||||
import java.io.IOException;
|
||||
@ -39,13 +38,14 @@
|
||||
import java.util.Map;
|
||||
import java.util.Stack;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import javax.servlet.ServletException;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
/**
|
||||
* Obtain meta-information about a filesystem.
|
||||
* @see org.apache.hadoop.hdfs.HftpFileSystem
|
||||
* @see org.apache.hadoop.hdfs.web.HftpFileSystem
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class ListPathsServlet extends DfsServlet {
|
||||
|
@ -41,12 +41,12 @@
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.HftpFileSystem;
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
|
||||
import org.apache.hadoop.hdfs.server.namenode.CancelDelegationTokenServlet;
|
||||
import org.apache.hadoop.hdfs.server.namenode.GetDelegationTokenServlet;
|
||||
import org.apache.hadoop.hdfs.server.namenode.RenewDelegationTokenServlet;
|
||||
import org.apache.hadoop.hdfs.web.HftpFileSystem;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
|
@ -16,7 +16,7 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hdfs;
|
||||
package org.apache.hadoop.hdfs.web;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
@ -34,28 +34,28 @@
|
||||
|
||||
/**
|
||||
* To support HTTP byte streams, a new connection to an HTTP server needs to be
|
||||
* created each time. This class hides the complexity of those multiple
|
||||
* created each time. This class hides the complexity of those multiple
|
||||
* connections from the client. Whenever seek() is called, a new connection
|
||||
* is made on the successive read(). The normal input stream functions are
|
||||
* connected to the currently active input stream.
|
||||
* is made on the successive read(). The normal input stream functions are
|
||||
* connected to the currently active input stream.
|
||||
*/
|
||||
public abstract class ByteRangeInputStream extends FSInputStream {
|
||||
|
||||
|
||||
/**
|
||||
* This class wraps a URL and provides method to open connection.
|
||||
* It can be overridden to change how a connection is opened.
|
||||
*/
|
||||
public static abstract class URLOpener {
|
||||
protected URL url;
|
||||
|
||||
|
||||
public URLOpener(URL u) {
|
||||
url = u;
|
||||
}
|
||||
|
||||
|
||||
public void setURL(URL u) {
|
||||
url = u;
|
||||
}
|
||||
|
||||
|
||||
public URL getURL() {
|
||||
return url;
|
||||
}
|
||||
@ -78,7 +78,7 @@ enum StreamStatus {
|
||||
StreamStatus status = StreamStatus.SEEK;
|
||||
|
||||
/**
|
||||
* Create with the specified URLOpeners. Original url is used to open the
|
||||
* Create with the specified URLOpeners. Original url is used to open the
|
||||
* stream for the first time. Resolved url is used in subsequent requests.
|
||||
* @param o Original url
|
||||
* @param r Resolved url
|
||||
@ -87,7 +87,7 @@ public ByteRangeInputStream(URLOpener o, URLOpener r) {
|
||||
this.originalURL = o;
|
||||
this.resolvedURL = r;
|
||||
}
|
||||
|
||||
|
||||
protected abstract URL getResolvedUrl(final HttpURLConnection connection
|
||||
) throws IOException;
|
||||
|
||||
@ -108,12 +108,12 @@ protected InputStream getInputStream() throws IOException {
|
||||
}
|
||||
return in;
|
||||
}
|
||||
|
||||
|
||||
@VisibleForTesting
|
||||
protected InputStream openInputStream() throws IOException {
|
||||
// Use the original url if no resolved url exists, eg. if
|
||||
// it's the first time a request is made.
|
||||
final boolean resolved = resolvedURL.getURL() != null;
|
||||
final boolean resolved = resolvedURL.getURL() != null;
|
||||
final URLOpener opener = resolved? resolvedURL: originalURL;
|
||||
|
||||
final HttpURLConnection connection = opener.connect(startPos, resolved);
|
||||
@ -141,7 +141,7 @@ protected InputStream openInputStream() throws IOException {
|
||||
|
||||
return in;
|
||||
}
|
||||
|
||||
|
||||
private static boolean isChunkedTransferEncoding(
|
||||
final Map<String, List<String>> headers) {
|
||||
return contains(headers, HttpHeaders.TRANSFER_ENCODING, "chunked")
|
||||
@ -186,7 +186,7 @@ public int read() throws IOException {
|
||||
public int read(byte b[], int off, int len) throws IOException {
|
||||
return update(getInputStream().read(b, off, len));
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Seek to the given offset from the start of the file.
|
||||
* The next read() will be from that location. Can't
|
||||
@ -219,7 +219,7 @@ public long getPos() throws IOException {
|
||||
public boolean seekToNewSource(long targetPos) throws IOException {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
if (in != null) {
|
@ -16,7 +16,7 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hdfs;
|
||||
package org.apache.hadoop.hdfs.web;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
@ -47,11 +47,13 @@
|
||||
import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
|
||||
import org.apache.hadoop.hdfs.server.common.JspHelper;
|
||||
import org.apache.hadoop.hdfs.tools.DelegationTokenFetcher;
|
||||
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
|
||||
import org.apache.hadoop.hdfs.web.ByteRangeInputStream.URLOpener;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
@ -161,7 +163,7 @@ protected URI getNamenodeUri(URI uri) {
|
||||
public String getCanonicalServiceName() {
|
||||
return SecurityUtil.buildTokenService(nnUri).toString();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected URI canonicalizeUri(URI uri) {
|
||||
return NetUtils.getCanonicalUri(uri, getDefaultPort());
|
||||
@ -183,7 +185,7 @@ public void initialize(final URI name, final Configuration conf)
|
||||
throws IOException {
|
||||
super.initialize(name, conf);
|
||||
setConf(conf);
|
||||
this.ugi = UserGroupInformation.getCurrentUser();
|
||||
this.ugi = UserGroupInformation.getCurrentUser();
|
||||
this.nnUri = getNamenodeUri(name);
|
||||
try {
|
||||
this.hftpURI = new URI(name.getScheme(), name.getAuthority(),
|
||||
@ -224,7 +226,7 @@ protected Token<DelegationTokenIdentifier> selectDelegationToken(
|
||||
UserGroupInformation ugi) {
|
||||
return hftpTokenSelector.selectToken(nnUri, ugi.getTokens(), getConf());
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public Token<?> getRenewToken() {
|
||||
@ -315,7 +317,7 @@ protected URL getNamenodeURL(String path, String query) throws IOException {
|
||||
|
||||
/**
|
||||
* Get encoded UGI parameter string for a URL.
|
||||
*
|
||||
*
|
||||
* @return user_shortname,group1,group2...
|
||||
*/
|
||||
private String getEncodedUgiParameter() {
|
||||
@ -359,7 +361,7 @@ protected String addDelegationTokenParam(String query) throws IOException {
|
||||
|
||||
static class RangeHeaderUrlOpener extends ByteRangeInputStream.URLOpener {
|
||||
URLConnectionFactory connectionFactory = URLConnectionFactory.DEFAULT_CONNECTION_FACTORY;
|
||||
|
||||
|
||||
RangeHeaderUrlOpener(final URL url) {
|
||||
super(url);
|
||||
}
|
||||
@ -379,7 +381,7 @@ protected HttpURLConnection connect(final long offset,
|
||||
}
|
||||
conn.connect();
|
||||
|
||||
//Expects HTTP_OK or HTTP_PARTIAL response codes.
|
||||
//Expects HTTP_OK or HTTP_PARTIAL response codes.
|
||||
final int code = conn.getResponseCode();
|
||||
if (offset != 0L && code != HttpURLConnection.HTTP_PARTIAL) {
|
||||
throw new IOException("HTTP_PARTIAL expected, received " + code);
|
||||
@ -387,7 +389,7 @@ protected HttpURLConnection connect(final long offset,
|
||||
throw new IOException("HTTP_OK expected, received " + code);
|
||||
}
|
||||
return conn;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static class RangeHeaderInputStream extends ByteRangeInputStream {
|
||||
@ -410,7 +412,7 @@ public FSDataInputStream open(Path f, int buffersize) throws IOException {
|
||||
f = f.makeQualified(getUri(), getWorkingDirectory());
|
||||
String path = "/data" + ServletUtil.encodePath(f.toUri().getPath());
|
||||
String query = addDelegationTokenParam("ugi=" + getEncodedUgiParameter());
|
||||
URL u = getNamenodeURL(path, query);
|
||||
URL u = getNamenodeURL(path, query);
|
||||
return new FSDataInputStream(new RangeHeaderInputStream(u));
|
||||
}
|
||||
|
||||
@ -533,7 +535,7 @@ public void startElement(String ns, String localname, String qname,
|
||||
|
||||
private FileChecksum getFileChecksum(String f) throws IOException {
|
||||
final HttpURLConnection connection = openConnection(
|
||||
"/fileChecksum" + ServletUtil.encodePath(f),
|
||||
"/fileChecksum" + ServletUtil.encodePath(f),
|
||||
"ugi=" + getEncodedUgiParameter());
|
||||
try {
|
||||
final XMLReader xr = XMLReaderFactory.createXMLReader();
|
||||
@ -585,11 +587,11 @@ public boolean rename(Path src, Path dst) throws IOException {
|
||||
throw new IOException("Not supported");
|
||||
}
|
||||
|
||||
@Override
|
||||
@Override
|
||||
public boolean delete(Path f, boolean recursive) throws IOException {
|
||||
throw new IOException("Not supported");
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public boolean mkdirs(Path f, FsPermission permission) throws IOException {
|
||||
throw new IOException("Not supported");
|
||||
@ -615,18 +617,18 @@ public void startElement(String ns, String localname, String qname,
|
||||
}
|
||||
|
||||
/**
|
||||
* Connect to the name node and get content summary.
|
||||
* Connect to the name node and get content summary.
|
||||
* @param path The path
|
||||
* @return The content summary for the path.
|
||||
* @throws IOException
|
||||
*/
|
||||
private ContentSummary getContentSummary(String path) throws IOException {
|
||||
final HttpURLConnection connection = openConnection(
|
||||
"/contentSummary" + ServletUtil.encodePath(path),
|
||||
"/contentSummary" + ServletUtil.encodePath(path),
|
||||
"ugi=" + getEncodedUgiParameter());
|
||||
InputStream in = null;
|
||||
try {
|
||||
in = connection.getInputStream();
|
||||
in = connection.getInputStream();
|
||||
|
||||
final XMLReader xr = XMLReaderFactory.createXMLReader();
|
||||
xr.setContentHandler(this);
|
||||
@ -713,12 +715,12 @@ protected String getUnderlyingProtocol() {
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public long renew(Token<?> token,
|
||||
public long renew(Token<?> token,
|
||||
Configuration conf) throws IOException {
|
||||
// update the kerberos credentials, if they are coming from a keytab
|
||||
UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
|
||||
InetSocketAddress serviceAddr = SecurityUtil.getTokenServiceAddr(token);
|
||||
return
|
||||
return
|
||||
DelegationTokenFetcher.renewDelegationToken
|
||||
(DFSUtil.createUri(getUnderlyingProtocol(), serviceAddr).toString(),
|
||||
(Token<DelegationTokenIdentifier>) token);
|
||||
@ -726,7 +728,7 @@ public long renew(Token<?> token,
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public void cancel(Token<?> token,
|
||||
public void cancel(Token<?> token,
|
||||
Configuration conf) throws IOException {
|
||||
// update the kerberos credentials, if they are coming from a keytab
|
||||
UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
|
||||
@ -734,9 +736,9 @@ public void cancel(Token<?> token,
|
||||
DelegationTokenFetcher.cancelDelegationToken
|
||||
(DFSUtil.createUri(getUnderlyingProtocol(), serviceAddr).toString(),
|
||||
(Token<DelegationTokenIdentifier>) token);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private static class HftpDelegationTokenSelector
|
||||
extends AbstractDelegationTokenSelector<DelegationTokenIdentifier> {
|
||||
private static final DelegationTokenSelector hdfsTokenSelector =
|
||||
@ -745,14 +747,14 @@ private static class HftpDelegationTokenSelector
|
||||
public HftpDelegationTokenSelector() {
|
||||
super(TOKEN_KIND);
|
||||
}
|
||||
|
||||
|
||||
Token<DelegationTokenIdentifier> selectToken(URI nnUri,
|
||||
Collection<Token<?>> tokens, Configuration conf) {
|
||||
Token<DelegationTokenIdentifier> token =
|
||||
selectToken(SecurityUtil.buildTokenService(nnUri), tokens);
|
||||
if (token == null) {
|
||||
// try to get a HDFS token
|
||||
token = hdfsTokenSelector.selectToken(nnUri, tokens, conf);
|
||||
token = hdfsTokenSelector.selectToken(nnUri, tokens, conf);
|
||||
}
|
||||
return token;
|
||||
}
|
@ -16,7 +16,7 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hdfs;
|
||||
package org.apache.hadoop.hdfs.web;
|
||||
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
@ -40,13 +40,15 @@
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.util.Time;
|
||||
|
||||
/**
|
||||
* An implementation of a protocol for accessing filesystems over HTTPS. The
|
||||
* following implementation provides a limited, read-only interface to a
|
||||
* filesystem over HTTPS.
|
||||
*
|
||||
*
|
||||
* @see org.apache.hadoop.hdfs.server.namenode.ListPathsServlet
|
||||
* @see org.apache.hadoop.hdfs.server.namenode.FileDataServlet
|
||||
*/
|
||||
@ -85,7 +87,7 @@ public void initialize(URI name, Configuration conf) throws IOException {
|
||||
|
||||
/**
|
||||
* Set up SSL resources
|
||||
*
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
private static void setupSsl(Configuration conf) throws IOException {
|
@ -51,7 +51,6 @@
|
||||
import org.apache.hadoop.fs.Options;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.ByteRangeInputStream;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.HAUtil;
|
||||
|
@ -14,6 +14,6 @@
|
||||
# limitations under the License.
|
||||
|
||||
org.apache.hadoop.hdfs.DistributedFileSystem
|
||||
org.apache.hadoop.hdfs.HftpFileSystem
|
||||
org.apache.hadoop.hdfs.HsftpFileSystem
|
||||
org.apache.hadoop.hdfs.web.HftpFileSystem
|
||||
org.apache.hadoop.hdfs.web.HsftpFileSystem
|
||||
org.apache.hadoop.hdfs.web.WebHdfsFileSystem
|
||||
|
@ -13,5 +13,5 @@
|
||||
#
|
||||
org.apache.hadoop.hdfs.DFSClient$Renewer
|
||||
org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier$Renewer
|
||||
org.apache.hadoop.hdfs.HftpFileSystem$TokenManager
|
||||
org.apache.hadoop.hdfs.web.HftpFileSystem$TokenManager
|
||||
org.apache.hadoop.hdfs.web.WebHdfsFileSystem$DtRenewer
|
||||
|
@ -89,6 +89,7 @@
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
||||
import org.apache.hadoop.hdfs.tools.DFSAdmin;
|
||||
import org.apache.hadoop.hdfs.web.HftpFileSystem;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
import org.apache.hadoop.net.DNSToSwitchMapping;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
|
@ -56,6 +56,7 @@
|
||||
import org.apache.hadoop.fs.RemoteIterator;
|
||||
import org.apache.hadoop.fs.VolumeId;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.web.HftpFileSystem;
|
||||
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.util.DataChecksum;
|
||||
|
@ -39,6 +39,7 @@
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.hdfs.web.HftpFileSystem;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.log4j.Level;
|
||||
|
@ -29,6 +29,7 @@
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.server.namenode.ListPathsServlet;
|
||||
import org.apache.hadoop.hdfs.web.HftpFileSystem;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
|
@ -39,8 +39,8 @@
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.HftpFileSystem;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.web.HftpFileSystem;
|
||||
import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
|
||||
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
|
@ -15,7 +15,7 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs;
|
||||
package org.apache.hadoop.hdfs.web;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNull;
|
||||
@ -34,6 +34,7 @@
|
||||
import java.net.URL;
|
||||
|
||||
import org.apache.hadoop.hdfs.server.namenode.StreamFile;
|
||||
import org.apache.hadoop.hdfs.web.HftpFileSystem;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestByteRangeInputStream {
|
||||
@ -41,24 +42,24 @@ public static class MockHttpURLConnection extends HttpURLConnection {
|
||||
public MockHttpURLConnection(URL u) {
|
||||
super(u);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public boolean usingProxy(){
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void disconnect() {
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void connect() {
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public InputStream getInputStream() throws IOException {
|
||||
return new ByteArrayInputStream("asdf".getBytes());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public URL getURL() {
|
||||
@ -70,7 +71,7 @@ public URL getURL() {
|
||||
}
|
||||
return u;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public int getResponseCode() {
|
||||
if (responseCode != -1) {
|
||||
@ -87,13 +88,13 @@ public int getResponseCode() {
|
||||
public void setResponseCode(int resCode) {
|
||||
responseCode = resCode;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public String getHeaderField(String field) {
|
||||
return (field.equalsIgnoreCase(StreamFile.CONTENT_LENGTH)) ? "65535" : null;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testByteRange() throws IOException {
|
||||
HftpFileSystem.RangeHeaderUrlOpener ospy = spy(
|
||||
@ -149,7 +150,7 @@ public void testByteRange() throws IOException {
|
||||
|
||||
((MockHttpURLConnection) rspy.openConnection()).setResponseCode(200);
|
||||
is.seek(500);
|
||||
|
||||
|
||||
try {
|
||||
is.read();
|
||||
fail("Exception should be thrown when 200 response is given "
|
||||
@ -171,31 +172,31 @@ public void testByteRange() throws IOException {
|
||||
"HTTP_OK expected, received 206", e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testPropagatedClose() throws IOException {
|
||||
ByteRangeInputStream brs = spy(
|
||||
new HftpFileSystem.RangeHeaderInputStream(new URL("http://test/")));
|
||||
|
||||
|
||||
InputStream mockStream = mock(InputStream.class);
|
||||
doReturn(mockStream).when(brs).openInputStream();
|
||||
|
||||
int brisOpens = 0;
|
||||
int brisCloses = 0;
|
||||
int isCloses = 0;
|
||||
|
||||
|
||||
// first open, shouldn't close underlying stream
|
||||
brs.getInputStream();
|
||||
verify(brs, times(++brisOpens)).openInputStream();
|
||||
verify(brs, times(brisCloses)).close();
|
||||
verify(mockStream, times(isCloses)).close();
|
||||
|
||||
|
||||
// stream is open, shouldn't close underlying stream
|
||||
brs.getInputStream();
|
||||
verify(brs, times(brisOpens)).openInputStream();
|
||||
verify(brs, times(brisCloses)).close();
|
||||
verify(mockStream, times(isCloses)).close();
|
||||
|
||||
|
||||
// seek forces a reopen, should close underlying stream
|
||||
brs.seek(1);
|
||||
brs.getInputStream();
|
||||
@ -221,12 +222,12 @@ public void testPropagatedClose() throws IOException {
|
||||
brs.close();
|
||||
verify(brs, times(++brisCloses)).close();
|
||||
verify(mockStream, times(++isCloses)).close();
|
||||
|
||||
|
||||
// it's already closed, underlying stream should not close
|
||||
brs.close();
|
||||
verify(brs, times(++brisCloses)).close();
|
||||
verify(mockStream, times(isCloses)).close();
|
||||
|
||||
|
||||
// it's closed, don't reopen it
|
||||
boolean errored = false;
|
||||
try {
|
@ -16,10 +16,11 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hdfs;
|
||||
package org.apache.hadoop.hdfs.web;
|
||||
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Field;
|
||||
import java.net.ServerSocket;
|
||||
@ -29,7 +30,10 @@
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.web.HftpFileSystem;
|
||||
import org.apache.hadoop.hdfs.web.HsftpFileSystem;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.security.SecurityUtilTestHelper;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
@ -46,11 +50,11 @@ public void testHdfsDelegationToken() throws Exception {
|
||||
final Configuration conf = new Configuration();
|
||||
conf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos");
|
||||
UserGroupInformation.setConfiguration(conf);
|
||||
UserGroupInformation user =
|
||||
UserGroupInformation.createUserForTesting("oom",
|
||||
UserGroupInformation user =
|
||||
UserGroupInformation.createUserForTesting("oom",
|
||||
new String[]{"memory"});
|
||||
Token<?> token = new Token<TokenIdentifier>
|
||||
(new byte[0], new byte[0],
|
||||
(new byte[0], new byte[0],
|
||||
DelegationTokenIdentifier.HDFS_DELEGATION_KIND,
|
||||
new Text("127.0.0.1:8020"));
|
||||
user.addToken(token);
|
||||
@ -58,7 +62,7 @@ public void testHdfsDelegationToken() throws Exception {
|
||||
(null, null, new Text("other token"), new Text("127.0.0.1:8021"));
|
||||
user.addToken(token2);
|
||||
assertEquals("wrong tokens in user", 2, user.getTokens().size());
|
||||
FileSystem fs =
|
||||
FileSystem fs =
|
||||
user.doAs(new PrivilegedExceptionAction<FileSystem>() {
|
||||
@Override
|
||||
public FileSystem run() throws Exception {
|
||||
@ -78,13 +82,13 @@ public void testSelectHftpDelegationToken() throws Exception {
|
||||
|
||||
Configuration conf = new Configuration();
|
||||
conf.setClass("fs.hftp.impl", MyHftpFileSystem.class, FileSystem.class);
|
||||
|
||||
|
||||
int httpPort = 80;
|
||||
int httpsPort = 443;
|
||||
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_KEY, httpPort);
|
||||
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_KEY, httpsPort);
|
||||
|
||||
// test with implicit default port
|
||||
|
||||
// test with implicit default port
|
||||
URI fsUri = URI.create("hftp://localhost");
|
||||
MyHftpFileSystem fs = (MyHftpFileSystem) FileSystem.newInstance(fsUri, conf);
|
||||
assertEquals(httpPort, fs.getCanonicalUri().getPort());
|
||||
@ -96,14 +100,14 @@ public void testSelectHftpDelegationToken() throws Exception {
|
||||
fs = (MyHftpFileSystem) FileSystem.newInstance(fsUri, conf);
|
||||
assertEquals(httpPort, fs.getCanonicalUri().getPort());
|
||||
checkTokenSelection(fs, httpPort, conf);
|
||||
|
||||
|
||||
// test with non-default port
|
||||
// Make sure it uses the port from the hftp URI.
|
||||
fsUri = URI.create("hftp://localhost:"+(httpPort+1));
|
||||
fs = (MyHftpFileSystem) FileSystem.newInstance(fsUri, conf);
|
||||
assertEquals(httpPort+1, fs.getCanonicalUri().getPort());
|
||||
checkTokenSelection(fs, httpPort + 1, conf);
|
||||
|
||||
|
||||
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_KEY, 5);
|
||||
}
|
||||
|
||||
@ -119,27 +123,27 @@ public void testSelectHsftpDelegationToken() throws Exception {
|
||||
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_KEY, httpPort);
|
||||
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_KEY, httpsPort);
|
||||
|
||||
// test with implicit default port
|
||||
// test with implicit default port
|
||||
URI fsUri = URI.create("hsftp://localhost");
|
||||
MyHsftpFileSystem fs = (MyHsftpFileSystem) FileSystem.newInstance(fsUri, conf);
|
||||
assertEquals(httpsPort, fs.getCanonicalUri().getPort());
|
||||
checkTokenSelection(fs, httpsPort, conf);
|
||||
checkTokenSelection(fs, httpsPort, conf);
|
||||
|
||||
// test with explicit default port
|
||||
fsUri = URI.create("hsftp://localhost:"+httpsPort);
|
||||
fs = (MyHsftpFileSystem) FileSystem.newInstance(fsUri, conf);
|
||||
assertEquals(httpsPort, fs.getCanonicalUri().getPort());
|
||||
checkTokenSelection(fs, httpsPort, conf);
|
||||
|
||||
|
||||
// test with non-default port
|
||||
fsUri = URI.create("hsftp://localhost:"+(httpsPort+1));
|
||||
fs = (MyHsftpFileSystem) FileSystem.newInstance(fsUri, conf);
|
||||
assertEquals(httpsPort+1, fs.getCanonicalUri().getPort());
|
||||
checkTokenSelection(fs, httpsPort+1, conf);
|
||||
|
||||
|
||||
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_KEY, 5);
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Test
|
||||
public void testInsecureRemoteCluster() throws Exception {
|
||||
@ -186,7 +190,7 @@ public void run() {
|
||||
t.interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private void checkTokenSelection(HftpFileSystem fs,
|
||||
int port,
|
||||
Configuration conf) throws IOException {
|
||||
@ -216,12 +220,12 @@ private void checkTokenSelection(HftpFileSystem fs,
|
||||
token = fs.selectDelegationToken(ugi);
|
||||
assertNotNull(token);
|
||||
assertEquals(hftpToken, token);
|
||||
|
||||
|
||||
// switch to using host-based tokens, no token should match
|
||||
SecurityUtilTestHelper.setTokenServiceUseIp(false);
|
||||
token = fs.selectDelegationToken(ugi);
|
||||
assertNull(token);
|
||||
|
||||
|
||||
// test fallback to hdfs token
|
||||
hdfsToken = new Token<TokenIdentifier>(
|
||||
new byte[0], new byte[0],
|
||||
@ -241,7 +245,7 @@ private void checkTokenSelection(HftpFileSystem fs,
|
||||
assertNotNull(token);
|
||||
assertEquals(hftpToken, token);
|
||||
}
|
||||
|
||||
|
||||
static class MyHftpFileSystem extends HftpFileSystem {
|
||||
@Override
|
||||
public URI getCanonicalUri() {
|
||||
@ -255,7 +259,7 @@ public int getDefaultPort() {
|
||||
@Override
|
||||
protected void initDelegationToken() throws IOException {}
|
||||
}
|
||||
|
||||
|
||||
static class MyHsftpFileSystem extends HsftpFileSystem {
|
||||
@Override
|
||||
public URI getCanonicalUri() {
|
@ -16,7 +16,7 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hdfs;
|
||||
package org.apache.hadoop.hdfs.web;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
@ -38,16 +38,21 @@
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster.Builder;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
||||
import org.apache.hadoop.hdfs.web.HftpFileSystem;
|
||||
import org.apache.hadoop.hdfs.web.HsftpFileSystem;
|
||||
import org.apache.hadoop.util.ServletUtil;
|
||||
import org.apache.log4j.Level;
|
||||
import org.junit.*;
|
||||
|
||||
public class TestHftpFileSystem {
|
||||
private static final Random RAN = new Random();
|
||||
|
||||
|
||||
private static Configuration config = null;
|
||||
private static MiniDFSCluster cluster = null;
|
||||
private static String blockPoolId = null;
|
||||
@ -94,17 +99,17 @@ public static void setUp() throws IOException {
|
||||
config = new Configuration();
|
||||
cluster = new MiniDFSCluster.Builder(config).numDataNodes(2).build();
|
||||
blockPoolId = cluster.getNamesystem().getBlockPoolId();
|
||||
hftpUri =
|
||||
hftpUri =
|
||||
"hftp://" + config.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
|
||||
}
|
||||
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() throws IOException {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Before
|
||||
public void initFileSystems() throws IOException {
|
||||
hdfs = cluster.getFileSystem();
|
||||
@ -119,9 +124,9 @@ public void initFileSystems() throws IOException {
|
||||
public void resetFileSystems() throws IOException {
|
||||
FileSystem.closeAll();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Test file creation and access with file names that need encoding.
|
||||
* Test file creation and access with file names that need encoding.
|
||||
*/
|
||||
@Test
|
||||
public void testFileNameEncoding() throws IOException, URISyntaxException {
|
||||
@ -159,13 +164,13 @@ private void testDataNodeRedirect(Path path) throws IOException {
|
||||
|
||||
// Get the path's block location so we can determine
|
||||
// if we were redirected to the right DN.
|
||||
BlockLocation[] locations =
|
||||
BlockLocation[] locations =
|
||||
hdfs.getFileBlockLocations(path, 0, 10);
|
||||
String xferAddr = locations[0].getNames()[0];
|
||||
|
||||
// Connect to the NN to get redirected
|
||||
URL u = hftpFs.getNamenodeURL(
|
||||
"/data" + ServletUtil.encodePath(path.toUri().getPath()),
|
||||
"/data" + ServletUtil.encodePath(path.toUri().getPath()),
|
||||
"ugi=userx,groupy");
|
||||
HttpURLConnection conn = (HttpURLConnection)u.openConnection();
|
||||
HttpURLConnection.setFollowRedirects(true);
|
||||
@ -176,7 +181,7 @@ private void testDataNodeRedirect(Path path) throws IOException {
|
||||
// Find the datanode that has the block according to locations
|
||||
// and check that the URL was redirected to this DN's info port
|
||||
for (DataNode node : cluster.getDataNodes()) {
|
||||
DatanodeRegistration dnR =
|
||||
DatanodeRegistration dnR =
|
||||
DataNodeTestUtils.getDNRegistrationForBP(node, blockPoolId);
|
||||
if (dnR.getXferAddr().equals(xferAddr)) {
|
||||
checked = true;
|
||||
@ -207,25 +212,25 @@ public void testGetPos() throws IOException {
|
||||
FSDataOutputStream out = hdfs.create(testFile, true);
|
||||
out.writeBytes("0123456789");
|
||||
out.close();
|
||||
|
||||
|
||||
FSDataInputStream in = hftpFs.open(testFile);
|
||||
|
||||
|
||||
// Test read().
|
||||
for (int i = 0; i < 5; ++i) {
|
||||
assertEquals(i, in.getPos());
|
||||
in.read();
|
||||
}
|
||||
|
||||
|
||||
// Test read(b, off, len).
|
||||
assertEquals(5, in.getPos());
|
||||
byte[] buffer = new byte[10];
|
||||
assertEquals(2, in.read(buffer, 0, 2));
|
||||
assertEquals(7, in.getPos());
|
||||
|
||||
|
||||
// Test read(b).
|
||||
int bytesRead = in.read(buffer);
|
||||
assertEquals(7 + bytesRead, in.getPos());
|
||||
|
||||
|
||||
// Test EOF.
|
||||
for (int i = 0; i < 100; ++i) {
|
||||
in.read();
|
||||
@ -261,21 +266,21 @@ public void testReadClosedStream() throws IOException {
|
||||
in.close();
|
||||
checkClosedStream(in);
|
||||
checkClosedStream(in.getWrappedStream());
|
||||
|
||||
|
||||
// force the stream to connect and then close it
|
||||
in = hftpFs.open(testFile);
|
||||
int ch = in.read();
|
||||
int ch = in.read();
|
||||
assertEquals('0', ch);
|
||||
in.close();
|
||||
checkClosedStream(in);
|
||||
checkClosedStream(in.getWrappedStream());
|
||||
|
||||
|
||||
// make sure seeking doesn't automagically reopen the stream
|
||||
in.seek(4);
|
||||
checkClosedStream(in);
|
||||
checkClosedStream(in.getWrappedStream());
|
||||
}
|
||||
|
||||
|
||||
private void checkClosedStream(InputStream is) {
|
||||
IOException ioe = null;
|
||||
try {
|
||||
@ -286,7 +291,7 @@ private void checkClosedStream(InputStream is) {
|
||||
assertNotNull("No exception on closed read", ioe);
|
||||
assertEquals("Stream closed", ioe.getMessage());
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testHftpDefaultPorts() throws IOException {
|
||||
Configuration conf = new Configuration();
|
||||
@ -304,7 +309,7 @@ public void testHftpDefaultPorts() throws IOException {
|
||||
fs.getCanonicalServiceName()
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testHftpCustomDefaultPorts() throws IOException {
|
||||
Configuration conf = new Configuration();
|
||||
@ -314,7 +319,7 @@ public void testHftpCustomDefaultPorts() throws IOException {
|
||||
HftpFileSystem fs = (HftpFileSystem) FileSystem.get(uri, conf);
|
||||
|
||||
assertEquals(123, fs.getDefaultPort());
|
||||
|
||||
|
||||
assertEquals(uri, fs.getUri());
|
||||
|
||||
// HFTP uses http to get the token so canonical service name should
|
||||
@ -349,8 +354,8 @@ public void testHftpCustomUriPortWithCustomDefaultPorts() throws IOException {
|
||||
HftpFileSystem fs = (HftpFileSystem) FileSystem.get(uri, conf);
|
||||
|
||||
assertEquals(123, fs.getDefaultPort());
|
||||
|
||||
assertEquals(uri, fs.getUri());
|
||||
|
||||
assertEquals(uri, fs.getUri());
|
||||
assertEquals(
|
||||
"127.0.0.1:789",
|
||||
fs.getCanonicalServiceName()
|
||||
@ -384,7 +389,7 @@ public void testHsftpCustomDefaultPorts() throws IOException {
|
||||
HsftpFileSystem fs = (HsftpFileSystem) FileSystem.get(uri, conf);
|
||||
|
||||
assertEquals(456, fs.getDefaultPort());
|
||||
|
||||
|
||||
assertEquals(uri, fs.getUri());
|
||||
assertEquals(
|
||||
"127.0.0.1:456",
|
@ -16,7 +16,7 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hdfs;
|
||||
package org.apache.hadoop.hdfs.web;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
@ -33,6 +33,8 @@
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.hdfs.web.HftpFileSystem;
|
||||
import org.apache.hadoop.hdfs.web.HsftpFileSystem;
|
||||
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
|
||||
import org.junit.Test;
|
||||
|
||||
@ -83,7 +85,7 @@ public void testHsftpSocketTimeout() throws Exception {
|
||||
|
||||
HsftpFileSystem fs = (HsftpFileSystem)FileSystem.get(uri, conf);
|
||||
fs.connectionFactory = new URLConnectionFactory(5);
|
||||
|
||||
|
||||
try {
|
||||
HttpURLConnection conn = null;
|
||||
timedout = false;
|
||||
@ -104,7 +106,7 @@ public void testHsftpSocketTimeout() throws Exception {
|
||||
fs.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private boolean checkConnectTimeout(HftpFileSystem fs, boolean ignoreReadTimeout)
|
||||
throws IOException {
|
||||
boolean timedout = false;
|
@ -34,9 +34,9 @@
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.HftpFileSystem;
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.tools.DelegationTokenFetcher;
|
||||
import org.apache.hadoop.hdfs.web.HftpFileSystem;
|
||||
import org.apache.hadoop.io.DataOutputBuffer;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
|
@ -46,7 +46,6 @@
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.Trash;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.HftpFileSystem;
|
||||
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
|
||||
import org.apache.hadoop.io.LongWritable;
|
||||
import org.apache.hadoop.io.NullWritable;
|
||||
|
Loading…
Reference in New Issue
Block a user