From 4dc4e9e63f7385ddd1d64ae1345e0d32a4acb9de Mon Sep 17 00:00:00 2001 From: Tsz-wo Sze Date: Wed, 21 Sep 2011 02:56:08 +0000 Subject: [PATCH] HDFS-2340. Support getFileBlockLocations and getDelegationToken in webhdfs. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1173468 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../hadoop/hdfs/protocol/DatanodeInfo.java | 20 ++ .../server/namenode/NameNodeRpcServer.java | 8 +- .../web/resources/NamenodeWebHdfsMethods.java | 59 +++- .../org/apache/hadoop/hdfs/web/JsonUtil.java | 286 +++++++++++++++++- .../hadoop/hdfs/web/WebHdfsFileSystem.java | 48 ++- .../hadoop/hdfs/web/resources/GetOpParam.java | 3 + .../hdfs/web/resources/OverwriteParam.java | 2 +- .../hdfs/web/resources/RenewerParam.java | 41 +++ .../hdfs/security/TestDelegationToken.java | 36 ++- .../web/TestWebHdfsFileSystemContract.java | 13 + 11 files changed, 500 insertions(+), 19 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/RenewerParam.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 459d2325d2..a44d09ec32 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -16,6 +16,9 @@ Trunk (unreleased changes) HDFS-2318. Provide authentication to webhdfs using SPNEGO and delegation tokens. (szetszwo) + HDFS-2340. Support getFileBlockLocations and getDelegationToken in webhdfs. + (szetszwo) + IMPROVEMENTS HADOOP-7524 Change RPC to allow multiple protocols including multuple versions of the same protocol (sanjay Radia) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java index 17a09f695e..af3283ee71 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java @@ -115,6 +115,26 @@ protected DatanodeInfo(DatanodeID nodeID, String location, String hostName) { this.location = location; this.hostName = hostName; } + + /** Constructor */ + public DatanodeInfo(final String name, final String storageID, + final int infoPort, final int ipcPort, + final long capacity, final long dfsUsed, final long remaining, + final long blockPoolUsed, final long lastUpdate, final int xceiverCount, + final String networkLocation, final String hostName, + final AdminStates adminState) { + super(name, storageID, infoPort, ipcPort); + + this.capacity = capacity; + this.dfsUsed = dfsUsed; + this.remaining = remaining; + this.blockPoolUsed = blockPoolUsed; + this.lastUpdate = lastUpdate; + this.xceiverCount = xceiverCount; + this.location = networkLocation; + this.hostName = hostName; + this.adminState = adminState; + } /** The raw capacity. 
*/ public long getCapacity() { return capacity; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index b0411bdd84..a461c5bd7d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -62,6 +62,7 @@ import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole; import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics; +import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; @@ -909,8 +910,11 @@ void verifyVersion(int version) throws IOException { } private static String getClientMachine() { - String clientMachine = Server.getRemoteAddress(); - if (clientMachine == null) { + String clientMachine = NamenodeWebHdfsMethods.getRemoteAddress(); + if (clientMachine == null) { //not a web client + clientMachine = Server.getRemoteAddress(); + } + if (clientMachine == null) { //not a RPC client clientMachine = ""; } return clientMachine; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java index 948466f638..c72437faf1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java @@ -78,6 +78,7 @@ import org.apache.hadoop.hdfs.web.resources.PutOpParam; import org.apache.hadoop.hdfs.web.resources.RecursiveParam; import org.apache.hadoop.hdfs.web.resources.RenameOptionSetParam; +import org.apache.hadoop.hdfs.web.resources.RenewerParam; import org.apache.hadoop.hdfs.web.resources.ReplicationParam; import org.apache.hadoop.hdfs.web.resources.UriFsPathParam; import org.apache.hadoop.hdfs.web.resources.UserParam; @@ -92,7 +93,14 @@ /** Web-hdfs NameNode implementation. */ @Path("") public class NamenodeWebHdfsMethods { - private static final Log LOG = LogFactory.getLog(NamenodeWebHdfsMethods.class); + public static final Log LOG = LogFactory.getLog(NamenodeWebHdfsMethods.class); + + private static final ThreadLocal REMOTE_ADDRESS = new ThreadLocal(); + + /** @return the remote client address. 
*/ + public static String getRemoteAddress() { + return REMOTE_ADDRESS.get(); + } private @Context ServletContext context; private @Context HttpServletRequest request; @@ -215,6 +223,8 @@ public Response put( return ugi.doAs(new PrivilegedExceptionAction() { @Override public Response run() throws IOException, URISyntaxException { + REMOTE_ADDRESS.set(request.getRemoteAddr()); + try { final String fullpath = path.getAbsolutePath(); final NameNode namenode = (NameNode)context.getAttribute("name.node"); @@ -272,6 +282,10 @@ public Response run() throws IOException, URISyntaxException { default: throw new UnsupportedOperationException(op + " is not supported"); } + + } finally { + REMOTE_ADDRESS.set(null); + } } }); } @@ -301,6 +315,8 @@ public Response post( return ugi.doAs(new PrivilegedExceptionAction() { @Override public Response run() throws IOException, URISyntaxException { + REMOTE_ADDRESS.set(request.getRemoteAddr()); + try { final String fullpath = path.getAbsolutePath(); final NameNode namenode = (NameNode)context.getAttribute("name.node"); @@ -315,6 +331,10 @@ public Response run() throws IOException, URISyntaxException { default: throw new UnsupportedOperationException(op + " is not supported"); } + + } finally { + REMOTE_ADDRESS.set(null); + } } }); } @@ -335,10 +355,12 @@ public Response root( final OffsetParam offset, @QueryParam(LengthParam.NAME) @DefaultValue(LengthParam.DEFAULT) final LengthParam length, + @QueryParam(RenewerParam.NAME) @DefaultValue(RenewerParam.DEFAULT) + final RenewerParam renewer, @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT) final BufferSizeParam bufferSize ) throws IOException, URISyntaxException, InterruptedException { - return get(ugi, delegation, ROOT, op, offset, length, bufferSize); + return get(ugi, delegation, ROOT, op, offset, length, renewer, bufferSize); } /** Handle HTTP GET request. */ @@ -356,19 +378,23 @@ public Response get( final OffsetParam offset, @QueryParam(LengthParam.NAME) @DefaultValue(LengthParam.DEFAULT) final LengthParam length, + @QueryParam(RenewerParam.NAME) @DefaultValue(RenewerParam.DEFAULT) + final RenewerParam renewer, @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT) final BufferSizeParam bufferSize ) throws IOException, URISyntaxException, InterruptedException { if (LOG.isTraceEnabled()) { LOG.trace(op + ": " + path + ", ugi=" + ugi - + Param.toSortedString(", ", offset, length, bufferSize)); + + Param.toSortedString(", ", offset, length, renewer, bufferSize)); } return ugi.doAs(new PrivilegedExceptionAction() { @Override public Response run() throws IOException, URISyntaxException { + REMOTE_ADDRESS.set(request.getRemoteAddr()); + try { final NameNode namenode = (NameNode)context.getAttribute("name.node"); final String fullpath = path.getAbsolutePath(); @@ -381,6 +407,15 @@ public Response run() throws IOException, URISyntaxException { op.getValue(), offset.getValue(), offset, length, bufferSize); return Response.temporaryRedirect(uri).build(); } + case GETFILEBLOCKLOCATIONS: + { + final long offsetValue = offset.getValue(); + final Long lengthValue = length.getValue(); + final LocatedBlocks locatedblocks = np.getBlockLocations(fullpath, + offsetValue, lengthValue != null? 
lengthValue: offsetValue + 1); + final String js = JsonUtil.toJsonString(locatedblocks); + return Response.ok(js).type(MediaType.APPLICATION_JSON).build(); + } case GETFILESTATUS: { final HdfsFileStatus status = np.getFileInfo(fullpath); @@ -392,9 +427,20 @@ public Response run() throws IOException, URISyntaxException { final StreamingOutput streaming = getListingStream(np, fullpath); return Response.ok(streaming).type(MediaType.APPLICATION_JSON).build(); } + case GETDELEGATIONTOKEN: + { + final Token token = generateDelegationToken( + namenode, ugi, renewer.getValue()); + final String js = JsonUtil.toJsonString(token); + return Response.ok(js).type(MediaType.APPLICATION_JSON).build(); + } default: throw new UnsupportedOperationException(op + " is not supported"); } + + } finally { + REMOTE_ADDRESS.set(null); + } } }); } @@ -462,6 +508,9 @@ public Response delete( return ugi.doAs(new PrivilegedExceptionAction() { @Override public Response run() throws IOException { + REMOTE_ADDRESS.set(request.getRemoteAddr()); + try { + final NameNode namenode = (NameNode)context.getAttribute("name.node"); final String fullpath = path.getAbsolutePath(); @@ -475,6 +524,10 @@ public Response run() throws IOException { default: throw new UnsupportedOperationException(op + " is not supported"); } + + } finally { + REMOTE_ADDRESS.set(null); + } } }); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java index 1c18dc334e..314d53b38f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java @@ -17,19 +17,31 @@ */ package org.apache.hadoop.hdfs.web; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.TreeMap; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; import org.mortbay.util.ajax.JSON; /** JSON Utilities */ public class JsonUtil { - private static final ThreadLocal> jsonMap - = new ThreadLocal>() { + private static class ThreadLocalMap extends ThreadLocal> { @Override protected Map initialValue() { return new TreeMap(); @@ -41,7 +53,54 @@ public Map get() { m.clear(); return m; } - }; + } + + private static final ThreadLocalMap jsonMap = new ThreadLocalMap(); + private static final ThreadLocalMap tokenMap = new ThreadLocalMap(); + private static final ThreadLocalMap datanodeInfoMap = new ThreadLocalMap(); + private static final ThreadLocalMap extendedBlockMap = new ThreadLocalMap(); + private static final ThreadLocalMap locatedBlockMap = new ThreadLocalMap(); + + private static final DatanodeInfo[] EMPTY_DATANODE_INFO_ARRAY = {}; + + /** Convert a token object to 
a Json string. */ + public static String toJsonString(final Token token + ) throws IOException { + if (token == null) { + return null; + } + + final Map m = tokenMap.get(); + m.put("urlString", token.encodeToUrlString()); + return JSON.toString(m); + } + + /** Convert a Json map to a Token. */ + public static Token toToken( + final Map m) throws IOException { + if (m == null) { + return null; + } + + final Token token + = new Token(); + token.decodeFromUrlString((String)m.get("urlString")); + return token; + } + + /** Convert a Json map to a Token of DelegationTokenIdentifier. */ + @SuppressWarnings("unchecked") + public static Token toDelegationToken( + final Map m) throws IOException { + return (Token)toToken(m); + } + + /** Convert a Json map to a Token of BlockTokenIdentifier. */ + @SuppressWarnings("unchecked") + public static Token toBlockToken( + final Map m) throws IOException { + return (Token)toToken(m); + } /** Convert an exception object to a Json string. */ public static String toJsonString(final Exception e) { @@ -77,11 +136,10 @@ public static FsPermission toFsPermission(final String s) { /** Convert a HdfsFileStatus object to a Json string. */ public static String toJsonString(final HdfsFileStatus status) { - final Map m = jsonMap.get(); if (status == null) { - m.put("isNull", true); + return null; } else { - m.put("isNull", false); + final Map m = jsonMap.get(); m.put("localName", status.getLocalName()); m.put("isDir", status.isDir()); m.put("isSymlink", status.isSymlink()); @@ -97,8 +155,8 @@ public static String toJsonString(final HdfsFileStatus status) { m.put("modificationTime", status.getModificationTime()); m.put("blockSize", status.getBlockSize()); m.put("replication", status.getReplication()); + return JSON.toString(m); } - return JSON.toString(m); } @SuppressWarnings("unchecked") @@ -106,9 +164,9 @@ static Map parse(String jsonString) { return (Map) JSON.parse(jsonString); } - /** Convert a Json string to a HdfsFileStatus object. */ + /** Convert a Json map to a HdfsFileStatus object. */ public static HdfsFileStatus toFileStatus(final Map m) { - if ((Boolean)m.get("isNull")) { + if (m == null) { return null; } @@ -130,4 +188,214 @@ public static HdfsFileStatus toFileStatus(final Map m) { permission, owner, group, symlink, DFSUtil.string2Bytes(localName)); } + + /** Convert a LocatedBlock to a Json string. */ + public static String toJsonString(final ExtendedBlock extendedblock) { + if (extendedblock == null) { + return null; + } + + final Map m = extendedBlockMap.get(); + m.put("blockPoolId", extendedblock.getBlockPoolId()); + m.put("blockId", extendedblock.getBlockId()); + m.put("numBytes", extendedblock.getNumBytes()); + m.put("generationStamp", extendedblock.getGenerationStamp()); + return JSON.toString(m); + } + + /** Convert a Json map to an ExtendedBlock object. */ + public static ExtendedBlock toExtendedBlock(final Map m) { + if (m == null) { + return null; + } + + final String blockPoolId = (String)m.get("blockPoolId"); + final long blockId = (Long)m.get("blockId"); + final long numBytes = (Long)m.get("numBytes"); + final long generationStamp = (Long)m.get("generationStamp"); + return new ExtendedBlock(blockPoolId, blockId, numBytes, generationStamp); + } + + /** Convert a DatanodeInfo to a Json string. 
*/ + public static String toJsonString(final DatanodeInfo datanodeinfo) { + if (datanodeinfo == null) { + return null; + } + + final Map m = datanodeInfoMap.get(); + m.put("name", datanodeinfo.getName()); + m.put("storageID", datanodeinfo.getStorageID()); + m.put("infoPort", datanodeinfo.getInfoPort()); + + m.put("ipcPort", datanodeinfo.getIpcPort()); + + m.put("capacity", datanodeinfo.getCapacity()); + m.put("dfsUsed", datanodeinfo.getDfsUsed()); + m.put("remaining", datanodeinfo.getRemaining()); + m.put("blockPoolUsed", datanodeinfo.getBlockPoolUsed()); + m.put("lastUpdate", datanodeinfo.getLastUpdate()); + m.put("xceiverCount", datanodeinfo.getXceiverCount()); + m.put("networkLocation", datanodeinfo.getNetworkLocation()); + m.put("hostName", datanodeinfo.getHostName()); + m.put("adminState", datanodeinfo.getAdminState().name()); + return JSON.toString(m); + } + + /** Convert a Json map to an DatanodeInfo object. */ + public static DatanodeInfo toDatanodeInfo(final Map m) { + if (m == null) { + return null; + } + + return new DatanodeInfo( + (String)m.get("name"), + (String)m.get("storageID"), + (int)(long)(Long)m.get("infoPort"), + (int)(long)(Long)m.get("ipcPort"), + + (Long)m.get("capacity"), + (Long)m.get("dfsUsed"), + (Long)m.get("remaining"), + (Long)m.get("blockPoolUsed"), + (Long)m.get("lastUpdate"), + (int)(long)(Long)m.get("xceiverCount"), + (String)m.get("networkLocation"), + (String)m.get("hostName"), + AdminStates.valueOf((String)m.get("adminState"))); + } + + /** Convert a DatanodeInfo[] to a Json string. */ + public static String toJsonString(final DatanodeInfo[] array + ) throws IOException { + if (array == null) { + return null; + } else if (array.length == 0) { + return "[]"; + } else { + final StringBuilder b = new StringBuilder().append('[').append( + toJsonString(array[0])); + for(int i = 1; i < array.length; i++) { + b.append(", ").append(toJsonString(array[i])); + } + return b.append(']').toString(); + } + } + + /** Convert an Object[] to a DatanodeInfo[]. */ + public static DatanodeInfo[] toDatanodeInfoArray(final Object[] objects) { + if (objects == null) { + return null; + } else if (objects.length == 0) { + return EMPTY_DATANODE_INFO_ARRAY; + } else { + final DatanodeInfo[] array = new DatanodeInfo[objects.length]; + for(int i = 0; i < array.length; i++) { + array[i] = (DatanodeInfo)toDatanodeInfo((Map) objects[i]); + } + return array; + } + } + + /** Convert a LocatedBlock to a Json string. */ + public static String toJsonString(final LocatedBlock locatedblock + ) throws IOException { + if (locatedblock == null) { + return null; + } + + final Map m = locatedBlockMap.get(); + m.put("blockToken", toJsonString(locatedblock.getBlockToken())); + m.put("isCorrupt", locatedblock.isCorrupt()); + m.put("startOffset", locatedblock.getStartOffset()); + m.put("block", toJsonString(locatedblock.getBlock())); + + m.put("locations", toJsonString(locatedblock.getLocations())); + return JSON.toString(m); + } + + /** Convert a Json map to LocatedBlock. 
*/ + public static LocatedBlock toLocatedBlock(final Map m) throws IOException { + if (m == null) { + return null; + } + + final ExtendedBlock b = toExtendedBlock((Map)JSON.parse((String)m.get("block"))); + final DatanodeInfo[] locations = toDatanodeInfoArray( + (Object[])JSON.parse((String)m.get("locations"))); + final long startOffset = (Long)m.get("startOffset"); + final boolean isCorrupt = (Boolean)m.get("isCorrupt"); + + final LocatedBlock locatedblock = new LocatedBlock(b, locations, startOffset, isCorrupt); + locatedblock.setBlockToken(toBlockToken((Map)JSON.parse((String)m.get("blockToken")))); + return locatedblock; + } + + /** Convert a LocatedBlock[] to a Json string. */ + public static String toJsonString(final List array + ) throws IOException { + if (array == null) { + return null; + } else if (array.size() == 0) { + return "[]"; + } else { + final StringBuilder b = new StringBuilder().append('[').append( + toJsonString(array.get(0))); + for(int i = 1; i < array.size(); i++) { + b.append(",\n ").append(toJsonString(array.get(i))); + } + return b.append(']').toString(); + } + } + + /** Convert an Object[] to a List of LocatedBlock. + * @throws IOException */ + public static List toLocatedBlockList(final Object[] objects + ) throws IOException { + if (objects == null) { + return null; + } else if (objects.length == 0) { + return Collections.emptyList(); + } else { + final List list = new ArrayList(objects.length); + for(int i = 0; i < objects.length; i++) { + list.add((LocatedBlock)toLocatedBlock((Map)objects[i])); + } + return list; + } + } + + /** Convert LocatedBlocks to a Json string. */ + public static String toJsonString(final LocatedBlocks locatedblocks + ) throws IOException { + if (locatedblocks == null) { + return null; + } + + final Map m = jsonMap.get(); + m.put("fileLength", locatedblocks.getFileLength()); + m.put("isUnderConstruction", locatedblocks.isUnderConstruction()); + + m.put("locatedBlocks", toJsonString(locatedblocks.getLocatedBlocks())); + m.put("lastLocatedBlock", toJsonString(locatedblocks.getLastLocatedBlock())); + m.put("isLastBlockComplete", locatedblocks.isLastBlockComplete()); + return JSON.toString(m); + } + + /** Convert a Json map to LocatedBlock. 
*/ + public static LocatedBlocks toLocatedBlocks(final Map m + ) throws IOException { + if (m == null) { + return null; + } + + final long fileLength = (Long)m.get("fileLength"); + final boolean isUnderConstruction = (Boolean)m.get("isUnderConstruction"); + final List locatedBlocks = toLocatedBlockList( + (Object[])JSON.parse((String) m.get("locatedBlocks"))); + final LocatedBlock lastLocatedBlock = toLocatedBlock( + (Map)JSON.parse((String)m.get("lastLocatedBlock"))); + final boolean isLastBlockComplete = (Boolean)m.get("isLastBlockComplete"); + return new LocatedBlocks(fileLength, isUnderConstruction, locatedBlocks, + lastLocatedBlock, isLastBlockComplete); + } } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java index df86456e89..061d44bbe4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java @@ -27,9 +27,12 @@ import java.net.URI; import java.net.URISyntaxException; import java.net.URL; +import java.util.Arrays; +import java.util.List; import java.util.Map; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileAlreadyExistsException; @@ -45,6 +48,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException; import org.apache.hadoop.hdfs.protocol.UnresolvedPathException; +import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.server.namenode.SafeModeException; import org.apache.hadoop.hdfs.web.resources.AccessTimeParam; import org.apache.hadoop.hdfs.web.resources.BlockSizeParam; @@ -54,7 +58,9 @@ import org.apache.hadoop.hdfs.web.resources.GetOpParam; import org.apache.hadoop.hdfs.web.resources.GroupParam; import org.apache.hadoop.hdfs.web.resources.HttpOpParam; +import org.apache.hadoop.hdfs.web.resources.LengthParam; import org.apache.hadoop.hdfs.web.resources.ModificationTimeParam; +import org.apache.hadoop.hdfs.web.resources.OffsetParam; import org.apache.hadoop.hdfs.web.resources.OverwriteParam; import org.apache.hadoop.hdfs.web.resources.OwnerParam; import org.apache.hadoop.hdfs.web.resources.Param; @@ -63,13 +69,16 @@ import org.apache.hadoop.hdfs.web.resources.PutOpParam; import org.apache.hadoop.hdfs.web.resources.RecursiveParam; import org.apache.hadoop.hdfs.web.resources.RenameOptionSetParam; +import org.apache.hadoop.hdfs.web.resources.RenewerParam; import org.apache.hadoop.hdfs.web.resources.ReplicationParam; import org.apache.hadoop.hdfs.web.resources.UserParam; +import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authentication.client.AuthenticatedURL; import org.apache.hadoop.security.authentication.client.AuthenticationException; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.Progressable; import org.mortbay.util.ajax.JSON; @@ -167,7 +176,7 @@ private URL toUrl(final HttpOpParam.Op op, final Path fspath, final Param... 
parameters) throws IOException { //initialize URI path and query final String path = "/" + PATH_PREFIX - + makeQualified(fspath).toUri().getPath(); + + (fspath == null? "/": makeQualified(fspath).toUri().getPath()); final String query = op.toQueryString() + '&' + new UserParam(ugi) + Param.toSortedString("&", parameters); @@ -396,4 +405,41 @@ public FileStatus[] listStatus(final Path f) throws IOException { } return statuses; } + + @Override + public Token getDelegationToken(final String renewer + ) throws IOException { + final HttpOpParam.Op op = GetOpParam.Op.GETDELEGATIONTOKEN; + final Map m = run(op, null, new RenewerParam(renewer)); + final Token token = JsonUtil.toDelegationToken(m); + token.setService(new Text(getCanonicalServiceName())); + return token; + } + + @Override + public List> getDelegationTokens(final String renewer + ) throws IOException { + final Token[] t = {getDelegationToken(renewer)}; + return Arrays.asList(t); + } + + @Override + public BlockLocation[] getFileBlockLocations(final FileStatus status, + final long offset, final long length) throws IOException { + if (status == null) { + return null; + } + return getFileBlockLocations(status.getPath(), offset, length); + } + + @Override + public BlockLocation[] getFileBlockLocations(final Path p, + final long offset, final long length) throws IOException { + statistics.incrementReadOps(1); + + final HttpOpParam.Op op = GetOpParam.Op.GETFILEBLOCKLOCATIONS; + final Map m = run(op, p, new OffsetParam(offset), + new LengthParam(length)); + return DFSUtil.locatedBlocks2Locations(JsonUtil.toLocatedBlocks(m)); + } } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java index 6f11871ebb..aeb3135404 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java @@ -27,10 +27,13 @@ public class GetOpParam extends HttpOpParam { /** Get operations. */ public static enum Op implements HttpOpParam.Op { OPEN(HttpURLConnection.HTTP_OK), + GETFILEBLOCKLOCATIONS(HttpURLConnection.HTTP_OK), GETFILESTATUS(HttpURLConnection.HTTP_OK), LISTSTATUS(HttpURLConnection.HTTP_OK), + GETDELEGATIONTOKEN(HttpURLConnection.HTTP_OK), + NULL(HttpURLConnection.HTTP_NOT_IMPLEMENTED); final int expectedHttpResponseCode; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/OverwriteParam.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/OverwriteParam.java index 6639ece7b2..f6945bb435 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/OverwriteParam.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/OverwriteParam.java @@ -17,7 +17,7 @@ */ package org.apache.hadoop.hdfs.web.resources; -/** Recursive parameter. */ +/** Overwrite parameter. */ public class OverwriteParam extends BooleanParam { /** Parameter name. 
*/ public static final String NAME = "overwrite"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/RenewerParam.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/RenewerParam.java new file mode 100644 index 0000000000..750e8bc91b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/RenewerParam.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.web.resources; + +/** Renewer parameter. */ +public class RenewerParam extends StringParam { + /** Parameter name. */ + public static final String NAME = "renewer"; + /** Default parameter value. */ + public static final String DEFAULT = NULL; + + private static final Domain DOMAIN = new Domain(NAME, null); + + /** + * Constructor. + * @param str a string representation of the parameter value. + */ + public RenewerParam(final String str) { + super(DOMAIN, str == null || str.equals(DEFAULT)? 
null: str); + } + + @Override + public String getName() { + return NAME; + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationToken.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationToken.java index d6397b6a2e..9c577f740e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationToken.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationToken.java @@ -23,12 +23,12 @@ import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.IOException; +import java.net.URI; import java.security.PrivilegedExceptionAction; -import junit.framework.Assert; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.impl.Log4JLogger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hdfs.DFSConfigKeys; @@ -38,12 +38,16 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; +import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods; +import org.apache.hadoop.hdfs.web.WebHdfsFileSystem; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.Token; +import org.apache.log4j.Level; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -56,12 +60,13 @@ public class TestDelegationToken { @Before public void setUp() throws Exception { config = new HdfsConfiguration(); + config.setBoolean(DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY, true); config.setLong(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_KEY, 10000); config.setLong(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY, 5000); config.set("hadoop.security.auth_to_local", "RULE:[2:$1@$0](JobTracker@.*FOO.COM)s/@.*//" + "DEFAULT"); FileSystem.setDefaultUri(config, "hdfs://localhost:" + "0"); - cluster = new MiniDFSCluster.Builder(config).build(); + cluster = new MiniDFSCluster.Builder(config).numDataNodes(0).build(); cluster.waitActive(); dtSecretManager = NameNodeAdapter.getDtSecretManager( cluster.getNamesystem()); @@ -153,6 +158,31 @@ public void testDelegationTokenDFSApi() throws Exception { dtSecretManager.renewToken(token, "JobTracker"); } + @Test + public void testDelegationTokenWebHdfsApi() throws Exception { + ((Log4JLogger)NamenodeWebHdfsMethods.LOG).getLogger().setLevel(Level.ALL); + final String uri = WebHdfsFileSystem.SCHEME + "://" + + config.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY); + //get file system as JobTracker + final UserGroupInformation ugi = UserGroupInformation.createUserForTesting( + "JobTracker", new String[]{"user"}); + final WebHdfsFileSystem webhdfs = ugi.doAs( + new PrivilegedExceptionAction() { + @Override + public WebHdfsFileSystem run() throws Exception { + return (WebHdfsFileSystem)FileSystem.get(new URI(uri), config); + } + }); + + final Token token = webhdfs.getDelegationToken("JobTracker"); + DelegationTokenIdentifier identifier = new DelegationTokenIdentifier(); + byte[] tokenId = 
token.getIdentifier(); + identifier.readFields(new DataInputStream(new ByteArrayInputStream(tokenId))); + LOG.info("A valid token should have non-null password, and should be renewed successfully"); + Assert.assertTrue(null != dtSecretManager.retrievePassword(identifier)); + dtSecretManager.renewToken(token, "JobTracker"); + } + @Test public void testDelegationTokenWithDoAs() throws Exception { final DistributedFileSystem dfs = (DistributedFileSystem) cluster.getFileSystem(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java index 4c2264fea8..47ae417430 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java @@ -23,6 +23,7 @@ import java.security.PrivilegedExceptionAction; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystemContractBaseTest; import org.apache.hadoop.fs.Path; @@ -114,4 +115,16 @@ public void testMkdirsFailsForSubdirectoryOfExistingFile() throws Exception { // also okay for HDFS. } } + + public void testGetFileBlockLocations() throws IOException { + final String f = "/test/testGetFileBlockLocations"; + createFile(path(f)); + final BlockLocation[] computed = fs.getFileBlockLocations(new Path(f), 0L, 1L); + final BlockLocation[] expected = cluster.getFileSystem().getFileBlockLocations( + new Path(f), 0L, 1L); + assertEquals(expected.length, computed.length); + for(int i = 0; i < computed.length; i++) { + assertEquals(expected[i].toString(), computed[i].toString()); + } + } }
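
Usage sketch (illustrative, not part of the patch): a minimal client-side example of the two new webhdfs operations, modeled on testDelegationTokenWebHdfsApi and testGetFileBlockLocations above. The NameNode address, renewer name, and file path are placeholders, and webhdfs must be enabled on the NameNode (DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY), as done in TestDelegationToken#setUp().

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
import org.apache.hadoop.security.token.Token;

public class WebHdfsClientSketch {
  public static void main(String[] args) throws Exception {
    final Configuration conf = new Configuration();

    // Placeholder NameNode HTTP address; dfs.webhdfs.enabled must be set on
    // the NameNode for these calls to succeed.
    final URI uri = new URI(WebHdfsFileSystem.SCHEME + "://namenode.example.com:50070");
    final FileSystem fs = FileSystem.get(uri, conf);

    // GETDELEGATIONTOKEN with the new renewer parameter; the response is the
    // JSON {"urlString": ...} form produced by JsonUtil#toJsonString(Token).
    final Token<?> token = fs.getDelegationToken("JobTracker");
    System.out.println("kind=" + token.getKind() + ", service=" + token.getService());

    // GETFILEBLOCKLOCATIONS with offset/length parameters; the JSON
    // LocatedBlocks response is converted back on the client side.
    final Path f = new Path("/test/testGetFileBlockLocations");  // placeholder path
    final BlockLocation[] locations = fs.getFileBlockLocations(f, 0L, 1L);
    for (BlockLocation loc : locations) {
      System.out.println(loc);
    }
  }
}

As implemented in WebHdfsFileSystem above, getDelegationToken(...) sends op=GETDELEGATIONTOKEN with RenewerParam and decodes the reply via JsonUtil.toDelegationToken(...), while getFileBlockLocations(...) sends op=GETFILEBLOCKLOCATIONS with OffsetParam and LengthParam and converts the reply via JsonUtil.toLocatedBlocks(...) and DFSUtil.locatedBlocks2Locations(...).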