HDFS-2340. Support getFileBlockLocations and getDelegationToken in webhdfs.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1173468 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
0e870d7d18
commit
4dc4e9e63f
@ -16,6 +16,9 @@ Trunk (unreleased changes)
|
||||
HDFS-2318. Provide authentication to webhdfs using SPNEGO and delegation
|
||||
tokens. (szetszwo)
|
||||
|
||||
HDFS-2340. Support getFileBlockLocations and getDelegationToken in webhdfs.
|
||||
(szetszwo)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
HADOOP-7524 Change RPC to allow multiple protocols including multuple versions of the same protocol (sanjay Radia)
|
||||
|
@ -115,6 +115,26 @@ public class DatanodeInfo extends DatanodeID implements Node {
|
||||
this.location = location;
|
||||
this.hostName = hostName;
|
||||
}
|
||||
|
||||
/** Constructor */
|
||||
public DatanodeInfo(final String name, final String storageID,
|
||||
final int infoPort, final int ipcPort,
|
||||
final long capacity, final long dfsUsed, final long remaining,
|
||||
final long blockPoolUsed, final long lastUpdate, final int xceiverCount,
|
||||
final String networkLocation, final String hostName,
|
||||
final AdminStates adminState) {
|
||||
super(name, storageID, infoPort, ipcPort);
|
||||
|
||||
this.capacity = capacity;
|
||||
this.dfsUsed = dfsUsed;
|
||||
this.remaining = remaining;
|
||||
this.blockPoolUsed = blockPoolUsed;
|
||||
this.lastUpdate = lastUpdate;
|
||||
this.xceiverCount = xceiverCount;
|
||||
this.location = networkLocation;
|
||||
this.hostName = hostName;
|
||||
this.adminState = adminState;
|
||||
}
|
||||
|
||||
/** The raw capacity. */
|
||||
public long getCapacity() { return capacity; }
|
||||
|
@ -62,6 +62,7 @@ import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
|
||||
import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
|
||||
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
|
||||
import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
|
||||
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
|
||||
@ -909,8 +910,11 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
||||
}
|
||||
|
||||
private static String getClientMachine() {
|
||||
String clientMachine = Server.getRemoteAddress();
|
||||
if (clientMachine == null) {
|
||||
String clientMachine = NamenodeWebHdfsMethods.getRemoteAddress();
|
||||
if (clientMachine == null) { //not a web client
|
||||
clientMachine = Server.getRemoteAddress();
|
||||
}
|
||||
if (clientMachine == null) { //not a RPC client
|
||||
clientMachine = "";
|
||||
}
|
||||
return clientMachine;
|
||||
|
@ -78,6 +78,7 @@ import org.apache.hadoop.hdfs.web.resources.PostOpParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.PutOpParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.RecursiveParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.RenameOptionSetParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.RenewerParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.ReplicationParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.UriFsPathParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.UserParam;
|
||||
@ -92,7 +93,14 @@ import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
/** Web-hdfs NameNode implementation. */
|
||||
@Path("")
|
||||
public class NamenodeWebHdfsMethods {
|
||||
private static final Log LOG = LogFactory.getLog(NamenodeWebHdfsMethods.class);
|
||||
public static final Log LOG = LogFactory.getLog(NamenodeWebHdfsMethods.class);
|
||||
|
||||
private static final ThreadLocal<String> REMOTE_ADDRESS = new ThreadLocal<String>();
|
||||
|
||||
/** @return the remote client address. */
|
||||
public static String getRemoteAddress() {
|
||||
return REMOTE_ADDRESS.get();
|
||||
}
|
||||
|
||||
private @Context ServletContext context;
|
||||
private @Context HttpServletRequest request;
|
||||
@ -215,6 +223,8 @@ public class NamenodeWebHdfsMethods {
|
||||
return ugi.doAs(new PrivilegedExceptionAction<Response>() {
|
||||
@Override
|
||||
public Response run() throws IOException, URISyntaxException {
|
||||
REMOTE_ADDRESS.set(request.getRemoteAddr());
|
||||
try {
|
||||
|
||||
final String fullpath = path.getAbsolutePath();
|
||||
final NameNode namenode = (NameNode)context.getAttribute("name.node");
|
||||
@ -272,6 +282,10 @@ public class NamenodeWebHdfsMethods {
|
||||
default:
|
||||
throw new UnsupportedOperationException(op + " is not supported");
|
||||
}
|
||||
|
||||
} finally {
|
||||
REMOTE_ADDRESS.set(null);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
@ -301,6 +315,8 @@ public class NamenodeWebHdfsMethods {
|
||||
return ugi.doAs(new PrivilegedExceptionAction<Response>() {
|
||||
@Override
|
||||
public Response run() throws IOException, URISyntaxException {
|
||||
REMOTE_ADDRESS.set(request.getRemoteAddr());
|
||||
try {
|
||||
|
||||
final String fullpath = path.getAbsolutePath();
|
||||
final NameNode namenode = (NameNode)context.getAttribute("name.node");
|
||||
@ -315,6 +331,10 @@ public class NamenodeWebHdfsMethods {
|
||||
default:
|
||||
throw new UnsupportedOperationException(op + " is not supported");
|
||||
}
|
||||
|
||||
} finally {
|
||||
REMOTE_ADDRESS.set(null);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
@ -335,10 +355,12 @@ public class NamenodeWebHdfsMethods {
|
||||
final OffsetParam offset,
|
||||
@QueryParam(LengthParam.NAME) @DefaultValue(LengthParam.DEFAULT)
|
||||
final LengthParam length,
|
||||
@QueryParam(RenewerParam.NAME) @DefaultValue(RenewerParam.DEFAULT)
|
||||
final RenewerParam renewer,
|
||||
@QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
|
||||
final BufferSizeParam bufferSize
|
||||
) throws IOException, URISyntaxException, InterruptedException {
|
||||
return get(ugi, delegation, ROOT, op, offset, length, bufferSize);
|
||||
return get(ugi, delegation, ROOT, op, offset, length, renewer, bufferSize);
|
||||
}
|
||||
|
||||
/** Handle HTTP GET request. */
|
||||
@ -356,19 +378,23 @@ public class NamenodeWebHdfsMethods {
|
||||
final OffsetParam offset,
|
||||
@QueryParam(LengthParam.NAME) @DefaultValue(LengthParam.DEFAULT)
|
||||
final LengthParam length,
|
||||
@QueryParam(RenewerParam.NAME) @DefaultValue(RenewerParam.DEFAULT)
|
||||
final RenewerParam renewer,
|
||||
@QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
|
||||
final BufferSizeParam bufferSize
|
||||
) throws IOException, URISyntaxException, InterruptedException {
|
||||
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace(op + ": " + path + ", ugi=" + ugi
|
||||
+ Param.toSortedString(", ", offset, length, bufferSize));
|
||||
+ Param.toSortedString(", ", offset, length, renewer, bufferSize));
|
||||
}
|
||||
|
||||
|
||||
return ugi.doAs(new PrivilegedExceptionAction<Response>() {
|
||||
@Override
|
||||
public Response run() throws IOException, URISyntaxException {
|
||||
REMOTE_ADDRESS.set(request.getRemoteAddr());
|
||||
try {
|
||||
|
||||
final NameNode namenode = (NameNode)context.getAttribute("name.node");
|
||||
final String fullpath = path.getAbsolutePath();
|
||||
@ -381,6 +407,15 @@ public class NamenodeWebHdfsMethods {
|
||||
op.getValue(), offset.getValue(), offset, length, bufferSize);
|
||||
return Response.temporaryRedirect(uri).build();
|
||||
}
|
||||
case GETFILEBLOCKLOCATIONS:
|
||||
{
|
||||
final long offsetValue = offset.getValue();
|
||||
final Long lengthValue = length.getValue();
|
||||
final LocatedBlocks locatedblocks = np.getBlockLocations(fullpath,
|
||||
offsetValue, lengthValue != null? lengthValue: offsetValue + 1);
|
||||
final String js = JsonUtil.toJsonString(locatedblocks);
|
||||
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
|
||||
}
|
||||
case GETFILESTATUS:
|
||||
{
|
||||
final HdfsFileStatus status = np.getFileInfo(fullpath);
|
||||
@ -392,9 +427,20 @@ public class NamenodeWebHdfsMethods {
|
||||
final StreamingOutput streaming = getListingStream(np, fullpath);
|
||||
return Response.ok(streaming).type(MediaType.APPLICATION_JSON).build();
|
||||
}
|
||||
case GETDELEGATIONTOKEN:
|
||||
{
|
||||
final Token<? extends TokenIdentifier> token = generateDelegationToken(
|
||||
namenode, ugi, renewer.getValue());
|
||||
final String js = JsonUtil.toJsonString(token);
|
||||
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
|
||||
}
|
||||
default:
|
||||
throw new UnsupportedOperationException(op + " is not supported");
|
||||
}
|
||||
|
||||
} finally {
|
||||
REMOTE_ADDRESS.set(null);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
@ -462,6 +508,9 @@ public class NamenodeWebHdfsMethods {
|
||||
return ugi.doAs(new PrivilegedExceptionAction<Response>() {
|
||||
@Override
|
||||
public Response run() throws IOException {
|
||||
REMOTE_ADDRESS.set(request.getRemoteAddr());
|
||||
try {
|
||||
|
||||
final NameNode namenode = (NameNode)context.getAttribute("name.node");
|
||||
final String fullpath = path.getAbsolutePath();
|
||||
|
||||
@ -475,6 +524,10 @@ public class NamenodeWebHdfsMethods {
|
||||
default:
|
||||
throw new UnsupportedOperationException(op + " is not supported");
|
||||
}
|
||||
|
||||
} finally {
|
||||
REMOTE_ADDRESS.set(null);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -17,19 +17,31 @@
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.web;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.TreeMap;
|
||||
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.mortbay.util.ajax.JSON;
|
||||
|
||||
/** JSON Utilities */
|
||||
public class JsonUtil {
|
||||
private static final ThreadLocal<Map<String, Object>> jsonMap
|
||||
= new ThreadLocal<Map<String, Object>>() {
|
||||
private static class ThreadLocalMap extends ThreadLocal<Map<String, Object>> {
|
||||
@Override
|
||||
protected Map<String, Object> initialValue() {
|
||||
return new TreeMap<String, Object>();
|
||||
@ -41,7 +53,54 @@ public class JsonUtil {
|
||||
m.clear();
|
||||
return m;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private static final ThreadLocalMap jsonMap = new ThreadLocalMap();
|
||||
private static final ThreadLocalMap tokenMap = new ThreadLocalMap();
|
||||
private static final ThreadLocalMap datanodeInfoMap = new ThreadLocalMap();
|
||||
private static final ThreadLocalMap extendedBlockMap = new ThreadLocalMap();
|
||||
private static final ThreadLocalMap locatedBlockMap = new ThreadLocalMap();
|
||||
|
||||
private static final DatanodeInfo[] EMPTY_DATANODE_INFO_ARRAY = {};
|
||||
|
||||
/** Convert a token object to a Json string. */
|
||||
public static String toJsonString(final Token<? extends TokenIdentifier> token
|
||||
) throws IOException {
|
||||
if (token == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
final Map<String, Object> m = tokenMap.get();
|
||||
m.put("urlString", token.encodeToUrlString());
|
||||
return JSON.toString(m);
|
||||
}
|
||||
|
||||
/** Convert a Json map to a Token. */
|
||||
public static Token<? extends TokenIdentifier> toToken(
|
||||
final Map<?, ?> m) throws IOException {
|
||||
if (m == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
final Token<DelegationTokenIdentifier> token
|
||||
= new Token<DelegationTokenIdentifier>();
|
||||
token.decodeFromUrlString((String)m.get("urlString"));
|
||||
return token;
|
||||
}
|
||||
|
||||
/** Convert a Json map to a Token of DelegationTokenIdentifier. */
|
||||
@SuppressWarnings("unchecked")
|
||||
public static Token<DelegationTokenIdentifier> toDelegationToken(
|
||||
final Map<?, ?> m) throws IOException {
|
||||
return (Token<DelegationTokenIdentifier>)toToken(m);
|
||||
}
|
||||
|
||||
/** Convert a Json map to a Token of BlockTokenIdentifier. */
|
||||
@SuppressWarnings("unchecked")
|
||||
public static Token<BlockTokenIdentifier> toBlockToken(
|
||||
final Map<?, ?> m) throws IOException {
|
||||
return (Token<BlockTokenIdentifier>)toToken(m);
|
||||
}
|
||||
|
||||
/** Convert an exception object to a Json string. */
|
||||
public static String toJsonString(final Exception e) {
|
||||
@ -77,11 +136,10 @@ public class JsonUtil {
|
||||
|
||||
/** Convert a HdfsFileStatus object to a Json string. */
|
||||
public static String toJsonString(final HdfsFileStatus status) {
|
||||
final Map<String, Object> m = jsonMap.get();
|
||||
if (status == null) {
|
||||
m.put("isNull", true);
|
||||
return null;
|
||||
} else {
|
||||
m.put("isNull", false);
|
||||
final Map<String, Object> m = jsonMap.get();
|
||||
m.put("localName", status.getLocalName());
|
||||
m.put("isDir", status.isDir());
|
||||
m.put("isSymlink", status.isSymlink());
|
||||
@ -97,8 +155,8 @@ public class JsonUtil {
|
||||
m.put("modificationTime", status.getModificationTime());
|
||||
m.put("blockSize", status.getBlockSize());
|
||||
m.put("replication", status.getReplication());
|
||||
return JSON.toString(m);
|
||||
}
|
||||
return JSON.toString(m);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@ -106,9 +164,9 @@ public class JsonUtil {
|
||||
return (Map<String, Object>) JSON.parse(jsonString);
|
||||
}
|
||||
|
||||
/** Convert a Json string to a HdfsFileStatus object. */
|
||||
/** Convert a Json map to a HdfsFileStatus object. */
|
||||
public static HdfsFileStatus toFileStatus(final Map<String, Object> m) {
|
||||
if ((Boolean)m.get("isNull")) {
|
||||
if (m == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@ -130,4 +188,214 @@ public class JsonUtil {
|
||||
permission, owner, group,
|
||||
symlink, DFSUtil.string2Bytes(localName));
|
||||
}
|
||||
|
||||
/** Convert a LocatedBlock to a Json string. */
|
||||
public static String toJsonString(final ExtendedBlock extendedblock) {
|
||||
if (extendedblock == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
final Map<String, Object> m = extendedBlockMap.get();
|
||||
m.put("blockPoolId", extendedblock.getBlockPoolId());
|
||||
m.put("blockId", extendedblock.getBlockId());
|
||||
m.put("numBytes", extendedblock.getNumBytes());
|
||||
m.put("generationStamp", extendedblock.getGenerationStamp());
|
||||
return JSON.toString(m);
|
||||
}
|
||||
|
||||
/** Convert a Json map to an ExtendedBlock object. */
|
||||
public static ExtendedBlock toExtendedBlock(final Map<?, ?> m) {
|
||||
if (m == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
final String blockPoolId = (String)m.get("blockPoolId");
|
||||
final long blockId = (Long)m.get("blockId");
|
||||
final long numBytes = (Long)m.get("numBytes");
|
||||
final long generationStamp = (Long)m.get("generationStamp");
|
||||
return new ExtendedBlock(blockPoolId, blockId, numBytes, generationStamp);
|
||||
}
|
||||
|
||||
/** Convert a DatanodeInfo to a Json string. */
|
||||
public static String toJsonString(final DatanodeInfo datanodeinfo) {
|
||||
if (datanodeinfo == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
final Map<String, Object> m = datanodeInfoMap.get();
|
||||
m.put("name", datanodeinfo.getName());
|
||||
m.put("storageID", datanodeinfo.getStorageID());
|
||||
m.put("infoPort", datanodeinfo.getInfoPort());
|
||||
|
||||
m.put("ipcPort", datanodeinfo.getIpcPort());
|
||||
|
||||
m.put("capacity", datanodeinfo.getCapacity());
|
||||
m.put("dfsUsed", datanodeinfo.getDfsUsed());
|
||||
m.put("remaining", datanodeinfo.getRemaining());
|
||||
m.put("blockPoolUsed", datanodeinfo.getBlockPoolUsed());
|
||||
m.put("lastUpdate", datanodeinfo.getLastUpdate());
|
||||
m.put("xceiverCount", datanodeinfo.getXceiverCount());
|
||||
m.put("networkLocation", datanodeinfo.getNetworkLocation());
|
||||
m.put("hostName", datanodeinfo.getHostName());
|
||||
m.put("adminState", datanodeinfo.getAdminState().name());
|
||||
return JSON.toString(m);
|
||||
}
|
||||
|
||||
/** Convert a Json map to an DatanodeInfo object. */
|
||||
public static DatanodeInfo toDatanodeInfo(final Map<?, ?> m) {
|
||||
if (m == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return new DatanodeInfo(
|
||||
(String)m.get("name"),
|
||||
(String)m.get("storageID"),
|
||||
(int)(long)(Long)m.get("infoPort"),
|
||||
(int)(long)(Long)m.get("ipcPort"),
|
||||
|
||||
(Long)m.get("capacity"),
|
||||
(Long)m.get("dfsUsed"),
|
||||
(Long)m.get("remaining"),
|
||||
(Long)m.get("blockPoolUsed"),
|
||||
(Long)m.get("lastUpdate"),
|
||||
(int)(long)(Long)m.get("xceiverCount"),
|
||||
(String)m.get("networkLocation"),
|
||||
(String)m.get("hostName"),
|
||||
AdminStates.valueOf((String)m.get("adminState")));
|
||||
}
|
||||
|
||||
/** Convert a DatanodeInfo[] to a Json string. */
|
||||
public static String toJsonString(final DatanodeInfo[] array
|
||||
) throws IOException {
|
||||
if (array == null) {
|
||||
return null;
|
||||
} else if (array.length == 0) {
|
||||
return "[]";
|
||||
} else {
|
||||
final StringBuilder b = new StringBuilder().append('[').append(
|
||||
toJsonString(array[0]));
|
||||
for(int i = 1; i < array.length; i++) {
|
||||
b.append(", ").append(toJsonString(array[i]));
|
||||
}
|
||||
return b.append(']').toString();
|
||||
}
|
||||
}
|
||||
|
||||
/** Convert an Object[] to a DatanodeInfo[]. */
|
||||
public static DatanodeInfo[] toDatanodeInfoArray(final Object[] objects) {
|
||||
if (objects == null) {
|
||||
return null;
|
||||
} else if (objects.length == 0) {
|
||||
return EMPTY_DATANODE_INFO_ARRAY;
|
||||
} else {
|
||||
final DatanodeInfo[] array = new DatanodeInfo[objects.length];
|
||||
for(int i = 0; i < array.length; i++) {
|
||||
array[i] = (DatanodeInfo)toDatanodeInfo((Map<?, ?>) objects[i]);
|
||||
}
|
||||
return array;
|
||||
}
|
||||
}
|
||||
|
||||
/** Convert a LocatedBlock to a Json string. */
|
||||
public static String toJsonString(final LocatedBlock locatedblock
|
||||
) throws IOException {
|
||||
if (locatedblock == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
final Map<String, Object> m = locatedBlockMap.get();
|
||||
m.put("blockToken", toJsonString(locatedblock.getBlockToken()));
|
||||
m.put("isCorrupt", locatedblock.isCorrupt());
|
||||
m.put("startOffset", locatedblock.getStartOffset());
|
||||
m.put("block", toJsonString(locatedblock.getBlock()));
|
||||
|
||||
m.put("locations", toJsonString(locatedblock.getLocations()));
|
||||
return JSON.toString(m);
|
||||
}
|
||||
|
||||
/** Convert a Json map to LocatedBlock. */
|
||||
public static LocatedBlock toLocatedBlock(final Map<?, ?> m) throws IOException {
|
||||
if (m == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
final ExtendedBlock b = toExtendedBlock((Map<?, ?>)JSON.parse((String)m.get("block")));
|
||||
final DatanodeInfo[] locations = toDatanodeInfoArray(
|
||||
(Object[])JSON.parse((String)m.get("locations")));
|
||||
final long startOffset = (Long)m.get("startOffset");
|
||||
final boolean isCorrupt = (Boolean)m.get("isCorrupt");
|
||||
|
||||
final LocatedBlock locatedblock = new LocatedBlock(b, locations, startOffset, isCorrupt);
|
||||
locatedblock.setBlockToken(toBlockToken((Map<?, ?>)JSON.parse((String)m.get("blockToken"))));
|
||||
return locatedblock;
|
||||
}
|
||||
|
||||
/** Convert a LocatedBlock[] to a Json string. */
|
||||
public static String toJsonString(final List<LocatedBlock> array
|
||||
) throws IOException {
|
||||
if (array == null) {
|
||||
return null;
|
||||
} else if (array.size() == 0) {
|
||||
return "[]";
|
||||
} else {
|
||||
final StringBuilder b = new StringBuilder().append('[').append(
|
||||
toJsonString(array.get(0)));
|
||||
for(int i = 1; i < array.size(); i++) {
|
||||
b.append(",\n ").append(toJsonString(array.get(i)));
|
||||
}
|
||||
return b.append(']').toString();
|
||||
}
|
||||
}
|
||||
|
||||
/** Convert an Object[] to a List of LocatedBlock.
|
||||
* @throws IOException */
|
||||
public static List<LocatedBlock> toLocatedBlockList(final Object[] objects
|
||||
) throws IOException {
|
||||
if (objects == null) {
|
||||
return null;
|
||||
} else if (objects.length == 0) {
|
||||
return Collections.emptyList();
|
||||
} else {
|
||||
final List<LocatedBlock> list = new ArrayList<LocatedBlock>(objects.length);
|
||||
for(int i = 0; i < objects.length; i++) {
|
||||
list.add((LocatedBlock)toLocatedBlock((Map<?, ?>)objects[i]));
|
||||
}
|
||||
return list;
|
||||
}
|
||||
}
|
||||
|
||||
/** Convert LocatedBlocks to a Json string. */
|
||||
public static String toJsonString(final LocatedBlocks locatedblocks
|
||||
) throws IOException {
|
||||
if (locatedblocks == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
final Map<String, Object> m = jsonMap.get();
|
||||
m.put("fileLength", locatedblocks.getFileLength());
|
||||
m.put("isUnderConstruction", locatedblocks.isUnderConstruction());
|
||||
|
||||
m.put("locatedBlocks", toJsonString(locatedblocks.getLocatedBlocks()));
|
||||
m.put("lastLocatedBlock", toJsonString(locatedblocks.getLastLocatedBlock()));
|
||||
m.put("isLastBlockComplete", locatedblocks.isLastBlockComplete());
|
||||
return JSON.toString(m);
|
||||
}
|
||||
|
||||
/** Convert a Json map to LocatedBlock. */
|
||||
public static LocatedBlocks toLocatedBlocks(final Map<String, Object> m
|
||||
) throws IOException {
|
||||
if (m == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
final long fileLength = (Long)m.get("fileLength");
|
||||
final boolean isUnderConstruction = (Boolean)m.get("isUnderConstruction");
|
||||
final List<LocatedBlock> locatedBlocks = toLocatedBlockList(
|
||||
(Object[])JSON.parse((String) m.get("locatedBlocks")));
|
||||
final LocatedBlock lastLocatedBlock = toLocatedBlock(
|
||||
(Map<?, ?>)JSON.parse((String)m.get("lastLocatedBlock")));
|
||||
final boolean isLastBlockComplete = (Boolean)m.get("isLastBlockComplete");
|
||||
return new LocatedBlocks(fileLength, isUnderConstruction, locatedBlocks,
|
||||
lastLocatedBlock, isLastBlockComplete);
|
||||
}
|
||||
}
|
@ -27,9 +27,12 @@ import java.net.HttpURLConnection;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.URL;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.BlockLocation;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
||||
@ -45,6 +48,7 @@ import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
|
||||
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
|
||||
import org.apache.hadoop.hdfs.web.resources.AccessTimeParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.BlockSizeParam;
|
||||
@ -54,7 +58,9 @@ import org.apache.hadoop.hdfs.web.resources.DstPathParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.GetOpParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.GroupParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.LengthParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.ModificationTimeParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.OffsetParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.OverwriteParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.OwnerParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.Param;
|
||||
@ -63,13 +69,16 @@ import org.apache.hadoop.hdfs.web.resources.PostOpParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.PutOpParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.RecursiveParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.RenameOptionSetParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.RenewerParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.ReplicationParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.UserParam;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
|
||||
import org.apache.hadoop.security.authentication.client.AuthenticationException;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
import org.mortbay.util.ajax.JSON;
|
||||
|
||||
@ -167,7 +176,7 @@ public class WebHdfsFileSystem extends HftpFileSystem {
|
||||
final Param<?,?>... parameters) throws IOException {
|
||||
//initialize URI path and query
|
||||
final String path = "/" + PATH_PREFIX
|
||||
+ makeQualified(fspath).toUri().getPath();
|
||||
+ (fspath == null? "/": makeQualified(fspath).toUri().getPath());
|
||||
final String query = op.toQueryString()
|
||||
+ '&' + new UserParam(ugi)
|
||||
+ Param.toSortedString("&", parameters);
|
||||
@ -396,4 +405,41 @@ public class WebHdfsFileSystem extends HftpFileSystem {
|
||||
}
|
||||
return statuses;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Token<DelegationTokenIdentifier> getDelegationToken(final String renewer
|
||||
) throws IOException {
|
||||
final HttpOpParam.Op op = GetOpParam.Op.GETDELEGATIONTOKEN;
|
||||
final Map<String, Object> m = run(op, null, new RenewerParam(renewer));
|
||||
final Token<DelegationTokenIdentifier> token = JsonUtil.toDelegationToken(m);
|
||||
token.setService(new Text(getCanonicalServiceName()));
|
||||
return token;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Token<?>> getDelegationTokens(final String renewer
|
||||
) throws IOException {
|
||||
final Token<?>[] t = {getDelegationToken(renewer)};
|
||||
return Arrays.asList(t);
|
||||
}
|
||||
|
||||
@Override
|
||||
public BlockLocation[] getFileBlockLocations(final FileStatus status,
|
||||
final long offset, final long length) throws IOException {
|
||||
if (status == null) {
|
||||
return null;
|
||||
}
|
||||
return getFileBlockLocations(status.getPath(), offset, length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public BlockLocation[] getFileBlockLocations(final Path p,
|
||||
final long offset, final long length) throws IOException {
|
||||
statistics.incrementReadOps(1);
|
||||
|
||||
final HttpOpParam.Op op = GetOpParam.Op.GETFILEBLOCKLOCATIONS;
|
||||
final Map<String, Object> m = run(op, p, new OffsetParam(offset),
|
||||
new LengthParam(length));
|
||||
return DFSUtil.locatedBlocks2Locations(JsonUtil.toLocatedBlocks(m));
|
||||
}
|
||||
}
|
@ -27,10 +27,13 @@ public class GetOpParam extends HttpOpParam<GetOpParam.Op> {
|
||||
/** Get operations. */
|
||||
public static enum Op implements HttpOpParam.Op {
|
||||
OPEN(HttpURLConnection.HTTP_OK),
|
||||
GETFILEBLOCKLOCATIONS(HttpURLConnection.HTTP_OK),
|
||||
|
||||
GETFILESTATUS(HttpURLConnection.HTTP_OK),
|
||||
LISTSTATUS(HttpURLConnection.HTTP_OK),
|
||||
|
||||
GETDELEGATIONTOKEN(HttpURLConnection.HTTP_OK),
|
||||
|
||||
NULL(HttpURLConnection.HTTP_NOT_IMPLEMENTED);
|
||||
|
||||
final int expectedHttpResponseCode;
|
||||
|
@ -17,7 +17,7 @@
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.web.resources;
|
||||
|
||||
/** Recursive parameter. */
|
||||
/** Overwrite parameter. */
|
||||
public class OverwriteParam extends BooleanParam {
|
||||
/** Parameter name. */
|
||||
public static final String NAME = "overwrite";
|
||||
|
@ -0,0 +1,41 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.web.resources;
|
||||
|
||||
/** Renewer parameter. */
|
||||
public class RenewerParam extends StringParam {
|
||||
/** Parameter name. */
|
||||
public static final String NAME = "renewer";
|
||||
/** Default parameter value. */
|
||||
public static final String DEFAULT = NULL;
|
||||
|
||||
private static final Domain DOMAIN = new Domain(NAME, null);
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
* @param str a string representation of the parameter value.
|
||||
*/
|
||||
public RenewerParam(final String str) {
|
||||
super(DOMAIN, str == null || str.equals(DEFAULT)? null: str);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return NAME;
|
||||
}
|
||||
}
|
@ -23,12 +23,12 @@ package org.apache.hadoop.hdfs.security;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.DataInputStream;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
|
||||
import junit.framework.Assert;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.commons.logging.impl.Log4JLogger;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
@ -38,12 +38,16 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
||||
import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
|
||||
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.log4j.Level;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
@ -56,12 +60,13 @@ public class TestDelegationToken {
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
config = new HdfsConfiguration();
|
||||
config.setBoolean(DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY, true);
|
||||
config.setLong(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_KEY, 10000);
|
||||
config.setLong(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY, 5000);
|
||||
config.set("hadoop.security.auth_to_local",
|
||||
"RULE:[2:$1@$0](JobTracker@.*FOO.COM)s/@.*//" + "DEFAULT");
|
||||
FileSystem.setDefaultUri(config, "hdfs://localhost:" + "0");
|
||||
cluster = new MiniDFSCluster.Builder(config).build();
|
||||
cluster = new MiniDFSCluster.Builder(config).numDataNodes(0).build();
|
||||
cluster.waitActive();
|
||||
dtSecretManager = NameNodeAdapter.getDtSecretManager(
|
||||
cluster.getNamesystem());
|
||||
@ -153,6 +158,31 @@ public class TestDelegationToken {
|
||||
dtSecretManager.renewToken(token, "JobTracker");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDelegationTokenWebHdfsApi() throws Exception {
|
||||
((Log4JLogger)NamenodeWebHdfsMethods.LOG).getLogger().setLevel(Level.ALL);
|
||||
final String uri = WebHdfsFileSystem.SCHEME + "://"
|
||||
+ config.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
|
||||
//get file system as JobTracker
|
||||
final UserGroupInformation ugi = UserGroupInformation.createUserForTesting(
|
||||
"JobTracker", new String[]{"user"});
|
||||
final WebHdfsFileSystem webhdfs = ugi.doAs(
|
||||
new PrivilegedExceptionAction<WebHdfsFileSystem>() {
|
||||
@Override
|
||||
public WebHdfsFileSystem run() throws Exception {
|
||||
return (WebHdfsFileSystem)FileSystem.get(new URI(uri), config);
|
||||
}
|
||||
});
|
||||
|
||||
final Token<DelegationTokenIdentifier> token = webhdfs.getDelegationToken("JobTracker");
|
||||
DelegationTokenIdentifier identifier = new DelegationTokenIdentifier();
|
||||
byte[] tokenId = token.getIdentifier();
|
||||
identifier.readFields(new DataInputStream(new ByteArrayInputStream(tokenId)));
|
||||
LOG.info("A valid token should have non-null password, and should be renewed successfully");
|
||||
Assert.assertTrue(null != dtSecretManager.retrievePassword(identifier));
|
||||
dtSecretManager.renewToken(token, "JobTracker");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDelegationTokenWithDoAs() throws Exception {
|
||||
final DistributedFileSystem dfs = (DistributedFileSystem) cluster.getFileSystem();
|
||||
|
@ -23,6 +23,7 @@ import java.net.URI;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.BlockLocation;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FileSystemContractBaseTest;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
@ -114,4 +115,16 @@ public class TestWebHdfsFileSystemContract extends FileSystemContractBaseTest {
|
||||
// also okay for HDFS.
|
||||
}
|
||||
}
|
||||
|
||||
public void testGetFileBlockLocations() throws IOException {
|
||||
final String f = "/test/testGetFileBlockLocations";
|
||||
createFile(path(f));
|
||||
final BlockLocation[] computed = fs.getFileBlockLocations(new Path(f), 0L, 1L);
|
||||
final BlockLocation[] expected = cluster.getFileSystem().getFileBlockLocations(
|
||||
new Path(f), 0L, 1L);
|
||||
assertEquals(expected.length, computed.length);
|
||||
for(int i = 0; i < computed.length; i++) {
|
||||
assertEquals(expected[i].toString(), computed[i].toString());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user