HDFS-2545. Change WebHDFS to support multiple namenodes in federation.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1214027 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2011-12-14 01:47:57 +00:00
parent a839f875e8
commit 433a6e78f6
6 changed files with 349 additions and 15 deletions

View File

@ -175,6 +175,9 @@ Release 0.23.1 - UNRELEASED
HDFS-2594. Support getDelegationTokens and createSymlink in WebHDFS.
(szetszwo)
HDFS-2545. Change WebHDFS to support multiple namenodes in federation.
(szetszwo)
IMPROVEMENTS
HDFS-2560. Refactor BPOfferService to be a static inner class (todd)

View File

@ -63,6 +63,7 @@
import org.apache.hadoop.hdfs.web.resources.GetOpParam;
import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
import org.apache.hadoop.hdfs.web.resources.LengthParam;
import org.apache.hadoop.hdfs.web.resources.NamenodeRpcAddressParam;
import org.apache.hadoop.hdfs.web.resources.OffsetParam;
import org.apache.hadoop.hdfs.web.resources.OverwriteParam;
import org.apache.hadoop.hdfs.web.resources.Param;
@ -89,7 +90,8 @@ public class DatanodeWebHdfsMethods {
private @Context ServletContext context;
private @Context HttpServletResponse response;
private void init(final UserGroupInformation ugi, final DelegationParam delegation,
private void init(final UserGroupInformation ugi,
final DelegationParam delegation, final InetSocketAddress nnRpcAddr,
final UriFsPathParam path, final HttpOpParam<?> op,
final Param<?, ?>... parameters) throws IOException {
if (LOG.isTraceEnabled()) {
@ -102,9 +104,8 @@ private void init(final UserGroupInformation ugi, final DelegationParam delegati
if (UserGroupInformation.isSecurityEnabled()) {
//add a token for RPC.
final DataNode datanode = (DataNode)context.getAttribute("datanode");
final InetSocketAddress nnRpcAddr = NameNode.getAddress(datanode.getConf());
final Token<DelegationTokenIdentifier> token = new Token<DelegationTokenIdentifier>();
final Token<DelegationTokenIdentifier> token =
new Token<DelegationTokenIdentifier>();
token.decodeFromUrlString(delegation.getValue());
SecurityUtil.setTokenService(token, nnRpcAddr);
token.setKind(DelegationTokenIdentifier.HDFS_DELEGATION_KIND);
@ -122,6 +123,9 @@ public Response putRoot(
@Context final UserGroupInformation ugi,
@QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
final DelegationParam delegation,
@QueryParam(NamenodeRpcAddressParam.NAME)
@DefaultValue(NamenodeRpcAddressParam.DEFAULT)
final NamenodeRpcAddressParam namenodeRpcAddress,
@QueryParam(PutOpParam.NAME) @DefaultValue(PutOpParam.DEFAULT)
final PutOpParam op,
@QueryParam(PermissionParam.NAME) @DefaultValue(PermissionParam.DEFAULT)
@ -135,8 +139,8 @@ public Response putRoot(
@QueryParam(BlockSizeParam.NAME) @DefaultValue(BlockSizeParam.DEFAULT)
final BlockSizeParam blockSize
) throws IOException, InterruptedException {
return put(in, ugi, delegation, ROOT, op, permission, overwrite, bufferSize,
replication, blockSize);
return put(in, ugi, delegation, namenodeRpcAddress, ROOT, op, permission,
overwrite, bufferSize, replication, blockSize);
}
/** Handle HTTP PUT request. */
@ -149,6 +153,9 @@ public Response put(
@Context final UserGroupInformation ugi,
@QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
final DelegationParam delegation,
@QueryParam(NamenodeRpcAddressParam.NAME)
@DefaultValue(NamenodeRpcAddressParam.DEFAULT)
final NamenodeRpcAddressParam namenodeRpcAddress,
@PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
@QueryParam(PutOpParam.NAME) @DefaultValue(PutOpParam.DEFAULT)
final PutOpParam op,
@ -164,8 +171,9 @@ public Response put(
final BlockSizeParam blockSize
) throws IOException, InterruptedException {
init(ugi, delegation, path, op, permission, overwrite, bufferSize,
replication, blockSize);
final InetSocketAddress nnRpcAddr = namenodeRpcAddress.getValue();
init(ugi, delegation, nnRpcAddr, path, op, permission,
overwrite, bufferSize, replication, blockSize);
return ugi.doAs(new PrivilegedExceptionAction<Response>() {
@Override
@ -178,7 +186,6 @@ public Response run() throws IOException, URISyntaxException {
case CREATE:
{
final Configuration conf = new Configuration(datanode.getConf());
final InetSocketAddress nnRpcAddr = NameNode.getAddress(conf);
conf.set(FsPermission.UMASK_LABEL, "000");
final int b = bufferSize.getValue(conf);
@ -221,12 +228,15 @@ public Response postRoot(
@Context final UserGroupInformation ugi,
@QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
final DelegationParam delegation,
@QueryParam(NamenodeRpcAddressParam.NAME)
@DefaultValue(NamenodeRpcAddressParam.DEFAULT)
final NamenodeRpcAddressParam namenodeRpcAddress,
@QueryParam(PostOpParam.NAME) @DefaultValue(PostOpParam.DEFAULT)
final PostOpParam op,
@QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
final BufferSizeParam bufferSize
) throws IOException, InterruptedException {
return post(in, ugi, delegation, ROOT, op, bufferSize);
return post(in, ugi, delegation, namenodeRpcAddress, ROOT, op, bufferSize);
}
/** Handle HTTP POST request. */
@ -239,6 +249,9 @@ public Response post(
@Context final UserGroupInformation ugi,
@QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
final DelegationParam delegation,
@QueryParam(NamenodeRpcAddressParam.NAME)
@DefaultValue(NamenodeRpcAddressParam.DEFAULT)
final NamenodeRpcAddressParam namenodeRpcAddress,
@PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
@QueryParam(PostOpParam.NAME) @DefaultValue(PostOpParam.DEFAULT)
final PostOpParam op,
@ -246,7 +259,8 @@ public Response post(
final BufferSizeParam bufferSize
) throws IOException, InterruptedException {
init(ugi, delegation, path, op, bufferSize);
final InetSocketAddress nnRpcAddr = namenodeRpcAddress.getValue();
init(ugi, delegation, nnRpcAddr, path, op, bufferSize);
return ugi.doAs(new PrivilegedExceptionAction<Response>() {
@Override
@ -259,7 +273,6 @@ public Response run() throws IOException {
case APPEND:
{
final Configuration conf = new Configuration(datanode.getConf());
final InetSocketAddress nnRpcAddr = NameNode.getAddress(conf);
final int b = bufferSize.getValue(conf);
DFSClient dfsclient = new DFSClient(nnRpcAddr, conf);
FSDataOutputStream out = null;
@ -291,6 +304,9 @@ public Response getRoot(
@Context final UserGroupInformation ugi,
@QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
final DelegationParam delegation,
@QueryParam(NamenodeRpcAddressParam.NAME)
@DefaultValue(NamenodeRpcAddressParam.DEFAULT)
final NamenodeRpcAddressParam namenodeRpcAddress,
@QueryParam(GetOpParam.NAME) @DefaultValue(GetOpParam.DEFAULT)
final GetOpParam op,
@QueryParam(OffsetParam.NAME) @DefaultValue(OffsetParam.DEFAULT)
@ -300,7 +316,8 @@ public Response getRoot(
@QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
final BufferSizeParam bufferSize
) throws IOException, InterruptedException {
return get(ugi, delegation, ROOT, op, offset, length, bufferSize);
return get(ugi, delegation, namenodeRpcAddress, ROOT, op, offset, length,
bufferSize);
}
/** Handle HTTP GET request. */
@ -311,6 +328,9 @@ public Response get(
@Context final UserGroupInformation ugi,
@QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
final DelegationParam delegation,
@QueryParam(NamenodeRpcAddressParam.NAME)
@DefaultValue(NamenodeRpcAddressParam.DEFAULT)
final NamenodeRpcAddressParam namenodeRpcAddress,
@PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
@QueryParam(GetOpParam.NAME) @DefaultValue(GetOpParam.DEFAULT)
final GetOpParam op,
@ -322,7 +342,8 @@ public Response get(
final BufferSizeParam bufferSize
) throws IOException, InterruptedException {
init(ugi, delegation, path, op, offset, length, bufferSize);
final InetSocketAddress nnRpcAddr = namenodeRpcAddress.getValue();
init(ugi, delegation, nnRpcAddr, path, op, offset, length, bufferSize);
return ugi.doAs(new PrivilegedExceptionAction<Response>() {
@Override
@ -331,7 +352,6 @@ public Response run() throws IOException {
final String fullpath = path.getAbsolutePath();
final DataNode datanode = (DataNode)context.getAttribute("datanode");
final Configuration conf = new Configuration(datanode.getConf());
final InetSocketAddress nnRpcAddr = NameNode.getAddress(conf);
switch(op.getValue()) {
case OPEN:

View File

@ -76,6 +76,7 @@
import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
import org.apache.hadoop.hdfs.web.resources.LengthParam;
import org.apache.hadoop.hdfs.web.resources.ModificationTimeParam;
import org.apache.hadoop.hdfs.web.resources.NamenodeRpcAddressParam;
import org.apache.hadoop.hdfs.web.resources.OffsetParam;
import org.apache.hadoop.hdfs.web.resources.OverwriteParam;
import org.apache.hadoop.hdfs.web.resources.OwnerParam;
@ -198,6 +199,7 @@ private URI redirectURI(final NameNode namenode,
delegationQuery = "&" + new DelegationParam(t.encodeToUrlString());
}
final String query = op.toQueryString() + delegationQuery
+ "&" + new NamenodeRpcAddressParam(namenode)
+ Param.toSortedString("&", parameters);
final String uripath = WebHdfsFileSystem.PATH_PREFIX + path;

View File

@ -0,0 +1,83 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.web.resources;
import java.net.InetSocketAddress;
/** InetSocketAddressParam parameter. */
abstract class InetSocketAddressParam
extends Param<InetSocketAddress, InetSocketAddressParam.Domain> {
InetSocketAddressParam(final Domain domain, final InetSocketAddress value) {
super(domain, value);
}
@Override
public String toString() {
return getName() + "=" + Domain.toString(getValue());
}
/** The domain of the parameter. */
static final class Domain extends Param.Domain<InetSocketAddress> {
Domain(final String paramName) {
super(paramName);
}
@Override
public String getDomain() {
return "<HOST:PORT>";
}
@Override
InetSocketAddress parse(final String str) {
final int i = str.indexOf(':');
if (i < 0) {
throw new IllegalArgumentException("Failed to parse \"" + str
+ "\" as " + getDomain() + ": the ':' character not found.");
} else if (i == 0) {
throw new IllegalArgumentException("Failed to parse \"" + str
+ "\" as " + getDomain() + ": HOST is empty.");
} else if (i == str.length() - 1) {
throw new IllegalArgumentException("Failed to parse \"" + str
+ "\" as " + getDomain() + ": PORT is empty.");
}
final String host = str.substring(0, i);
final int port;
try {
port = Integer.parseInt(str.substring(i + 1));
} catch(NumberFormatException e) {
throw new IllegalArgumentException("Failed to parse \"" + str
+ "\" as " + getDomain() + ": the ':' position is " + i
+ " but failed to parse PORT.", e);
}
try {
return new InetSocketAddress(host, port);
} catch(Exception e) {
throw new IllegalArgumentException("Failed to parse \"" + str
+ "\": cannot create InetSocketAddress(host=" + host
+ ", port=" + port + ")", e);
}
}
/** Convert an InetSocketAddress to a HOST:PORT String. */
static String toString(final InetSocketAddress addr) {
return addr.getHostName() + ":" + addr.getPort();
}
}
}

View File

@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.web.resources;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
/** Namenode RPC address parameter. */
public class NamenodeRpcAddressParam extends InetSocketAddressParam {
/** Parameter name. */
public static final String NAME = "namenoderpcaddress";
/** Default parameter value. */
public static final String DEFAULT = "";
private static final Domain DOMAIN = new Domain(NAME);
/**
* Constructor.
* @param str a string representation of the parameter value.
*/
public NamenodeRpcAddressParam(final String str) {
super(DOMAIN, str == null || str.equals(DEFAULT)? null: DOMAIN.parse(str));
}
/**
* Construct an object using the RPC address of the given namenode.
*/
public NamenodeRpcAddressParam(final NameNode namenode) {
super(DOMAIN, namenode.getNameNodeAddress());
}
@Override
public String getName() {
return NAME;
}
}

View File

@ -0,0 +1,176 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.web;
import java.net.InetSocketAddress;
import java.net.URI;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
import org.apache.log4j.Level;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* Test WebHDFS with multiple NameNodes
*/
public class TestWebHdfsWithMultipleNameNodes {
static final Log LOG = WebHdfsTestUtil.LOG;
static private void setLogLevel() {
((Log4JLogger)LOG).getLogger().setLevel(Level.ALL);
((Log4JLogger)NamenodeWebHdfsMethods.LOG).getLogger().setLevel(Level.ALL);
((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.OFF);
((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.OFF);
((Log4JLogger)LogFactory.getLog(FSNamesystem.class)).getLogger().setLevel(Level.OFF);
((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.OFF);
}
private static final Configuration conf = new HdfsConfiguration();
private static MiniDFSCluster cluster;
private static WebHdfsFileSystem[] webhdfs;
@BeforeClass
public static void setupTest() {
setLogLevel();
try {
setupCluster(4, 3);
} catch(Exception e) {
throw new RuntimeException(e);
}
}
private static void setupCluster(final int nNameNodes, final int nDataNodes)
throws Exception {
LOG.info("nNameNodes=" + nNameNodes + ", nDataNodes=" + nDataNodes);
conf.setBoolean(DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY, true);
cluster = new MiniDFSCluster.Builder(conf)
.numNameNodes(nNameNodes)
.numDataNodes(nDataNodes)
.build();
cluster.waitActive();
webhdfs = new WebHdfsFileSystem[nNameNodes];
for(int i = 0; i < webhdfs.length; i++) {
final InetSocketAddress addr = cluster.getNameNode(i).getHttpAddress();
final String uri = WebHdfsFileSystem.SCHEME + "://"
+ addr.getHostName() + ":" + addr.getPort() + "/";
webhdfs[i] = (WebHdfsFileSystem)FileSystem.get(new URI(uri), conf);
}
}
@AfterClass
public static void shutdownCluster() {
if (cluster != null) {
cluster.shutdown();
cluster = null;
}
}
private static String createString(String prefix, int i) {
//The suffix is to make sure the strings have different lengths.
final String suffix = "*********************".substring(0, i+1);
return prefix + i + suffix + "\n";
}
private static String[] createStrings(String prefix, String name) {
final String[] strings = new String[webhdfs.length];
for(int i = 0; i < webhdfs.length; i++) {
strings[i] = createString(prefix, i);
LOG.info(name + "[" + i + "] = " + strings[i]);
}
return strings;
}
@Test
public void testRedirect() throws Exception {
final String dir = "/testRedirect/";
final String filename = "file";
final Path p = new Path(dir, filename);
final String[] writeStrings = createStrings("write to webhdfs ", "write");
final String[] appendStrings = createStrings("append to webhdfs ", "append");
//test create: create a file for each namenode
for(int i = 0; i < webhdfs.length; i++) {
final FSDataOutputStream out = webhdfs[i].create(p);
out.write(writeStrings[i].getBytes());
out.close();
}
for(int i = 0; i < webhdfs.length; i++) {
//check file length
final long expected = writeStrings[i].length();
Assert.assertEquals(expected, webhdfs[i].getFileStatus(p).getLen());
}
//test read: check file content for each namenode
for(int i = 0; i < webhdfs.length; i++) {
final FSDataInputStream in = webhdfs[i].open(p);
for(int c, j = 0; (c = in.read()) != -1; j++) {
Assert.assertEquals(writeStrings[i].charAt(j), c);
}
in.close();
}
//test append: append to the file for each namenode
for(int i = 0; i < webhdfs.length; i++) {
final FSDataOutputStream out = webhdfs[i].append(p);
out.write(appendStrings[i].getBytes());
out.close();
}
for(int i = 0; i < webhdfs.length; i++) {
//check file length
final long expected = writeStrings[i].length() + appendStrings[i].length();
Assert.assertEquals(expected, webhdfs[i].getFileStatus(p).getLen());
}
//test read: check file content for each namenode
for(int i = 0; i < webhdfs.length; i++) {
final StringBuilder b = new StringBuilder();
final FSDataInputStream in = webhdfs[i].open(p);
for(int c; (c = in.read()) != -1; ) {
b.append((char)c);
}
final int wlen = writeStrings[i].length();
Assert.assertEquals(writeStrings[i], b.substring(0, wlen));
Assert.assertEquals(appendStrings[i], b.substring(wlen));
in.close();
}
}
}