From bf1649d5fd095ce027f013be57d216212fa14198 Mon Sep 17 00:00:00 2001
From: Tsz-wo Sze
Date: Wed, 27 Jun 2012 02:51:39 +0000
Subject: [PATCH] HDFS-3551. WebHDFS CREATE should use client location for HTTP redirection.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1354316 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt    |   3 +
 .../blockmanagement/DatanodeManager.java       |   4 +
 .../server/blockmanagement/Host2NodesMap.java  |  12 ++
 .../web/resources/NamenodeWebHdfsMethods.java  |  45 ++++--
 .../resources/TestWebHdfsDataLocality.java     | 140 ++++++++++++++++++
 .../hadoop/hdfs/web/WebHdfsTestUtil.java       |   6 +
 6 files changed, 198 insertions(+), 12 deletions(-)
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/web/resources/TestWebHdfsDataLocality.java

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index ee76afad94..bf58013749 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -381,6 +381,9 @@ Branch-2 ( Unreleased changes )
 
     HDFS-3428. Move DelegationTokenRenewer to common (tucu)
 
+    HDFS-3551. WebHDFS CREATE should use client location for HTTP redirection.
+    (szetszwo)
+
 Release 2.0.0-alpha - 05-23-2012
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index c58b857ebf..474feb5247 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -1053,4 +1053,8 @@ public void clearPendingQueues() {
     }
   }
 
+  @Override
+  public String toString() {
+    return getClass().getSimpleName() + ": " + host2DatanodeMap;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/Host2NodesMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/Host2NodesMap.java
index 68ea1f1710..082816d0e0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/Host2NodesMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/Host2NodesMap.java
@@ -17,7 +17,9 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -156,4 +158,14 @@ DatanodeDescriptor getDatanodeByHost(String ipAddr) {
       hostmapLock.readLock().unlock();
     }
   }
+
+  @Override
+  public String toString() {
+    final StringBuilder b = new StringBuilder(getClass().getSimpleName())
+        .append("[");
+    for(Map.Entry<String, DatanodeDescriptor[]> e : map.entrySet()) {
+      b.append("\n  " + e.getKey() + " => " + Arrays.asList(e.getValue()));
+    }
+    return b.append("\n]").toString();
+  }
 }
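Context for the hunks that follow: a WebHDFS CREATE is a two-step exchange. The client first issues an HTTP PUT to the NameNode, which replies with a 307 redirect whose Location header names a datanode; the client then re-sends the PUT, with the file data, to that datanode. The change below only affects how the NameNode picks that Location target. A minimal client-side sketch of the exchange, assuming an illustrative cluster address namenode:50070 (this is not the WebHdfsFileSystem implementation):

    import java.io.OutputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;

    public class WebHdfsCreateSketch {
      public static void main(String[] args) throws Exception {
        // Step 1: ask the NameNode where to write; expect 307 with a Location header.
        final URL nn = new URL(
            "http://namenode:50070/webhdfs/v1/foo?op=CREATE&overwrite=true");
        HttpURLConnection conn = (HttpURLConnection)nn.openConnection();
        conn.setRequestMethod("PUT");
        // Do not auto-follow: the file data must be re-sent to the redirect target.
        conn.setInstanceFollowRedirects(false);
        conn.connect();
        final String location = conn.getHeaderField("Location");
        conn.disconnect();

        // Step 2: send the file content to the datanode the NameNode chose.
        conn = (HttpURLConnection)new URL(location).openConnection();
        conn.setRequestMethod("PUT");
        conn.setDoOutput(true);
        final OutputStream out = conn.getOutputStream();
        out.write("hello".getBytes("UTF-8"));
        out.close();
        System.out.println("status=" + conn.getResponseCode()); // 201 on success
        conn.disconnect();
      }
    }

With this patch, step 1 redirects to a datanode near the calling host whenever the caller itself runs on a datanode, instead of to a random node.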
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
index de8f256705..37781ea120 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
@@ -56,6 +56,7 @@
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
@@ -115,6 +116,11 @@ public static String getRemoteAddress() {
     return REMOTE_ADDRESS.get();
   }
 
+  /** Set the remote client address. */
+  static void setRemoteAddress(String remoteAddress) {
+    REMOTE_ADDRESS.set(remoteAddress);
+  }
+
   private @Context ServletContext context;
   private @Context HttpServletRequest request;
   private @Context HttpServletResponse response;
@@ -134,12 +140,26 @@ private void init(final UserGroupInformation ugi,
     response.setContentType(null);
   }
 
-  private static DatanodeInfo chooseDatanode(final NameNode namenode,
+  static DatanodeInfo chooseDatanode(final NameNode namenode,
       final String path, final HttpOpParam.Op op, final long openOffset,
-      Configuration conf) throws IOException {
-    if (op == GetOpParam.Op.OPEN
+      final long blocksize, Configuration conf) throws IOException {
+    final BlockManager bm = namenode.getNamesystem().getBlockManager();
+
+    if (op == PutOpParam.Op.CREATE) {
+      //choose a datanode near to client
+      final DatanodeDescriptor clientNode = bm.getDatanodeManager(
+          ).getDatanodeByHost(getRemoteAddress());
+      if (clientNode != null) {
+        final DatanodeDescriptor[] datanodes = bm.getBlockPlacementPolicy(
+            ).chooseTarget(path, 1, clientNode, null, blocksize);
+        if (datanodes.length > 0) {
+          return datanodes[0];
+        }
+      }
+    } else if (op == GetOpParam.Op.OPEN
         || op == GetOpParam.Op.GETFILECHECKSUM
         || op == PostOpParam.Op.APPEND) {
+      //choose a datanode containing a replica
       final NamenodeProtocols np = namenode.getRpcServer();
       final HdfsFileStatus status = np.getFileInfo(path);
       if (status == null) {
@@ -158,14 +178,13 @@ private static DatanodeInfo chooseDatanode(final NameNode namenode,
         final LocatedBlocks locations = np.getBlockLocations(path, offset, 1);
         final int count = locations.locatedBlockCount();
         if (count > 0) {
-          return JspHelper.bestNode(locations.get(0), conf);
+          return JspHelper.bestNode(locations.get(0).getLocations(), false, conf);
         }
       }
     }
 
-    return (DatanodeDescriptor)namenode.getNamesystem().getBlockManager(
-        ).getDatanodeManager().getNetworkTopology().chooseRandom(
-        NodeBase.ROOT);
+    return (DatanodeDescriptor)bm.getDatanodeManager().getNetworkTopology(
+        ).chooseRandom(NodeBase.ROOT);
   }
 
   private Token<DelegationTokenIdentifier> generateDelegationToken(
@@ -183,9 +202,11 @@ private URI redirectURI(final NameNode namenode,
       final UserGroupInformation ugi, final DelegationParam delegation,
       final UserParam username, final DoAsParam doAsUser,
       final String path, final HttpOpParam.Op op, final long openOffset,
+      final long blocksize,
       final Param<?,?>... parameters) throws URISyntaxException, IOException {
     final Configuration conf = (Configuration)context.getAttribute(JspHelper.CURRENT_CONF);
-    final DatanodeInfo dn = chooseDatanode(namenode, path, op, openOffset, conf);
+    final DatanodeInfo dn = chooseDatanode(namenode, path, op, openOffset,
+        blocksize, conf);
 
     final String delegationQuery;
     if (!UserGroupInformation.isSecurityEnabled()) {
@@ -356,7 +377,7 @@ private Response put(
     case CREATE:
     {
       final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
-          fullpath, op.getValue(), -1L,
+          fullpath, op.getValue(), -1L, blockSize.getValue(conf),
           permission, overwrite, bufferSize, replication, blockSize);
       return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
     }
@@ -502,7 +523,7 @@ private Response post(
     case APPEND:
     {
       final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
-          fullpath, op.getValue(), -1L, bufferSize);
+          fullpath, op.getValue(), -1L, -1L, bufferSize);
       return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
     }
     default:
@@ -598,7 +619,7 @@ private Response get(
     case OPEN:
     {
       final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
-          fullpath, op.getValue(), offset.getValue(), offset, length, bufferSize);
+          fullpath, op.getValue(), offset.getValue(), -1L, offset, length, bufferSize);
       return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
     }
     case GET_BLOCK_LOCATIONS:
@@ -634,7 +655,7 @@ private Response get(
     case GETFILECHECKSUM:
     {
       final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
-          fullpath, op.getValue(), -1L);
+          fullpath, op.getValue(), -1L, -1L);
       return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
     }
     case GETDELEGATIONTOKEN:
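Taken together, the chooseDatanode() hunks above implement a three-way choice: CREATE consults the block placement policy with the client's own node as the writer, the replica-bound operations (OPEN, GETFILECHECKSUM, APPEND) pick a node that already holds the block, and everything else falls back to a random datanode. A compilable reduction of that flow, with plain strings standing in for DatanodeDescriptor and simple maps standing in for the DatanodeManager and block-location lookups (all names here are illustrative, not Hadoop APIs):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Random;

    public class ChooseDatanodeSketch {
      // Stand-ins for DatanodeManager.getDatanodeByHost() and the block map.
      final Map<String, String> hostToDatanode = new HashMap<String, String>();
      final Map<String, String> pathToReplicaNode = new HashMap<String, String>();
      final String[] allDatanodes;
      final Random random = new Random();

      ChooseDatanodeSketch(String[] allDatanodes) {
        this.allDatanodes = allDatanodes;
      }

      String chooseRedirectTarget(String op, String clientIp, String path) {
        if (op.equals("CREATE")) {
          // New behavior: the real code asks BlockPlacementPolicy.chooseTarget(
          // path, 1, clientNode, ...); here the writer's node stands in for
          // that result, so the first replica lands next to the writer.
          final String clientNode = hostToDatanode.get(clientIp);
          if (clientNode != null) {
            return clientNode;
          }
        } else if (op.equals("OPEN") || op.equals("GETFILECHECKSUM")
            || op.equals("APPEND")) {
          // Unchanged behavior: redirect to a node already holding a replica.
          final String replicaNode = pathToReplicaNode.get(path);
          if (replicaNode != null) {
            return replicaNode;
          }
        }
        // Fallback: any datanode in the cluster.
        return allDatanodes[random.nextInt(allDatanodes.length)];
      }
    }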
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/web/resources/TestWebHdfsDataLocality.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/web/resources/TestWebHdfsDataLocality.java
new file mode 100644
index 0000000000..74373be6e5
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/web/resources/TestWebHdfsDataLocality.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.web.resources;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
+import org.apache.hadoop.hdfs.web.resources.GetOpParam;
+import org.apache.hadoop.hdfs.web.resources.PostOpParam;
+import org.apache.hadoop.hdfs.web.resources.PutOpParam;
+import org.apache.log4j.Level;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test WebHDFS which provides data locality using HTTP redirection.
+ */
+public class TestWebHdfsDataLocality {
+  static final Log LOG = LogFactory.getLog(TestWebHdfsDataLocality.class);
+  {
+    ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.OFF);
+    ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.OFF);
+    ((Log4JLogger)LogFactory.getLog(FSNamesystem.class)).getLogger().setLevel(Level.OFF);
+  }
+
+  private static final String RACK0 = "/rack0";
+  private static final String RACK1 = "/rack1";
+  private static final String RACK2 = "/rack2";
+
+  @Test
+  public void testDataLocality() throws Exception {
+    final Configuration conf = WebHdfsTestUtil.createConf();
+    final String[] racks = {RACK0, RACK0, RACK1, RACK1, RACK2, RACK2};
+    final int nDataNodes = racks.length;
+    LOG.info("nDataNodes=" + nDataNodes + ", racks=" + Arrays.asList(racks));
+
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(nDataNodes)
+        .racks(racks)
+        .build();
+    try {
+      cluster.waitActive();
+
+      final DistributedFileSystem dfs = cluster.getFileSystem();
+      final NameNode namenode = cluster.getNameNode();
+      final DatanodeManager dm = namenode.getNamesystem().getBlockManager(
+          ).getDatanodeManager();
+      LOG.info("dm=" + dm);
+
+      final long blocksize = DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
+      final String f = "/foo";
+
+      { //test CREATE
+        for(int i = 0; i < nDataNodes; i++) {
+          //set client address to a particular datanode
+          final DataNode dn = cluster.getDataNodes().get(i);
+          final String ipAddr = dm.getDatanode(dn.getDatanodeId()).getIpAddr();
+          NamenodeWebHdfsMethods.setRemoteAddress(ipAddr);
+
+          //The chosen datanode must be the same as the client address
+          final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
+              namenode, f, PutOpParam.Op.CREATE, -1L, blocksize, conf);
+          Assert.assertEquals(ipAddr, chosen.getIpAddr());
+        }
+      }
+
+      //create a file with one replica.
+      final Path p = new Path(f);
+      final FSDataOutputStream out = dfs.create(p, (short)1);
+      out.write(1);
+      out.close();
+
+      //get replica location.
+      final LocatedBlocks locatedblocks = NameNodeAdapter.getBlockLocations(
+          namenode, f, 0, 1);
+      final List<LocatedBlock> lb = locatedblocks.getLocatedBlocks();
+      Assert.assertEquals(1, lb.size());
+      final DatanodeInfo[] locations = lb.get(0).getLocations();
+      Assert.assertEquals(1, locations.length);
+      final DatanodeInfo expected = locations[0];
+
+      //For GETFILECHECKSUM, OPEN and APPEND,
+      //the chosen datanode must be the same as the replica location.
+
+      { //test GETFILECHECKSUM
+        final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
+            namenode, f, GetOpParam.Op.GETFILECHECKSUM, -1L, blocksize, conf);
+        Assert.assertEquals(expected, chosen);
+      }
+
+      { //test OPEN
+        final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
+            namenode, f, GetOpParam.Op.OPEN, 0, blocksize, conf);
+        Assert.assertEquals(expected, chosen);
+      }
+
+      { //test APPEND
+        final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
+            namenode, f, PostOpParam.Op.APPEND, -1L, blocksize, conf);
+        Assert.assertEquals(expected, chosen);
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java
index 38e2168519..9ae0fb28c2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java
@@ -40,6 +40,12 @@ public class WebHdfsTestUtil {
 
   public static final Log LOG = LogFactory.getLog(WebHdfsTestUtil.class);
 
+  public static Configuration createConf() {
+    final Configuration conf = new Configuration();
+    conf.setBoolean(DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY, true);
+    return conf;
+  }
+
   public static WebHdfsFileSystem getWebHdfsFileSystem(final Configuration conf
       ) throws IOException, URISyntaxException {
     final String uri = WebHdfsFileSystem.SCHEME + "://"
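For reference, a hedged sketch of how the new WebHdfsTestUtil.createConf() helper and the getWebHdfsFileSystem() factory (truncated above) are typically combined in a MiniDFSCluster-based test; the datanode count and the commented file operations are illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.MiniDFSCluster;
    import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
    import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;

    public class CreateConfUsageSketch {
      public static void main(String[] args) throws Exception {
        // createConf() returns a Configuration with WebHDFS enabled.
        final Configuration conf = WebHdfsTestUtil.createConf();
        final MiniDFSCluster cluster =
            new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
        try {
          cluster.waitActive();
          final WebHdfsFileSystem webhdfs =
              WebHdfsTestUtil.getWebHdfsFileSystem(conf);
          // ... exercise CREATE/OPEN/APPEND through webhdfs; with this patch,
          // CREATE redirects to a datanode near the HTTP client whenever the
          // client address maps to a cluster node.
          webhdfs.close();
        } finally {
          cluster.shutdown();
        }
      }
    }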