HDFS-6305. WebHdfs response decoding may throw RuntimeExceptions (Daryn Sharp via jeagles)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1594273 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jonathan Turner Eagles 2014-05-13 16:40:15 +00:00
parent 335f61a72f
commit e4ee1d111b
4 changed files with 288 additions and 231 deletions

View File

@ -461,6 +461,9 @@ Release 2.5.0 - UNRELEASED
HDFS-6367. EnumSetParam$Domain#parse fails for parameter containing more than one enum.
(Yi Liu via umamahesh)
HDFS-6305. WebHdfs response decoding may throw RuntimeExceptions (Daryn
Sharp via jeagles)
Release 2.4.1 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -58,34 +58,8 @@ import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
import org.apache.hadoop.hdfs.web.resources.AccessTimeParam;
import org.apache.hadoop.hdfs.web.resources.AclPermissionParam;
import org.apache.hadoop.hdfs.web.resources.BlockSizeParam;
import org.apache.hadoop.hdfs.web.resources.BufferSizeParam;
import org.apache.hadoop.hdfs.web.resources.ConcatSourcesParam;
import org.apache.hadoop.hdfs.web.resources.CreateParentParam;
import org.apache.hadoop.hdfs.web.resources.DelegationParam;
import org.apache.hadoop.hdfs.web.resources.DeleteOpParam;
import org.apache.hadoop.hdfs.web.resources.DestinationParam;
import org.apache.hadoop.hdfs.web.resources.DoAsParam;
import org.apache.hadoop.hdfs.web.resources.GetOpParam;
import org.apache.hadoop.hdfs.web.resources.GroupParam;
import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
import org.apache.hadoop.hdfs.web.resources.LengthParam;
import org.apache.hadoop.hdfs.web.resources.ModificationTimeParam;
import org.apache.hadoop.hdfs.web.resources.OffsetParam;
import org.apache.hadoop.hdfs.web.resources.OverwriteParam;
import org.apache.hadoop.hdfs.web.resources.OwnerParam;
import org.apache.hadoop.hdfs.web.resources.Param;
import org.apache.hadoop.hdfs.web.resources.PermissionParam;
import org.apache.hadoop.hdfs.web.resources.PostOpParam;
import org.apache.hadoop.hdfs.web.resources.PutOpParam;
import org.apache.hadoop.hdfs.web.resources.RecursiveParam;
import org.apache.hadoop.hdfs.web.resources.RenameOptionSetParam;
import org.apache.hadoop.hdfs.web.resources.RenewerParam;
import org.apache.hadoop.hdfs.web.resources.ReplicationParam;
import org.apache.hadoop.hdfs.web.resources.TokenArgumentParam;
import org.apache.hadoop.hdfs.web.resources.UserParam;
import org.apache.hadoop.hdfs.web.resources.*;
import org.apache.hadoop.hdfs.web.resources.HttpOpParam.Op;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
@ -425,41 +399,24 @@ public class WebHdfsFileSystem extends FileSystem
return url;
}
/**
* Run a http operation.
* Connect to the http server, validate response, and obtain the JSON output.
*
* @param op http operation
* @param fspath file system path
* @param parameters parameters for the operation
* @return a JSON object, e.g. Object[], Map<?, ?>, etc.
* @throws IOException
*/
private Map<?, ?> run(final HttpOpParam.Op op, final Path fspath,
final Param<?,?>... parameters) throws IOException {
return new FsPathRunner(op, fspath, parameters).run().json;
}
/**
* This class is for initialing a HTTP connection, connecting to server,
* obtaining a response, and also handling retry on failures.
*/
abstract class AbstractRunner {
abstract class AbstractRunner<T> {
abstract protected URL getUrl() throws IOException;
protected final HttpOpParam.Op op;
private final boolean redirected;
private boolean checkRetry;
protected HttpURLConnection conn = null;
private Map<?, ?> json = null;
protected AbstractRunner(final HttpOpParam.Op op, boolean redirected) {
this.op = op;
this.redirected = redirected;
}
AbstractRunner run() throws IOException {
T run() throws IOException {
UserGroupInformation connectUgi = ugi.getRealUser();
if (connectUgi == null) {
connectUgi = ugi;
@ -471,9 +428,9 @@ public class WebHdfsFileSystem extends FileSystem
// the entire lifecycle of the connection must be run inside the
// doAs to ensure authentication is performed correctly
return connectUgi.doAs(
new PrivilegedExceptionAction<AbstractRunner>() {
new PrivilegedExceptionAction<T>() {
@Override
public AbstractRunner run() throws IOException {
public T run() throws IOException {
return runWithRetry();
}
});
@ -481,18 +438,51 @@ public class WebHdfsFileSystem extends FileSystem
throw new IOException(e);
}
}
private void init() throws IOException {
checkRetry = !redirected;
URL url = getUrl();
conn = (HttpURLConnection) connectionFactory.openConnection(url);
}
private void connect() throws IOException {
connect(op.getDoOutput());
/**
* Two-step requests redirected to a DN
*
* Create/Append:
* Step 1) Submit a Http request with neither auto-redirect nor data.
* Step 2) Submit another Http request with the URL from the Location header with data.
*
* The reason of having two-step create/append is for preventing clients to
* send out the data before the redirect. This issue is addressed by the
* "Expect: 100-continue" header in HTTP/1.1; see RFC 2616, Section 8.2.3.
* Unfortunately, there are software library bugs (e.g. Jetty 6 http server
* and Java 6 http client), which do not correctly implement "Expect:
* 100-continue". The two-step create/append is a temporary workaround for
* the software library bugs.
*
* Open/Checksum
* Also implements two-step connects for other operations redirected to
* a DN such as open and checksum
*/
private HttpURLConnection connect(URL url) throws IOException {
// resolve redirects for a DN operation unless already resolved
if (op.getRedirect() && !redirected) {
final HttpOpParam.Op redirectOp =
HttpOpParam.TemporaryRedirectOp.valueOf(op);
final HttpURLConnection conn = connect(redirectOp, url);
// application level proxy like httpfs might not issue a redirect
if (conn.getResponseCode() == op.getExpectedHttpResponseCode()) {
return conn;
}
try {
validateResponse(redirectOp, conn, false);
url = new URL(conn.getHeaderField("Location"));
} finally {
conn.disconnect();
}
}
return connect(op, url);
}
private void connect(boolean doOutput) throws IOException {
private HttpURLConnection connect(final HttpOpParam.Op op, final URL url)
throws IOException {
final HttpURLConnection conn =
(HttpURLConnection)connectionFactory.openConnection(url);
final boolean doOutput = op.getDoOutput();
conn.setRequestMethod(op.getType().toString());
conn.setInstanceFollowRedirects(false);
switch (op.getType()) {
@ -505,6 +495,10 @@ public class WebHdfsFileSystem extends FileSystem
// explicitly setting content-length to 0 won't do spnego!!
// opening and closing the stream will send "Content-Length: 0"
conn.getOutputStream().close();
} else {
conn.setRequestProperty("Content-Type",
MediaType.APPLICATION_OCTET_STREAM);
conn.setChunkedStreamingMode(32 << 10); //32kB-chunk
}
break;
}
@ -514,16 +508,10 @@ public class WebHdfsFileSystem extends FileSystem
}
}
conn.connect();
return conn;
}
private void disconnect() {
if (conn != null) {
conn.disconnect();
conn = null;
}
}
private AbstractRunner runWithRetry() throws IOException {
private T runWithRetry() throws IOException {
/**
* Do the real work.
*
@ -541,15 +529,16 @@ public class WebHdfsFileSystem extends FileSystem
* examines the exception and swallows it if it decides to rerun the work.
*/
for(int retry = 0; ; retry++) {
checkRetry = !redirected;
final URL url = getUrl();
try {
init();
if (op.getDoOutput()) {
twoStepWrite();
} else {
getResponse(op != GetOpParam.Op.OPEN);
final HttpURLConnection conn = connect(url);
// output streams will validate on close
if (!op.getDoOutput()) {
validateResponse(op, conn, false);
}
return this;
} catch(IOException ioe) {
return getResponse(conn);
} catch (IOException ioe) {
Throwable cause = ioe.getCause();
if (cause != null && cause instanceof AuthenticationException) {
throw ioe; // no retries for auth failures
@ -591,87 +580,129 @@ public class WebHdfsFileSystem extends FileSystem
throw toIOException(ioe);
}
/**
* Two-step Create/Append:
* Step 1) Submit a Http request with neither auto-redirect nor data.
* Step 2) Submit another Http request with the URL from the Location header with data.
*
* The reason of having two-step create/append is for preventing clients to
* send out the data before the redirect. This issue is addressed by the
* "Expect: 100-continue" header in HTTP/1.1; see RFC 2616, Section 8.2.3.
* Unfortunately, there are software library bugs (e.g. Jetty 6 http server
* and Java 6 http client), which do not correctly implement "Expect:
* 100-continue". The two-step create/append is a temporary workaround for
* the software library bugs.
*/
HttpURLConnection twoStepWrite() throws IOException {
//Step 1) Submit a Http request with neither auto-redirect nor data.
connect(false);
validateResponse(HttpOpParam.TemporaryRedirectOp.valueOf(op), conn, false);
final String redirect = conn.getHeaderField("Location");
disconnect();
checkRetry = false;
//Step 2) Submit another Http request with the URL from the Location header with data.
conn = (HttpURLConnection) connectionFactory.openConnection(new URL(
redirect));
conn.setRequestProperty("Content-Type",
MediaType.APPLICATION_OCTET_STREAM);
conn.setChunkedStreamingMode(32 << 10); //32kB-chunk
connect();
return conn;
}
FSDataOutputStream write(final int bufferSize) throws IOException {
return WebHdfsFileSystem.this.write(op, conn, bufferSize);
}
void getResponse(boolean getJsonAndDisconnect) throws IOException {
try {
connect();
final int code = conn.getResponseCode();
if (!redirected && op.getRedirect()
&& code != op.getExpectedHttpResponseCode()) {
final String redirect = conn.getHeaderField("Location");
json = validateResponse(HttpOpParam.TemporaryRedirectOp.valueOf(op),
conn, false);
disconnect();
checkRetry = false;
conn = (HttpURLConnection) connectionFactory.openConnection(new URL(
redirect));
connect();
}
json = validateResponse(op, conn, false);
if (json == null && getJsonAndDisconnect) {
json = jsonParse(conn, false);
}
} finally {
if (getJsonAndDisconnect) {
disconnect();
}
}
}
abstract T getResponse(HttpURLConnection conn) throws IOException;
}
final class FsPathRunner extends AbstractRunner {
/**
* Abstract base class to handle path-based operations with params
*/
abstract class AbstractFsPathRunner<T> extends AbstractRunner<T> {
private final Path fspath;
private final Param<?, ?>[] parameters;
FsPathRunner(final HttpOpParam.Op op, final Path fspath, final Param<?,?>... parameters) {
private final Param<?,?>[] parameters;
AbstractFsPathRunner(final HttpOpParam.Op op, final Path fspath,
Param<?,?>... parameters) {
super(op, false);
this.fspath = fspath;
this.parameters = parameters;
}
@Override
protected URL getUrl() throws IOException {
return toUrl(op, fspath, parameters);
}
}
final class URLRunner extends AbstractRunner {
/**
* Default path-based implementation expects no json response
*/
class FsPathRunner extends AbstractFsPathRunner<Void> {
FsPathRunner(Op op, Path fspath, Param<?,?>... parameters) {
super(op, fspath, parameters);
}
@Override
Void getResponse(HttpURLConnection conn) throws IOException {
return null;
}
}
/**
* Handle path-based operations with a json response
*/
abstract class FsPathResponseRunner<T> extends AbstractFsPathRunner<T> {
FsPathResponseRunner(final HttpOpParam.Op op, final Path fspath,
Param<?,?>... parameters) {
super(op, fspath, parameters);
}
@Override
final T getResponse(HttpURLConnection conn) throws IOException {
try {
final Map<?,?> json = jsonParse(conn, false);
if (json == null) {
// match exception class thrown by parser
throw new IllegalStateException("Missing response");
}
return decodeResponse(json);
} catch (IOException ioe) {
throw ioe;
} catch (Exception e) { // catch json parser errors
final IOException ioe =
new IOException("Response decoding failure: "+e.toString(), e);
if (LOG.isDebugEnabled()) {
LOG.debug(ioe);
}
throw ioe;
} finally {
conn.disconnect();
}
}
abstract T decodeResponse(Map<?,?> json) throws IOException;
}
/**
* Handle path-based operations with json boolean response
*/
class FsPathBooleanRunner extends FsPathResponseRunner<Boolean> {
FsPathBooleanRunner(Op op, Path fspath, Param<?,?>... parameters) {
super(op, fspath, parameters);
}
@Override
Boolean decodeResponse(Map<?,?> json) throws IOException {
return (Boolean)json.get("boolean");
}
}
/**
* Handle create/append output streams
*/
class FsPathOutputStreamRunner extends AbstractFsPathRunner<FSDataOutputStream> {
private final int bufferSize;
FsPathOutputStreamRunner(Op op, Path fspath, int bufferSize,
Param<?,?>... parameters) {
super(op, fspath, parameters);
this.bufferSize = bufferSize;
}
@Override
FSDataOutputStream getResponse(final HttpURLConnection conn)
throws IOException {
return new FSDataOutputStream(new BufferedOutputStream(
conn.getOutputStream(), bufferSize), statistics) {
@Override
public void close() throws IOException {
try {
super.close();
} finally {
try {
validateResponse(op, conn, true);
} finally {
conn.disconnect();
}
}
}
};
}
}
/**
* Used by open() which tracks the resolved url itself
*/
final class URLRunner extends AbstractRunner<HttpURLConnection> {
private final URL url;
@Override
protected URL getUrl() {
@ -682,6 +713,11 @@ public class WebHdfsFileSystem extends FileSystem
super(op, redirected);
this.url = url;
}
@Override
HttpURLConnection getResponse(HttpURLConnection conn) throws IOException {
return conn;
}
}
private FsPermission applyUMask(FsPermission permission) {
@ -693,8 +729,12 @@ public class WebHdfsFileSystem extends FileSystem
private HdfsFileStatus getHdfsFileStatus(Path f) throws IOException {
final HttpOpParam.Op op = GetOpParam.Op.GETFILESTATUS;
final Map<?, ?> json = run(op, f);
final HdfsFileStatus status = JsonUtil.toFileStatus(json, true);
HdfsFileStatus status = new FsPathResponseRunner<HdfsFileStatus>(op, f) {
@Override
HdfsFileStatus decodeResponse(Map<?,?> json) {
return JsonUtil.toFileStatus(json, true);
}
}.run();
if (status == null) {
throw new FileNotFoundException("File does not exist: " + f);
}
@ -718,8 +758,12 @@ public class WebHdfsFileSystem extends FileSystem
@Override
public AclStatus getAclStatus(Path f) throws IOException {
final HttpOpParam.Op op = GetOpParam.Op.GETACLSTATUS;
final Map<?, ?> json = run(op, f);
AclStatus status = JsonUtil.toAclStatus(json);
AclStatus status = new FsPathResponseRunner<AclStatus>(op, f) {
@Override
AclStatus decodeResponse(Map<?,?> json) {
return JsonUtil.toAclStatus(json);
}
}.run();
if (status == null) {
throw new FileNotFoundException("File does not exist: " + f);
}
@ -730,9 +774,9 @@ public class WebHdfsFileSystem extends FileSystem
public boolean mkdirs(Path f, FsPermission permission) throws IOException {
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PutOpParam.Op.MKDIRS;
final Map<?, ?> json = run(op, f,
new PermissionParam(applyUMask(permission)));
return (Boolean)json.get("boolean");
return new FsPathBooleanRunner(op, f,
new PermissionParam(applyUMask(permission))
).run();
}
/**
@ -743,17 +787,19 @@ public class WebHdfsFileSystem extends FileSystem
) throws IOException {
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PutOpParam.Op.CREATESYMLINK;
run(op, f, new DestinationParam(makeQualified(destination).toUri().getPath()),
new CreateParentParam(createParent));
new FsPathRunner(op, f,
new DestinationParam(makeQualified(destination).toUri().getPath()),
new CreateParentParam(createParent)
).run();
}
@Override
public boolean rename(final Path src, final Path dst) throws IOException {
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PutOpParam.Op.RENAME;
final Map<?, ?> json = run(op, src,
new DestinationParam(makeQualified(dst).toUri().getPath()));
return (Boolean)json.get("boolean");
return new FsPathBooleanRunner(op, src,
new DestinationParam(makeQualified(dst).toUri().getPath())
).run();
}
@SuppressWarnings("deprecation")
@ -762,8 +808,10 @@ public class WebHdfsFileSystem extends FileSystem
final Options.Rename... options) throws IOException {
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PutOpParam.Op.RENAME;
run(op, src, new DestinationParam(makeQualified(dst).toUri().getPath()),
new RenameOptionSetParam(options));
new FsPathRunner(op, src,
new DestinationParam(makeQualified(dst).toUri().getPath()),
new RenameOptionSetParam(options)
).run();
}
@Override
@ -775,7 +823,9 @@ public class WebHdfsFileSystem extends FileSystem
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PutOpParam.Op.SETOWNER;
run(op, p, new OwnerParam(owner), new GroupParam(group));
new FsPathRunner(op, p,
new OwnerParam(owner), new GroupParam(group)
).run();
}
@Override
@ -783,7 +833,7 @@ public class WebHdfsFileSystem extends FileSystem
) throws IOException {
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PutOpParam.Op.SETPERMISSION;
run(op, p, new PermissionParam(permission));
new FsPathRunner(op, p,new PermissionParam(permission)).run();
}
@Override
@ -791,7 +841,7 @@ public class WebHdfsFileSystem extends FileSystem
throws IOException {
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PutOpParam.Op.MODIFYACLENTRIES;
run(op, path, new AclPermissionParam(aclSpec));
new FsPathRunner(op, path, new AclPermissionParam(aclSpec)).run();
}
@Override
@ -799,21 +849,21 @@ public class WebHdfsFileSystem extends FileSystem
throws IOException {
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PutOpParam.Op.REMOVEACLENTRIES;
run(op, path, new AclPermissionParam(aclSpec));
new FsPathRunner(op, path, new AclPermissionParam(aclSpec)).run();
}
@Override
public void removeDefaultAcl(Path path) throws IOException {
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PutOpParam.Op.REMOVEDEFAULTACL;
run(op, path);
new FsPathRunner(op, path).run();
}
@Override
public void removeAcl(Path path) throws IOException {
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PutOpParam.Op.REMOVEACL;
run(op, path);
new FsPathRunner(op, path).run();
}
@Override
@ -821,7 +871,7 @@ public class WebHdfsFileSystem extends FileSystem
throws IOException {
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PutOpParam.Op.SETACL;
run(op, p, new AclPermissionParam(aclSpec));
new FsPathRunner(op, p, new AclPermissionParam(aclSpec)).run();
}
@Override
@ -829,8 +879,9 @@ public class WebHdfsFileSystem extends FileSystem
) throws IOException {
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PutOpParam.Op.SETREPLICATION;
final Map<?, ?> json = run(op, p, new ReplicationParam(replication));
return (Boolean)json.get("boolean");
return new FsPathBooleanRunner(op, p,
new ReplicationParam(replication)
).run();
}
@Override
@ -838,7 +889,10 @@ public class WebHdfsFileSystem extends FileSystem
) throws IOException {
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PutOpParam.Op.SETTIMES;
run(op, p, new ModificationTimeParam(mtime), new AccessTimeParam(atime));
new FsPathRunner(op, p,
new ModificationTimeParam(mtime),
new AccessTimeParam(atime)
).run();
}
@Override
@ -853,32 +907,11 @@ public class WebHdfsFileSystem extends FileSystem
DFSConfigKeys.DFS_REPLICATION_DEFAULT);
}
FSDataOutputStream write(final HttpOpParam.Op op,
final HttpURLConnection conn, final int bufferSize) throws IOException {
return new FSDataOutputStream(new BufferedOutputStream(
conn.getOutputStream(), bufferSize), statistics) {
@Override
public void close() throws IOException {
try {
super.close();
} finally {
try {
validateResponse(op, conn, true);
} finally {
conn.disconnect();
}
}
}
};
}
@Override
public void concat(final Path trg, final Path [] srcs) throws IOException {
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PostOpParam.Op.CONCAT;
ConcatSourcesParam param = new ConcatSourcesParam(srcs);
run(op, trg, param);
new FsPathRunner(op, trg, new ConcatSourcesParam(srcs)).run();
}
@Override
@ -888,14 +921,13 @@ public class WebHdfsFileSystem extends FileSystem
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PutOpParam.Op.CREATE;
return new FsPathRunner(op, f,
return new FsPathOutputStreamRunner(op, f, bufferSize,
new PermissionParam(applyUMask(permission)),
new OverwriteParam(overwrite),
new BufferSizeParam(bufferSize),
new ReplicationParam(replication),
new BlockSizeParam(blockSize))
.run()
.write(bufferSize);
new BlockSizeParam(blockSize)
).run();
}
@Override
@ -904,16 +936,17 @@ public class WebHdfsFileSystem extends FileSystem
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PostOpParam.Op.APPEND;
return new FsPathRunner(op, f, new BufferSizeParam(bufferSize))
.run()
.write(bufferSize);
return new FsPathOutputStreamRunner(op, f, bufferSize,
new BufferSizeParam(bufferSize)
).run();
}
@Override
public boolean delete(Path f, boolean recursive) throws IOException {
final HttpOpParam.Op op = DeleteOpParam.Op.DELETE;
final Map<?, ?> json = run(op, f, new RecursiveParam(recursive));
return (Boolean)json.get("boolean");
return new FsPathBooleanRunner(op, f,
new RecursiveParam(recursive)
).run();
}
@Override
@ -945,7 +978,7 @@ public class WebHdfsFileSystem extends FileSystem
final boolean resolved) throws IOException {
final URL offsetUrl = offset == 0L? url
: new URL(url + "&" + new OffsetParam(offset));
return new URLRunner(GetOpParam.Op.OPEN, offsetUrl, resolved).run().conn;
return new URLRunner(GetOpParam.Op.OPEN, offsetUrl, resolved).run();
}
}
@ -1001,25 +1034,36 @@ public class WebHdfsFileSystem extends FileSystem
statistics.incrementReadOps(1);
final HttpOpParam.Op op = GetOpParam.Op.LISTSTATUS;
final Map<?, ?> json = run(op, f);
final Map<?, ?> rootmap = (Map<?, ?>)json.get(FileStatus.class.getSimpleName() + "es");
final Object[] array = (Object[])rootmap.get(FileStatus.class.getSimpleName());
return new FsPathResponseRunner<FileStatus[]>(op, f) {
@Override
FileStatus[] decodeResponse(Map<?,?> json) {
final Map<?, ?> rootmap = (Map<?, ?>)json.get(FileStatus.class.getSimpleName() + "es");
final Object[] array = (Object[])rootmap.get(FileStatus.class.getSimpleName());
//convert FileStatus
final FileStatus[] statuses = new FileStatus[array.length];
for(int i = 0; i < array.length; i++) {
final Map<?, ?> m = (Map<?, ?>)array[i];
statuses[i] = makeQualified(JsonUtil.toFileStatus(m, false), f);
}
return statuses;
//convert FileStatus
final FileStatus[] statuses = new FileStatus[array.length];
for (int i = 0; i < array.length; i++) {
final Map<?, ?> m = (Map<?, ?>)array[i];
statuses[i] = makeQualified(JsonUtil.toFileStatus(m, false), f);
}
return statuses;
}
}.run();
}
@Override
public Token<DelegationTokenIdentifier> getDelegationToken(
final String renewer) throws IOException {
final HttpOpParam.Op op = GetOpParam.Op.GETDELEGATIONTOKEN;
final Map<?, ?> m = run(op, null, new RenewerParam(renewer));
final Token<DelegationTokenIdentifier> token = JsonUtil.toDelegationToken(m);
Token<DelegationTokenIdentifier> token =
new FsPathResponseRunner<Token<DelegationTokenIdentifier>>(
op, null, new RenewerParam(renewer)) {
@Override
Token<DelegationTokenIdentifier> decodeResponse(Map<?,?> json)
throws IOException {
return JsonUtil.toDelegationToken(json);
}
}.run();
token.setService(tokenServiceName);
return token;
}
@ -1041,19 +1085,22 @@ public class WebHdfsFileSystem extends FileSystem
public synchronized long renewDelegationToken(final Token<?> token
) throws IOException {
final HttpOpParam.Op op = PutOpParam.Op.RENEWDELEGATIONTOKEN;
TokenArgumentParam dtargParam = new TokenArgumentParam(
token.encodeToUrlString());
final Map<?, ?> m = run(op, null, dtargParam);
return (Long) m.get("long");
return new FsPathResponseRunner<Long>(op, null,
new TokenArgumentParam(token.encodeToUrlString())) {
@Override
Long decodeResponse(Map<?,?> json) throws IOException {
return (Long) json.get("long");
}
}.run();
}
@Override
public synchronized void cancelDelegationToken(final Token<?> token
) throws IOException {
final HttpOpParam.Op op = PutOpParam.Op.CANCELDELEGATIONTOKEN;
TokenArgumentParam dtargParam = new TokenArgumentParam(
token.encodeToUrlString());
run(op, null, dtargParam);
new FsPathRunner(op, null,
new TokenArgumentParam(token.encodeToUrlString())
).run();
}
@Override
@ -1071,9 +1118,14 @@ public class WebHdfsFileSystem extends FileSystem
statistics.incrementReadOps(1);
final HttpOpParam.Op op = GetOpParam.Op.GET_BLOCK_LOCATIONS;
final Map<?, ?> m = run(op, p, new OffsetParam(offset),
new LengthParam(length));
return DFSUtil.locatedBlocks2Locations(JsonUtil.toLocatedBlocks(m));
return new FsPathResponseRunner<BlockLocation[]>(op, p,
new OffsetParam(offset), new LengthParam(length)) {
@Override
BlockLocation[] decodeResponse(Map<?,?> json) throws IOException {
return DFSUtil.locatedBlocks2Locations(
JsonUtil.toLocatedBlocks(json));
}
}.run();
}
@Override
@ -1081,8 +1133,12 @@ public class WebHdfsFileSystem extends FileSystem
statistics.incrementReadOps(1);
final HttpOpParam.Op op = GetOpParam.Op.GETCONTENTSUMMARY;
final Map<?, ?> m = run(op, p);
return JsonUtil.toContentSummary(m);
return new FsPathResponseRunner<ContentSummary>(op, p) {
@Override
ContentSummary decodeResponse(Map<?,?> json) {
return JsonUtil.toContentSummary(json);
}
}.run();
}
@Override
@ -1091,8 +1147,12 @@ public class WebHdfsFileSystem extends FileSystem
statistics.incrementReadOps(1);
final HttpOpParam.Op op = GetOpParam.Op.GETFILECHECKSUM;
final Map<?, ?> m = run(op, p);
return JsonUtil.toMD5MD5CRC32FileChecksum(m);
return new FsPathResponseRunner<MD5MD5CRC32FileChecksum>(op, p) {
@Override
MD5MD5CRC32FileChecksum decodeResponse(Map<?,?> json) throws IOException {
return JsonUtil.toMD5MD5CRC32FileChecksum(json);
}
}.run();
}
/**

View File

@ -102,7 +102,7 @@ public abstract class HttpOpParam<E extends Enum<E> & HttpOpParam.Op>
@Override
public boolean getDoOutput() {
return op.getDoOutput();
return false;
}
@Override

View File

@ -94,10 +94,4 @@ public class WebHdfsTestUtil {
Assert.assertEquals(expectedResponseCode, conn.getResponseCode());
return WebHdfsFileSystem.jsonParse(conn, false);
}
public static FSDataOutputStream write(final WebHdfsFileSystem webhdfs,
final HttpOpParam.Op op, final HttpURLConnection conn,
final int bufferSize) throws IOException {
return webhdfs.write(op, conn, bufferSize);
}
}