HDFS-2453. Fix http response code for partial content in webhdfs, added getDefaultBlockSize() and getDefaultReplication() in WebHdfsFileSystem and cleared content type in ExceptionHandler.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1186508 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
01d408d5b0
commit
0bd8f0bd40
@ -132,6 +132,10 @@ Trunk (unreleased changes)
|
||||
HDFS-2188. Make FSEditLog create its journals from a list of URIs rather
|
||||
than NNStorage. (Ivan Kelly via jitendra)
|
||||
|
||||
HDFS-2453. Fix http response code for partial content in webhdfs, added
|
||||
getDefaultBlockSize() and getDefaultReplication() in WebHdfsFileSystem
|
||||
and cleared content type in ExceptionHandler. (szetszwo)
|
||||
|
||||
Release 0.23.0 - Unreleased
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -22,10 +22,13 @@ import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.MalformedURLException;
|
||||
import java.net.URL;
|
||||
import java.util.StringTokenizer;
|
||||
|
||||
import org.apache.hadoop.fs.FSInputStream;
|
||||
import org.apache.hadoop.hdfs.server.namenode.StreamFile;
|
||||
import org.apache.hadoop.hdfs.web.resources.OffsetParam;
|
||||
|
||||
/**
|
||||
* To support HTTP byte streams, a new connection to an HTTP server needs to be
|
||||
@ -42,6 +45,8 @@ public class ByteRangeInputStream extends FSInputStream {
|
||||
*/
|
||||
static class URLOpener {
|
||||
protected URL url;
|
||||
/** The url with offset parameter */
|
||||
private URL offsetUrl;
|
||||
|
||||
public URLOpener(URL u) {
|
||||
url = u;
|
||||
@ -54,12 +59,55 @@ public class ByteRangeInputStream extends FSInputStream {
|
||||
public URL getURL() {
|
||||
return url;
|
||||
}
|
||||
|
||||
public HttpURLConnection openConnection() throws IOException {
|
||||
return (HttpURLConnection)url.openConnection();
|
||||
|
||||
HttpURLConnection openConnection() throws IOException {
|
||||
return (HttpURLConnection)offsetUrl.openConnection();
|
||||
}
|
||||
|
||||
private HttpURLConnection openConnection(final long offset) throws IOException {
|
||||
offsetUrl = offset == 0L? url: new URL(url + "&" + new OffsetParam(offset));
|
||||
final HttpURLConnection conn = openConnection();
|
||||
conn.setRequestMethod("GET");
|
||||
if (offset != 0L) {
|
||||
conn.setRequestProperty("Range", "bytes=" + offset + "-");
|
||||
}
|
||||
return conn;
|
||||
}
|
||||
}
|
||||
|
||||
static private final String OFFSET_PARAM_PREFIX = OffsetParam.NAME + "=";
|
||||
|
||||
/** Remove offset parameter, if there is any, from the url */
|
||||
static URL removeOffsetParam(final URL url) throws MalformedURLException {
|
||||
String query = url.getQuery();
|
||||
if (query == null) {
|
||||
return url;
|
||||
}
|
||||
final String lower = query.toLowerCase();
|
||||
if (!lower.startsWith(OFFSET_PARAM_PREFIX)
|
||||
&& !lower.contains("&" + OFFSET_PARAM_PREFIX)) {
|
||||
return url;
|
||||
}
|
||||
|
||||
//rebuild query
|
||||
StringBuilder b = null;
|
||||
for(final StringTokenizer st = new StringTokenizer(query, "&");
|
||||
st.hasMoreTokens();) {
|
||||
final String token = st.nextToken();
|
||||
if (!token.toLowerCase().startsWith(OFFSET_PARAM_PREFIX)) {
|
||||
if (b == null) {
|
||||
b = new StringBuilder("?").append(token);
|
||||
} else {
|
||||
b.append('&').append(token);
|
||||
}
|
||||
}
|
||||
}
|
||||
query = b == null? "": b.toString();
|
||||
|
||||
final String urlStr = url.toString();
|
||||
return new URL(urlStr.substring(0, urlStr.indexOf('?')) + query);
|
||||
}
|
||||
|
||||
enum StreamStatus {
|
||||
NORMAL, SEEK
|
||||
}
|
||||
@ -95,12 +143,8 @@ public class ByteRangeInputStream extends FSInputStream {
|
||||
final URLOpener opener =
|
||||
(resolvedURL.getURL() == null) ? originalURL : resolvedURL;
|
||||
|
||||
final HttpURLConnection connection = opener.openConnection();
|
||||
final HttpURLConnection connection = opener.openConnection(startPos);
|
||||
try {
|
||||
connection.setRequestMethod("GET");
|
||||
if (startPos != 0) {
|
||||
connection.setRequestProperty("Range", "bytes="+startPos+"-");
|
||||
}
|
||||
connection.connect();
|
||||
final String cl = connection.getHeaderField(StreamFile.CONTENT_LENGTH);
|
||||
filelength = (cl == null) ? -1 : Long.parseLong(cl);
|
||||
@ -125,7 +169,7 @@ public class ByteRangeInputStream extends FSInputStream {
|
||||
throw new IOException("HTTP_OK expected, received " + respCode);
|
||||
}
|
||||
|
||||
resolvedURL.setURL(connection.getURL());
|
||||
resolvedURL.setURL(removeOffsetParam(connection.getURL()));
|
||||
status = StreamStatus.NORMAL;
|
||||
}
|
||||
|
||||
|
@ -102,7 +102,7 @@ public class DatanodeWebHdfsMethods {
|
||||
final ReplicationParam replication,
|
||||
@QueryParam(BlockSizeParam.NAME) @DefaultValue(BlockSizeParam.DEFAULT)
|
||||
final BlockSizeParam blockSize
|
||||
) throws IOException, URISyntaxException, InterruptedException {
|
||||
) throws IOException, InterruptedException {
|
||||
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace(op + ": " + path + ", ugi=" + ugi
|
||||
@ -162,7 +162,7 @@ public class DatanodeWebHdfsMethods {
|
||||
final PostOpParam op,
|
||||
@QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
|
||||
final BufferSizeParam bufferSize
|
||||
) throws IOException, URISyntaxException, InterruptedException {
|
||||
) throws IOException, InterruptedException {
|
||||
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace(op + ": " + path + ", ugi=" + ugi
|
||||
@ -216,7 +216,7 @@ public class DatanodeWebHdfsMethods {
|
||||
final LengthParam length,
|
||||
@QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
|
||||
final BufferSizeParam bufferSize
|
||||
) throws IOException, URISyntaxException, InterruptedException {
|
||||
) throws IOException, InterruptedException {
|
||||
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace(op + ": " + path + ", ugi=" + ugi
|
||||
@ -255,7 +255,11 @@ public class DatanodeWebHdfsMethods {
|
||||
}
|
||||
}
|
||||
};
|
||||
return Response.ok(streaming).type(MediaType.APPLICATION_OCTET_STREAM).build();
|
||||
|
||||
final int status = offset.getValue() == 0?
|
||||
HttpServletResponse.SC_OK: HttpServletResponse.SC_PARTIAL_CONTENT;
|
||||
return Response.status(status).entity(streaming).type(
|
||||
MediaType.APPLICATION_OCTET_STREAM).build();
|
||||
}
|
||||
case GETFILECHECKSUM:
|
||||
{
|
||||
|
@ -31,6 +31,8 @@ import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.BlockLocation;
|
||||
import org.apache.hadoop.fs.ContentSummary;
|
||||
@ -44,6 +46,7 @@ import org.apache.hadoop.fs.ParentNotDirectoryException;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.ByteRangeInputStream;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.HftpFileSystem;
|
||||
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
|
||||
@ -86,6 +89,7 @@ import org.mortbay.util.ajax.JSON;
|
||||
|
||||
/** A FileSystem for HDFS over the web. */
|
||||
public class WebHdfsFileSystem extends HftpFileSystem {
|
||||
public static final Log LOG = LogFactory.getLog(WebHdfsFileSystem.class);
|
||||
/** File System URI: {SCHEME}://namenode:port/path/to/file */
|
||||
public static final String SCHEME = "webhdfs";
|
||||
/** Http URI: http://namenode:port/{PATH_PREFIX}/path/to/file */
|
||||
@ -340,6 +344,18 @@ public class WebHdfsFileSystem extends HftpFileSystem {
|
||||
run(op, p, new ModificationTimeParam(mtime), new AccessTimeParam(atime));
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getDefaultBlockSize() {
|
||||
return getConf().getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY,
|
||||
DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT);
|
||||
}
|
||||
|
||||
@Override
|
||||
public short getDefaultReplication() {
|
||||
return (short)getConf().getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
|
||||
DFSConfigKeys.DFS_REPLICATION_DEFAULT);
|
||||
}
|
||||
|
||||
private FSDataOutputStream write(final HttpOpParam.Op op,
|
||||
final HttpURLConnection conn, final int bufferSize) throws IOException {
|
||||
return new FSDataOutputStream(new BufferedOutputStream(
|
||||
|
@ -20,6 +20,8 @@ package org.apache.hadoop.hdfs.web.resources;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
import javax.ws.rs.core.Context;
|
||||
import javax.ws.rs.core.MediaType;
|
||||
import javax.ws.rs.core.Response;
|
||||
import javax.ws.rs.ext.ExceptionMapper;
|
||||
@ -36,12 +38,17 @@ import com.sun.jersey.api.ParamException;
|
||||
public class ExceptionHandler implements ExceptionMapper<Exception> {
|
||||
public static final Log LOG = LogFactory.getLog(ExceptionHandler.class);
|
||||
|
||||
private @Context HttpServletResponse response;
|
||||
|
||||
@Override
|
||||
public Response toResponse(Exception e) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("GOT EXCEPITION", e);
|
||||
}
|
||||
|
||||
//clear content type
|
||||
response.setContentType(null);
|
||||
|
||||
//Convert exception
|
||||
if (e instanceof ParamException) {
|
||||
final ParamException paramexception = (ParamException)e;
|
||||
|
@ -35,28 +35,29 @@ import org.apache.hadoop.hdfs.ByteRangeInputStream.URLOpener;
|
||||
import org.junit.Test;
|
||||
|
||||
class MockHttpURLConnection extends HttpURLConnection {
|
||||
private int responseCode = -1;
|
||||
URL m;
|
||||
|
||||
public MockHttpURLConnection(URL u) {
|
||||
super(u);
|
||||
m = u;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean usingProxy(){
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void disconnect() {
|
||||
}
|
||||
|
||||
public void connect() throws IOException {
|
||||
@Override
|
||||
public void connect() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public InputStream getInputStream() throws IOException {
|
||||
return new ByteArrayInputStream("asdf".getBytes());
|
||||
}
|
||||
|
||||
@Override
|
||||
public URL getURL() {
|
||||
URL u = null;
|
||||
try {
|
||||
@ -67,6 +68,7 @@ class MockHttpURLConnection extends HttpURLConnection {
|
||||
return u;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getResponseCode() {
|
||||
if (responseCode != -1) {
|
||||
return responseCode;
|
||||
@ -82,10 +84,45 @@ class MockHttpURLConnection extends HttpURLConnection {
|
||||
public void setResponseCode(int resCode) {
|
||||
responseCode = resCode;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public class TestByteRangeInputStream {
|
||||
@Test
|
||||
public void testRemoveOffset() throws IOException {
|
||||
{ //no offset
|
||||
String s = "http://test/Abc?Length=99";
|
||||
assertEquals(s, ByteRangeInputStream.removeOffsetParam(new URL(s)).toString());
|
||||
}
|
||||
|
||||
{ //no parameters
|
||||
String s = "http://test/Abc";
|
||||
assertEquals(s, ByteRangeInputStream.removeOffsetParam(new URL(s)).toString());
|
||||
}
|
||||
|
||||
{ //offset as first parameter
|
||||
String s = "http://test/Abc?offset=10&Length=99";
|
||||
assertEquals("http://test/Abc?Length=99",
|
||||
ByteRangeInputStream.removeOffsetParam(new URL(s)).toString());
|
||||
}
|
||||
|
||||
{ //offset as second parameter
|
||||
String s = "http://test/Abc?op=read&OFFset=10&Length=99";
|
||||
assertEquals("http://test/Abc?op=read&Length=99",
|
||||
ByteRangeInputStream.removeOffsetParam(new URL(s)).toString());
|
||||
}
|
||||
|
||||
{ //offset as last parameter
|
||||
String s = "http://test/Abc?Length=99&offset=10";
|
||||
assertEquals("http://test/Abc?Length=99",
|
||||
ByteRangeInputStream.removeOffsetParam(new URL(s)).toString());
|
||||
}
|
||||
|
||||
{ //offset as the only parameter
|
||||
String s = "http://test/Abc?offset=10";
|
||||
assertEquals("http://test/Abc",
|
||||
ByteRangeInputStream.removeOffsetParam(new URL(s)).toString());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testByteRange() throws IOException {
|
||||
|
@ -123,6 +123,8 @@ public class TestWebHdfsFileSystemContract extends FileSystemContractBaseTest {
|
||||
}
|
||||
}
|
||||
|
||||
//the following are new tests (i.e. not over-riding the super class methods)
|
||||
|
||||
public void testGetFileBlockLocations() throws IOException {
|
||||
final String f = "/test/testGetFileBlockLocations";
|
||||
createFile(path(f));
|
||||
@ -172,4 +174,45 @@ public class TestWebHdfsFileSystemContract extends FileSystemContractBaseTest {
|
||||
WebHdfsFileSystem.LOG.info("This is expected.", fnfe);
|
||||
}
|
||||
}
|
||||
|
||||
public void testSeek() throws IOException {
|
||||
final Path p = new Path("/test/testSeek");
|
||||
createFile(p);
|
||||
|
||||
final int one_third = data.length/3;
|
||||
final int two_third = one_third*2;
|
||||
|
||||
{ //test seek
|
||||
final int offset = one_third;
|
||||
final int len = data.length - offset;
|
||||
final byte[] buf = new byte[len];
|
||||
|
||||
final FSDataInputStream in = fs.open(p);
|
||||
in.seek(offset);
|
||||
|
||||
//read all remaining data
|
||||
in.readFully(buf);
|
||||
in.close();
|
||||
|
||||
for (int i = 0; i < buf.length; i++) {
|
||||
assertEquals("Position " + i + ", offset=" + offset + ", length=" + len,
|
||||
data[i + offset], buf[i]);
|
||||
}
|
||||
}
|
||||
|
||||
{ //test position read (read the data after the two_third location)
|
||||
final int offset = two_third;
|
||||
final int len = data.length - offset;
|
||||
final byte[] buf = new byte[len];
|
||||
|
||||
final FSDataInputStream in = fs.open(p);
|
||||
in.readFully(offset, buf);
|
||||
in.close();
|
||||
|
||||
for (int i = 0; i < buf.length; i++) {
|
||||
assertEquals("Position " + i + ", offset=" + offset + ", length=" + len,
|
||||
data[i + offset], buf[i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user