HDFS-3551. WebHDFS CREATE should use client location for HTTP redirection.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1354316 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
03f2f9b580
commit
bf1649d5fd
@ -381,6 +381,9 @@ Branch-2 ( Unreleased changes )
|
||||
|
||||
HDFS-3428. Move DelegationTokenRenewer to common (tucu)
|
||||
|
||||
HDFS-3551. WebHDFS CREATE should use client location for HTTP redirection.
|
||||
(szetszwo)
|
||||
|
||||
Release 2.0.0-alpha - 05-23-2012
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -1053,4 +1053,8 @@ public void clearPendingQueues() {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return getClass().getSimpleName() + ": " + host2DatanodeMap;
|
||||
}
|
||||
}
|
||||
|
@ -17,7 +17,9 @@
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
@ -156,4 +158,14 @@ DatanodeDescriptor getDatanodeByHost(String ipAddr) {
|
||||
hostmapLock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
final StringBuilder b = new StringBuilder(getClass().getSimpleName())
|
||||
.append("[");
|
||||
for(Map.Entry<String, DatanodeDescriptor[]> e : map.entrySet()) {
|
||||
b.append("\n " + e.getKey() + " => " + Arrays.asList(e.getValue()));
|
||||
}
|
||||
return b.append("\n]").toString();
|
||||
}
|
||||
}
|
||||
|
@ -56,6 +56,7 @@
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
|
||||
import org.apache.hadoop.hdfs.server.common.JspHelper;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
@ -115,6 +116,11 @@ public static String getRemoteAddress() {
|
||||
return REMOTE_ADDRESS.get();
|
||||
}
|
||||
|
||||
/** Set the remote client address. */
|
||||
static void setRemoteAddress(String remoteAddress) {
|
||||
REMOTE_ADDRESS.set(remoteAddress);
|
||||
}
|
||||
|
||||
private @Context ServletContext context;
|
||||
private @Context HttpServletRequest request;
|
||||
private @Context HttpServletResponse response;
|
||||
@ -134,12 +140,26 @@ private void init(final UserGroupInformation ugi,
|
||||
response.setContentType(null);
|
||||
}
|
||||
|
||||
private static DatanodeInfo chooseDatanode(final NameNode namenode,
|
||||
static DatanodeInfo chooseDatanode(final NameNode namenode,
|
||||
final String path, final HttpOpParam.Op op, final long openOffset,
|
||||
Configuration conf) throws IOException {
|
||||
if (op == GetOpParam.Op.OPEN
|
||||
final long blocksize, Configuration conf) throws IOException {
|
||||
final BlockManager bm = namenode.getNamesystem().getBlockManager();
|
||||
|
||||
if (op == PutOpParam.Op.CREATE) {
|
||||
//choose a datanode near to client
|
||||
final DatanodeDescriptor clientNode = bm.getDatanodeManager(
|
||||
).getDatanodeByHost(getRemoteAddress());
|
||||
if (clientNode != null) {
|
||||
final DatanodeDescriptor[] datanodes = bm.getBlockPlacementPolicy(
|
||||
).chooseTarget(path, 1, clientNode, null, blocksize);
|
||||
if (datanodes.length > 0) {
|
||||
return datanodes[0];
|
||||
}
|
||||
}
|
||||
} else if (op == GetOpParam.Op.OPEN
|
||||
|| op == GetOpParam.Op.GETFILECHECKSUM
|
||||
|| op == PostOpParam.Op.APPEND) {
|
||||
//choose a datanode containing a replica
|
||||
final NamenodeProtocols np = namenode.getRpcServer();
|
||||
final HdfsFileStatus status = np.getFileInfo(path);
|
||||
if (status == null) {
|
||||
@ -158,14 +178,13 @@ private static DatanodeInfo chooseDatanode(final NameNode namenode,
|
||||
final LocatedBlocks locations = np.getBlockLocations(path, offset, 1);
|
||||
final int count = locations.locatedBlockCount();
|
||||
if (count > 0) {
|
||||
return JspHelper.bestNode(locations.get(0), conf);
|
||||
return JspHelper.bestNode(locations.get(0).getLocations(), false, conf);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return (DatanodeDescriptor)namenode.getNamesystem().getBlockManager(
|
||||
).getDatanodeManager().getNetworkTopology().chooseRandom(
|
||||
NodeBase.ROOT);
|
||||
return (DatanodeDescriptor)bm.getDatanodeManager().getNetworkTopology(
|
||||
).chooseRandom(NodeBase.ROOT);
|
||||
}
|
||||
|
||||
private Token<? extends TokenIdentifier> generateDelegationToken(
|
||||
@ -183,9 +202,11 @@ private URI redirectURI(final NameNode namenode,
|
||||
final UserGroupInformation ugi, final DelegationParam delegation,
|
||||
final UserParam username, final DoAsParam doAsUser,
|
||||
final String path, final HttpOpParam.Op op, final long openOffset,
|
||||
final long blocksize,
|
||||
final Param<?, ?>... parameters) throws URISyntaxException, IOException {
|
||||
final Configuration conf = (Configuration)context.getAttribute(JspHelper.CURRENT_CONF);
|
||||
final DatanodeInfo dn = chooseDatanode(namenode, path, op, openOffset, conf);
|
||||
final DatanodeInfo dn = chooseDatanode(namenode, path, op, openOffset,
|
||||
blocksize, conf);
|
||||
|
||||
final String delegationQuery;
|
||||
if (!UserGroupInformation.isSecurityEnabled()) {
|
||||
@ -356,7 +377,7 @@ private Response put(
|
||||
case CREATE:
|
||||
{
|
||||
final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
|
||||
fullpath, op.getValue(), -1L,
|
||||
fullpath, op.getValue(), -1L, blockSize.getValue(conf),
|
||||
permission, overwrite, bufferSize, replication, blockSize);
|
||||
return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
|
||||
}
|
||||
@ -502,7 +523,7 @@ private Response post(
|
||||
case APPEND:
|
||||
{
|
||||
final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
|
||||
fullpath, op.getValue(), -1L, bufferSize);
|
||||
fullpath, op.getValue(), -1L, -1L, bufferSize);
|
||||
return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
|
||||
}
|
||||
default:
|
||||
@ -598,7 +619,7 @@ private Response get(
|
||||
case OPEN:
|
||||
{
|
||||
final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
|
||||
fullpath, op.getValue(), offset.getValue(), offset, length, bufferSize);
|
||||
fullpath, op.getValue(), offset.getValue(), -1L, offset, length, bufferSize);
|
||||
return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
|
||||
}
|
||||
case GET_BLOCK_LOCATIONS:
|
||||
@ -634,7 +655,7 @@ private Response get(
|
||||
case GETFILECHECKSUM:
|
||||
{
|
||||
final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
|
||||
fullpath, op.getValue(), -1L);
|
||||
fullpath, op.getValue(), -1L, -1L);
|
||||
return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
|
||||
}
|
||||
case GETDELEGATIONTOKEN:
|
||||
|
@ -0,0 +1,140 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.namenode.web.resources;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.commons.logging.impl.Log4JLogger;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
||||
import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
|
||||
import org.apache.hadoop.hdfs.web.resources.GetOpParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.PostOpParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.PutOpParam;
|
||||
import org.apache.log4j.Level;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* Test WebHDFS which provides data locality using HTTP redirection.
|
||||
*/
|
||||
public class TestWebHdfsDataLocality {
|
||||
static final Log LOG = LogFactory.getLog(TestWebHdfsDataLocality.class);
|
||||
{
|
||||
((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.OFF);
|
||||
((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.OFF);
|
||||
((Log4JLogger)LogFactory.getLog(FSNamesystem.class)).getLogger().setLevel(Level.OFF);
|
||||
}
|
||||
|
||||
private static final String RACK0 = "/rack0";
|
||||
private static final String RACK1 = "/rack1";
|
||||
private static final String RACK2 = "/rack2";
|
||||
|
||||
@Test
|
||||
public void testDataLocality() throws Exception {
|
||||
final Configuration conf = WebHdfsTestUtil.createConf();
|
||||
final String[] racks = {RACK0, RACK0, RACK1, RACK1, RACK2, RACK2};
|
||||
final int nDataNodes = racks.length;
|
||||
LOG.info("nDataNodes=" + nDataNodes + ", racks=" + Arrays.asList(racks));
|
||||
|
||||
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
||||
.numDataNodes(nDataNodes)
|
||||
.racks(racks)
|
||||
.build();
|
||||
try {
|
||||
cluster.waitActive();
|
||||
|
||||
final DistributedFileSystem dfs = cluster.getFileSystem();
|
||||
final NameNode namenode = cluster.getNameNode();
|
||||
final DatanodeManager dm = namenode.getNamesystem().getBlockManager(
|
||||
).getDatanodeManager();
|
||||
LOG.info("dm=" + dm);
|
||||
|
||||
final long blocksize = DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
|
||||
final String f = "/foo";
|
||||
|
||||
{ //test CREATE
|
||||
for(int i = 0; i < nDataNodes; i++) {
|
||||
//set client address to a particular datanode
|
||||
final DataNode dn = cluster.getDataNodes().get(i);
|
||||
final String ipAddr = dm.getDatanode(dn.getDatanodeId()).getIpAddr();
|
||||
NamenodeWebHdfsMethods.setRemoteAddress(ipAddr);
|
||||
|
||||
//The chosen datanode must be the same as the client address
|
||||
final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
|
||||
namenode, f, PutOpParam.Op.CREATE, -1L, blocksize, conf);
|
||||
Assert.assertEquals(ipAddr, chosen.getIpAddr());
|
||||
}
|
||||
}
|
||||
|
||||
//create a file with one replica.
|
||||
final Path p = new Path(f);
|
||||
final FSDataOutputStream out = dfs.create(p, (short)1);
|
||||
out.write(1);
|
||||
out.close();
|
||||
|
||||
//get replica location.
|
||||
final LocatedBlocks locatedblocks = NameNodeAdapter.getBlockLocations(
|
||||
namenode, f, 0, 1);
|
||||
final List<LocatedBlock> lb = locatedblocks.getLocatedBlocks();
|
||||
Assert.assertEquals(1, lb.size());
|
||||
final DatanodeInfo[] locations = lb.get(0).getLocations();
|
||||
Assert.assertEquals(1, locations.length);
|
||||
final DatanodeInfo expected = locations[0];
|
||||
|
||||
//For GETFILECHECKSUM, OPEN and APPEND,
|
||||
//the chosen datanode must be the same as the replica location.
|
||||
|
||||
{ //test GETFILECHECKSUM
|
||||
final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
|
||||
namenode, f, GetOpParam.Op.GETFILECHECKSUM, -1L, blocksize, conf);
|
||||
Assert.assertEquals(expected, chosen);
|
||||
}
|
||||
|
||||
{ //test OPEN
|
||||
final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
|
||||
namenode, f, GetOpParam.Op.OPEN, 0, blocksize, conf);
|
||||
Assert.assertEquals(expected, chosen);
|
||||
}
|
||||
|
||||
{ //test APPEND
|
||||
final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
|
||||
namenode, f, PostOpParam.Op.APPEND, -1L, blocksize, conf);
|
||||
Assert.assertEquals(expected, chosen);
|
||||
}
|
||||
} finally {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
@ -40,6 +40,12 @@
|
||||
public class WebHdfsTestUtil {
|
||||
public static final Log LOG = LogFactory.getLog(WebHdfsTestUtil.class);
|
||||
|
||||
public static Configuration createConf() {
|
||||
final Configuration conf = new Configuration();
|
||||
conf.setBoolean(DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY, true);
|
||||
return conf;
|
||||
}
|
||||
|
||||
public static WebHdfsFileSystem getWebHdfsFileSystem(final Configuration conf
|
||||
) throws IOException, URISyntaxException {
|
||||
final String uri = WebHdfsFileSystem.SCHEME + "://"
|
||||
|
Loading…
Reference in New Issue
Block a user