HDFS-6281. Provide option to use the NFS Gateway without having to use the Hadoop portmapper. Contributed by Aaron T. Myers.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1589914 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
fbc0a8bef0
commit
7bad941152
@ -18,6 +18,7 @@
|
||||
package org.apache.hadoop.oncrpc;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.DatagramSocket;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
@ -45,6 +46,12 @@ public abstract class RpcProgram extends SimpleChannelUpstreamHandler {
|
||||
private final int lowProgVersion;
|
||||
private final int highProgVersion;
|
||||
|
||||
/**
|
||||
* If not null, this will be used as the socket to use to connect to the
|
||||
* system portmap daemon when registering this RPC server program.
|
||||
*/
|
||||
private final DatagramSocket registrationSocket;
|
||||
|
||||
/**
|
||||
* Constructor
|
||||
*
|
||||
@ -56,13 +63,15 @@ public abstract class RpcProgram extends SimpleChannelUpstreamHandler {
|
||||
* @param highProgVersion highest version of the specification supported
|
||||
*/
|
||||
protected RpcProgram(String program, String host, int port, int progNumber,
|
||||
int lowProgVersion, int highProgVersion) {
|
||||
int lowProgVersion, int highProgVersion,
|
||||
DatagramSocket registrationSocket) {
|
||||
this.program = program;
|
||||
this.host = host;
|
||||
this.port = port;
|
||||
this.progNumber = progNumber;
|
||||
this.lowProgVersion = lowProgVersion;
|
||||
this.highProgVersion = highProgVersion;
|
||||
this.registrationSocket = registrationSocket;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -105,14 +114,14 @@ public void unregister(int transport, int boundPort) {
|
||||
protected void register(PortmapMapping mapEntry, boolean set) {
|
||||
XDR mappingRequest = PortmapRequest.create(mapEntry, set);
|
||||
SimpleUdpClient registrationClient = new SimpleUdpClient(host, RPCB_PORT,
|
||||
mappingRequest);
|
||||
mappingRequest, registrationSocket);
|
||||
try {
|
||||
registrationClient.run();
|
||||
} catch (IOException e) {
|
||||
String request = set ? "Registration" : "Unregistration";
|
||||
LOG.error(request + " failure with " + host + ":" + port
|
||||
+ ", portmap entry: " + mapEntry);
|
||||
throw new RuntimeException(request + " failure");
|
||||
+ ", portmap entry: " + mapEntry, e);
|
||||
throw new RuntimeException(request + " failure", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -27,43 +27,56 @@
|
||||
* A simple UDP based RPC client which just sends one request to a server.
|
||||
*/
|
||||
public class SimpleUdpClient {
|
||||
|
||||
protected final String host;
|
||||
protected final int port;
|
||||
protected final XDR request;
|
||||
protected final boolean oneShot;
|
||||
protected final DatagramSocket clientSocket;
|
||||
|
||||
public SimpleUdpClient(String host, int port, XDR request) {
|
||||
this(host, port, request, true);
|
||||
public SimpleUdpClient(String host, int port, XDR request,
|
||||
DatagramSocket clientSocket) {
|
||||
this(host, port, request, true, clientSocket);
|
||||
}
|
||||
|
||||
public SimpleUdpClient(String host, int port, XDR request, Boolean oneShot) {
|
||||
public SimpleUdpClient(String host, int port, XDR request, Boolean oneShot,
|
||||
DatagramSocket clientSocket) {
|
||||
this.host = host;
|
||||
this.port = port;
|
||||
this.request = request;
|
||||
this.oneShot = oneShot;
|
||||
this.clientSocket = clientSocket;
|
||||
}
|
||||
|
||||
public void run() throws IOException {
|
||||
DatagramSocket clientSocket = new DatagramSocket();
|
||||
InetAddress IPAddress = InetAddress.getByName(host);
|
||||
byte[] sendData = request.getBytes();
|
||||
byte[] receiveData = new byte[65535];
|
||||
// Use the provided socket if there is one, else just make a new one.
|
||||
DatagramSocket socket = this.clientSocket == null ?
|
||||
new DatagramSocket() : this.clientSocket;
|
||||
|
||||
DatagramPacket sendPacket = new DatagramPacket(sendData, sendData.length,
|
||||
IPAddress, port);
|
||||
clientSocket.send(sendPacket);
|
||||
DatagramPacket receivePacket = new DatagramPacket(receiveData,
|
||||
receiveData.length);
|
||||
clientSocket.receive(receivePacket);
|
||||
|
||||
// Check reply status
|
||||
XDR xdr = new XDR(Arrays.copyOfRange(receiveData, 0,
|
||||
receivePacket.getLength()));
|
||||
RpcReply reply = RpcReply.read(xdr);
|
||||
if (reply.getState() != RpcReply.ReplyState.MSG_ACCEPTED) {
|
||||
throw new IOException("Request failed: " + reply.getState());
|
||||
try {
|
||||
DatagramPacket sendPacket = new DatagramPacket(sendData, sendData.length,
|
||||
IPAddress, port);
|
||||
socket.send(sendPacket);
|
||||
DatagramPacket receivePacket = new DatagramPacket(receiveData,
|
||||
receiveData.length);
|
||||
socket.receive(receivePacket);
|
||||
|
||||
// Check reply status
|
||||
XDR xdr = new XDR(Arrays.copyOfRange(receiveData, 0,
|
||||
receivePacket.getLength()));
|
||||
RpcReply reply = RpcReply.read(xdr);
|
||||
if (reply.getState() != RpcReply.ReplyState.MSG_ACCEPTED) {
|
||||
throw new IOException("Request failed: " + reply.getState());
|
||||
}
|
||||
} finally {
|
||||
// If the client socket was passed in to this UDP client, it's on the
|
||||
// caller of this UDP client to close that socket.
|
||||
if (this.clientSocket == null) {
|
||||
socket.close();
|
||||
}
|
||||
}
|
||||
|
||||
clientSocket.close();
|
||||
}
|
||||
}
|
||||
|
@ -51,7 +51,8 @@ static class TestRpcProgram extends RpcProgram {
|
||||
|
||||
protected TestRpcProgram(String program, String host, int port,
|
||||
int progNumber, int lowProgVersion, int highProgVersion) {
|
||||
super(program, host, port, progNumber, lowProgVersion, highProgVersion);
|
||||
super(program, host, port, progNumber, lowProgVersion, highProgVersion,
|
||||
null);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -18,6 +18,7 @@
|
||||
package org.apache.hadoop.hdfs.nfs.mount;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.DatagramSocket;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.mount.MountdBase;
|
||||
@ -31,13 +32,14 @@
|
||||
*/
|
||||
public class Mountd extends MountdBase {
|
||||
|
||||
public Mountd(Configuration config) throws IOException {
|
||||
super(new RpcProgramMountd(config));
|
||||
public Mountd(Configuration config, DatagramSocket registrationSocket)
|
||||
throws IOException {
|
||||
super(new RpcProgramMountd(config, registrationSocket));
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws IOException {
|
||||
Configuration config = new Configuration();
|
||||
Mountd mountd = new Mountd(config);
|
||||
Mountd mountd = new Mountd(config, null);
|
||||
mountd.start(true);
|
||||
}
|
||||
}
|
||||
|
@ -20,6 +20,7 @@
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NFS_KERBEROS_PRINCIPAL_KEY;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.DatagramSocket;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
@ -78,10 +79,11 @@ public class RpcProgramMountd extends RpcProgram implements MountInterface {
|
||||
|
||||
private final NfsExports hostsMatcher;
|
||||
|
||||
public RpcProgramMountd(Configuration config) throws IOException {
|
||||
public RpcProgramMountd(Configuration config,
|
||||
DatagramSocket registrationSocket) throws IOException {
|
||||
// Note that RPC cache is not enabled
|
||||
super("mountd", "localhost", config.getInt("nfs3.mountd.port", PORT),
|
||||
PROGRAM, VERSION_1, VERSION_3);
|
||||
PROGRAM, VERSION_1, VERSION_3, registrationSocket);
|
||||
exports = new ArrayList<String>();
|
||||
exports.add(config.get(Nfs3Constant.EXPORT_POINT,
|
||||
Nfs3Constant.EXPORT_POINT_DEFAULT));
|
||||
|
@ -18,6 +18,7 @@
|
||||
package org.apache.hadoop.hdfs.nfs.nfs3;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.DatagramSocket;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.nfs.mount.Mountd;
|
||||
@ -40,8 +41,12 @@ public class Nfs3 extends Nfs3Base {
|
||||
}
|
||||
|
||||
public Nfs3(Configuration conf) throws IOException {
|
||||
super(new RpcProgramNfs3(conf), conf);
|
||||
mountd = new Mountd(conf);
|
||||
this(conf, null);
|
||||
}
|
||||
|
||||
public Nfs3(Configuration conf, DatagramSocket registrationSocket) throws IOException {
|
||||
super(new RpcProgramNfs3(conf, registrationSocket), conf);
|
||||
mountd = new Mountd(conf, registrationSocket);
|
||||
}
|
||||
|
||||
public Mountd getMountd() {
|
||||
@ -54,9 +59,14 @@ public void startServiceInternal(boolean register) throws IOException {
|
||||
start(register);
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws IOException {
|
||||
static void startService(String[] args,
|
||||
DatagramSocket registrationSocket) throws IOException {
|
||||
StringUtils.startupShutdownMessage(Nfs3.class, args, LOG);
|
||||
final Nfs3 nfsServer = new Nfs3(new Configuration());
|
||||
final Nfs3 nfsServer = new Nfs3(new Configuration(), registrationSocket);
|
||||
nfsServer.startServiceInternal(true);
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws IOException {
|
||||
startService(args, null);
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,76 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.nfs.nfs3;
|
||||
|
||||
import java.net.DatagramSocket;
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
import org.apache.commons.daemon.Daemon;
|
||||
import org.apache.commons.daemon.DaemonContext;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
|
||||
/**
|
||||
* This class is used to allow the initial registration of the NFS gateway with
|
||||
* the system portmap daemon to come from a privileged (< 1024) port. This is
|
||||
* necessary on certain operating systems to work around this bug in rpcbind:
|
||||
*
|
||||
* Red Hat: https://bugzilla.redhat.com/show_bug.cgi?id=731542
|
||||
* SLES: https://bugzilla.novell.com/show_bug.cgi?id=823364
|
||||
* Debian: https://bugs.debian.org/cgi-bin/bugreport.cgi?bug=594880
|
||||
*/
|
||||
public class PrivilegedNfsGatewayStarter implements Daemon {
|
||||
|
||||
private String[] args = null;
|
||||
private DatagramSocket registrationSocket = null;
|
||||
|
||||
@Override
|
||||
public void init(DaemonContext context) throws Exception {
|
||||
System.err.println("Initializing privileged NFS client socket...");
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
int clientPort = conf.getInt(DFSConfigKeys.DFS_NFS_REGISTRATION_PORT_KEY,
|
||||
DFSConfigKeys.DFS_NFS_REGISTRATION_PORT_DEFAULT);
|
||||
if (clientPort < 1 || clientPort > 1023) {
|
||||
throw new RuntimeException("Must start privileged NFS server with '" +
|
||||
DFSConfigKeys.DFS_NFS_REGISTRATION_PORT_KEY + "' configured to a " +
|
||||
"privileged port.");
|
||||
}
|
||||
registrationSocket = new DatagramSocket(
|
||||
new InetSocketAddress("localhost", clientPort));
|
||||
registrationSocket.setReuseAddress(true);
|
||||
args = context.getArguments();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() throws Exception {
|
||||
Nfs3.startService(args, registrationSocket);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() throws Exception {
|
||||
// Nothing to do.
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroy() {
|
||||
if (registrationSocket != null && !registrationSocket.isClosed()) {
|
||||
registrationSocket.close();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -20,6 +20,7 @@
|
||||
import java.io.File;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.net.DatagramSocket;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
@ -165,10 +166,11 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
|
||||
|
||||
private final RpcCallCache rpcCallCache;
|
||||
|
||||
public RpcProgramNfs3(Configuration config) throws IOException {
|
||||
public RpcProgramNfs3(Configuration config, DatagramSocket registrationSocket)
|
||||
throws IOException {
|
||||
super("NFS3", "localhost", config.getInt(Nfs3Constant.NFS3_SERVER_PORT,
|
||||
Nfs3Constant.NFS3_SERVER_PORT_DEFAULT), Nfs3Constant.PROGRAM,
|
||||
Nfs3Constant.VERSION, Nfs3Constant.VERSION);
|
||||
Nfs3Constant.VERSION, Nfs3Constant.VERSION, registrationSocket);
|
||||
|
||||
config.set(FsPermission.UMASK_LABEL, "000");
|
||||
iug = new IdUserGroup();
|
||||
|
@ -260,6 +260,9 @@ Release 2.5.0 - UNRELEASED
|
||||
|
||||
NEW FEATURES
|
||||
|
||||
HDFS-6281. Provide option to use the NFS Gateway without having to use the
|
||||
Hadoop portmapper. (atm)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
HDFS-6007. Update documentation about short-circuit local reads (iwasakims
|
||||
|
@ -101,6 +101,27 @@ if [ "$COMMAND" == "datanode" ] && [ "$EUID" -eq 0 ] && [ -n "$HADOOP_SECURE_DN_
|
||||
fi
|
||||
fi
|
||||
|
||||
# Determine if we're starting a privileged NFS daemon, and if so, redefine appropriate variables
|
||||
if [ "$COMMAND" == "nfs3" ] && [ "$EUID" -eq 0 ] && [ -n "$HADOOP_PRIVILEGED_NFS_USER" ]; then
|
||||
if [ -n "$JSVC_HOME" ]; then
|
||||
if [ -n "$HADOOP_PRIVILEGED_NFS_PID_DIR" ]; then
|
||||
HADOOP_PID_DIR=$HADOOP_PRIVILEGED_NFS_PID_DIR
|
||||
fi
|
||||
|
||||
if [ -n "$HADOOP_PRIVILEGED_NFS_LOG_DIR" ]; then
|
||||
HADOOP_LOG_DIR=$HADOOP_PRIVILEGED_NFS_LOG_DIR
|
||||
HADOOP_OPTS="$HADOOP_OPTS -Dhadoop.log.dir=$HADOOP_LOG_DIR"
|
||||
fi
|
||||
|
||||
HADOOP_IDENT_STRING=$HADOOP_PRIVILEGED_NFS_USER
|
||||
HADOOP_OPTS="$HADOOP_OPTS -Dhadoop.id.str=$HADOOP_IDENT_STRING"
|
||||
starting_privileged_nfs="true"
|
||||
else
|
||||
echo "It looks like you're trying to start a privileged NFS server, but"\
|
||||
"\$JSVC_HOME isn't set. Falling back to starting unprivileged NFS server."
|
||||
fi
|
||||
fi
|
||||
|
||||
if [ "$COMMAND" = "namenode" ] ; then
|
||||
CLASS='org.apache.hadoop.hdfs.server.namenode.NameNode'
|
||||
HADOOP_OPTS="$HADOOP_OPTS $HADOOP_NAMENODE_OPTS"
|
||||
@ -178,7 +199,7 @@ if [ "$starting_secure_dn" = "true" ]; then
|
||||
|
||||
JSVC=$JSVC_HOME/jsvc
|
||||
if [ ! -f $JSVC ]; then
|
||||
echo "JSVC_HOME is not set correctly so jsvc cannot be found. Jsvc is required to run secure datanodes. "
|
||||
echo "JSVC_HOME is not set correctly so jsvc cannot be found. jsvc is required to run secure datanodes. "
|
||||
echo "Please download and install jsvc from http://archive.apache.org/dist/commons/daemon/binaries/ "\
|
||||
"and set JSVC_HOME to the directory containing the jsvc binary."
|
||||
exit
|
||||
@ -201,6 +222,38 @@ if [ "$starting_secure_dn" = "true" ]; then
|
||||
-cp "$CLASSPATH" \
|
||||
$JAVA_HEAP_MAX $HADOOP_OPTS \
|
||||
org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter "$@"
|
||||
elif [ "$starting_privileged_nfs" = "true" ] ; then
|
||||
if [ "$HADOOP_PID_DIR" = "" ]; then
|
||||
HADOOP_PRIVILEGED_NFS_PID="/tmp/hadoop_privileged_nfs3.pid"
|
||||
else
|
||||
HADOOP_PRIVILEGED_NFS_PID="$HADOOP_PID_DIR/hadoop_privileged_nfs3.pid"
|
||||
fi
|
||||
|
||||
JSVC=$JSVC_HOME/jsvc
|
||||
if [ ! -f $JSVC ]; then
|
||||
echo "JSVC_HOME is not set correctly so jsvc cannot be found. jsvc is required to run privileged NFS gateways. "
|
||||
echo "Please download and install jsvc from http://archive.apache.org/dist/commons/daemon/binaries/ "\
|
||||
"and set JSVC_HOME to the directory containing the jsvc binary."
|
||||
exit
|
||||
fi
|
||||
|
||||
if [[ ! $JSVC_OUTFILE ]]; then
|
||||
JSVC_OUTFILE="$HADOOP_LOG_DIR/nfs3_jsvc.out"
|
||||
fi
|
||||
|
||||
if [[ ! $JSVC_ERRFILE ]]; then
|
||||
JSVC_ERRFILE="$HADOOP_LOG_DIR/nfs3_jsvc.err"
|
||||
fi
|
||||
|
||||
exec "$JSVC" \
|
||||
-Dproc_$COMMAND -outfile "$JSVC_OUTFILE" \
|
||||
-errfile "$JSVC_ERRFILE" \
|
||||
-pidfile "$HADOOP_PRIVILEGED_NFS_PID" \
|
||||
-nodetach \
|
||||
-user "$HADOOP_PRIVILEGED_NFS_USER" \
|
||||
-cp "$CLASSPATH" \
|
||||
$JAVA_HEAP_MAX $HADOOP_OPTS \
|
||||
org.apache.hadoop.hdfs.nfs.nfs3.PrivilegedNfsGatewayStarter "$@"
|
||||
else
|
||||
# run it
|
||||
exec "$JAVA" -Dproc_$COMMAND $JAVA_HEAP_MAX $HADOOP_OPTS $CLASS "$@"
|
||||
|
@ -628,7 +628,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||
|
||||
public static final String DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE =
|
||||
"dfs.client.hedged.read.threadpool.size";
|
||||
public static final int DEFAULT_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE = 0;
|
||||
public static final int DEFAULT_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE = 0;
|
||||
public static final String DFS_NFS_KEYTAB_FILE_KEY = "dfs.nfs.keytab.file";
|
||||
public static final String DFS_NFS_KERBEROS_PRINCIPAL_KEY = "dfs.nfs.kerberos.principal";
|
||||
public static final String DFS_NFS_REGISTRATION_PORT_KEY = "dfs.nfs.registration.port";
|
||||
public static final int DFS_NFS_REGISTRATION_PORT_DEFAULT = 40; // Currently unassigned.
|
||||
}
|
||||
|
@ -243,6 +243,19 @@ HDFS NFS Gateway
|
||||
hadoop-daemon.sh stop portmap
|
||||
-------------------------
|
||||
|
||||
Optionally, you can forgo running the Hadoop-provided portmap daemon and
|
||||
instead use the system portmap daemon on all operating systems if you start the
|
||||
NFS Gateway as root. This will allow the HDFS NFS Gateway to work around the
|
||||
aforementioned bug and still register using the system portmap daemon. To do
|
||||
so, just start the NFS gateway daemon as you normally would, but make sure to
|
||||
do so as the "root" user, and also set the "HADOOP_PRIVILEGED_NFS_USER"
|
||||
environment variable to an unprivileged user. In this mode the NFS Gateway will
|
||||
start as root to perform its initial registration with the system portmap, and
|
||||
then will drop privileges back to the user specified by the
|
||||
HADOOP_PRIVILEGED_NFS_USER afterward and for the rest of the duration of the
|
||||
lifetime of the NFS Gateway process. Note that if you choose this route, you
|
||||
should skip steps 1 and 2 above.
|
||||
|
||||
|
||||
* {Verify validity of NFS related services}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user