diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java index 36b0095cb0..d828f8899b 100644 --- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java @@ -19,11 +19,14 @@ package org.apache.hadoop.oncrpc; import java.io.IOException; import java.net.DatagramSocket; +import java.net.InetSocketAddress; +import java.net.SocketAddress; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.oncrpc.RpcAcceptedReply.AcceptState; import org.apache.hadoop.oncrpc.security.Verifier; +import org.apache.hadoop.oncrpc.security.VerifierNone; import org.apache.hadoop.portmap.PortmapMapping; import org.apache.hadoop.portmap.PortmapRequest; import org.jboss.netty.buffer.ChannelBuffer; @@ -37,7 +40,7 @@ import org.jboss.netty.channel.SimpleChannelUpstreamHandler; * and implement {@link #handleInternal} to handle the requests received. */ public abstract class RpcProgram extends SimpleChannelUpstreamHandler { - private static final Log LOG = LogFactory.getLog(RpcProgram.class); + static final Log LOG = LogFactory.getLog(RpcProgram.class); public static final int RPCB_PORT = 111; private final String program; private final String host; @@ -45,6 +48,7 @@ public abstract class RpcProgram extends SimpleChannelUpstreamHandler { private final int progNumber; private final int lowProgVersion; private final int highProgVersion; + private final boolean allowInsecurePorts; /** * If not null, this will be used as the socket to use to connect to the @@ -61,10 +65,14 @@ public abstract class RpcProgram extends SimpleChannelUpstreamHandler { * @param progNumber program number as defined in RFC 1050 * @param lowProgVersion lowest version of the specification supported * @param highProgVersion highest version of the specification supported + * @param DatagramSocket registrationSocket if not null, use this socket to + * register with portmap daemon + * @param allowInsecurePorts true to allow client connections from + * unprivileged ports, false otherwise */ protected RpcProgram(String program, String host, int port, int progNumber, int lowProgVersion, int highProgVersion, - DatagramSocket registrationSocket) { + DatagramSocket registrationSocket, boolean allowInsecurePorts) { this.program = program; this.host = host; this.port = port; @@ -72,6 +80,9 @@ public abstract class RpcProgram extends SimpleChannelUpstreamHandler { this.lowProgVersion = lowProgVersion; this.highProgVersion = highProgVersion; this.registrationSocket = registrationSocket; + this.allowInsecurePorts = allowInsecurePorts; + LOG.info("Will " + (allowInsecurePorts ? "" : "not ") + "accept client " + + "connections from unprivileged ports"); } /** @@ -133,43 +144,82 @@ public abstract class RpcProgram extends SimpleChannelUpstreamHandler { throws Exception { RpcInfo info = (RpcInfo) e.getMessage(); RpcCall call = (RpcCall) info.header(); + + SocketAddress remoteAddress = info.remoteAddress(); + if (!allowInsecurePorts) { + if (LOG.isDebugEnabled()) { + LOG.debug("Will not allow connections from unprivileged ports. " + + "Checking for valid client port..."); + } + if (remoteAddress instanceof InetSocketAddress) { + InetSocketAddress inetRemoteAddress = (InetSocketAddress) remoteAddress; + if (inetRemoteAddress.getPort() > 1023) { + LOG.warn("Connection attempted from '" + inetRemoteAddress + "' " + + "which is an unprivileged port. Rejecting connection."); + sendRejectedReply(call, remoteAddress, ctx); + return; + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Accepting connection from '" + remoteAddress + "'"); + } + } + } else { + LOG.warn("Could not determine remote port of socket address '" + + remoteAddress + "'. Rejecting connection."); + sendRejectedReply(call, remoteAddress, ctx); + return; + } + } + if (LOG.isTraceEnabled()) { LOG.trace(program + " procedure #" + call.getProcedure()); } if (this.progNumber != call.getProgram()) { LOG.warn("Invalid RPC call program " + call.getProgram()); - RpcAcceptedReply reply = RpcAcceptedReply.getInstance(call.getXid(), - AcceptState.PROG_UNAVAIL, Verifier.VERIFIER_NONE); - - XDR out = new XDR(); - reply.write(out); - ChannelBuffer b = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap() - .buffer()); - RpcResponse rsp = new RpcResponse(b, info.remoteAddress()); - RpcUtil.sendRpcResponse(ctx, rsp); + sendAcceptedReply(call, remoteAddress, AcceptState.PROG_UNAVAIL, ctx); return; } int ver = call.getVersion(); if (ver < lowProgVersion || ver > highProgVersion) { LOG.warn("Invalid RPC call version " + ver); - RpcAcceptedReply reply = RpcAcceptedReply.getInstance(call.getXid(), - AcceptState.PROG_MISMATCH, Verifier.VERIFIER_NONE); - - XDR out = new XDR(); - reply.write(out); - out.writeInt(lowProgVersion); - out.writeInt(highProgVersion); - ChannelBuffer b = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap() - .buffer()); - RpcResponse rsp = new RpcResponse(b, info.remoteAddress()); - RpcUtil.sendRpcResponse(ctx, rsp); + sendAcceptedReply(call, remoteAddress, AcceptState.PROG_MISMATCH, ctx); return; } handleInternal(ctx, info); } + + private void sendAcceptedReply(RpcCall call, SocketAddress remoteAddress, + AcceptState acceptState, ChannelHandlerContext ctx) { + RpcAcceptedReply reply = RpcAcceptedReply.getInstance(call.getXid(), + acceptState, Verifier.VERIFIER_NONE); + + XDR out = new XDR(); + reply.write(out); + if (acceptState == AcceptState.PROG_MISMATCH) { + out.writeInt(lowProgVersion); + out.writeInt(highProgVersion); + } + ChannelBuffer b = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap() + .buffer()); + RpcResponse rsp = new RpcResponse(b, remoteAddress); + RpcUtil.sendRpcResponse(ctx, rsp); + } + + private static void sendRejectedReply(RpcCall call, + SocketAddress remoteAddress, ChannelHandlerContext ctx) { + XDR out = new XDR(); + RpcDeniedReply reply = new RpcDeniedReply(call.getXid(), + RpcReply.ReplyState.MSG_DENIED, + RpcDeniedReply.RejectState.AUTH_ERROR, new VerifierNone()); + reply.write(out); + ChannelBuffer buf = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap() + .buffer()); + RpcResponse rsp = new RpcResponse(buf, remoteAddress); + RpcUtil.sendRpcResponse(ctx, rsp); + } protected abstract void handleInternal(ChannelHandlerContext ctx, RpcInfo info); diff --git a/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java index b9d8b1da8c..9c649bc744 100644 --- a/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java +++ b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java @@ -28,6 +28,8 @@ import java.util.Random; import org.apache.hadoop.oncrpc.RpcUtil.RpcFrameDecoder; import org.apache.hadoop.oncrpc.security.CredentialsNone; import org.apache.hadoop.oncrpc.security.VerifierNone; +import org.apache.log4j.Level; +import org.apache.commons.logging.impl.Log4JLogger; import org.jboss.netty.buffer.ByteBufferBackedChannelBuffer; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; @@ -38,10 +40,16 @@ import org.junit.Test; import org.mockito.Mockito; public class TestFrameDecoder { + + static { + ((Log4JLogger) RpcProgram.LOG).getLogger().setLevel(Level.ALL); + } private static int resultSize; static void testRequest(XDR request, int serverPort) { + // Reset resultSize so as to avoid interference from other tests in this class. + resultSize = 0; SimpleTcpClient tcpClient = new SimpleTcpClient("localhost", serverPort, request, true); tcpClient.run(); @@ -50,9 +58,10 @@ public class TestFrameDecoder { static class TestRpcProgram extends RpcProgram { protected TestRpcProgram(String program, String host, int port, - int progNumber, int lowProgVersion, int highProgVersion) { + int progNumber, int lowProgVersion, int highProgVersion, + boolean allowInsecurePorts) { super(program, host, port, progNumber, lowProgVersion, highProgVersion, - null); + null, allowInsecurePorts); } @Override @@ -149,26 +158,7 @@ public class TestFrameDecoder { @Test public void testFrames() { - - Random rand = new Random(); - int serverPort = 30000 + rand.nextInt(10000); - int retries = 10; // A few retries in case initial choice is in use. - - while (true) { - try { - RpcProgram program = new TestFrameDecoder.TestRpcProgram("TestRpcProgram", - "localhost", serverPort, 100000, 1, 2); - SimpleTcpServer tcpServer = new SimpleTcpServer(serverPort, program, 1); - tcpServer.run(); - break; // Successfully bound a port, break out. - } catch (ChannelException ce) { - if (retries-- > 0) { - serverPort += rand.nextInt(20); // Port in use? Try another. - } else { - throw ce; // Out of retries. - } - } - } + int serverPort = startRpcServer(true); XDR xdrOut = createGetportMount(); int headerSize = xdrOut.size(); @@ -183,6 +173,47 @@ public class TestFrameDecoder { // Verify the server got the request with right size assertEquals(requestSize, resultSize); } + + @Test + public void testUnprivilegedPort() { + // Don't allow connections from unprivileged ports. Given that this test is + // presumably not being run by root, this will be the case. + int serverPort = startRpcServer(false); + + XDR xdrOut = createGetportMount(); + int bufsize = 2 * 1024 * 1024; + byte[] buffer = new byte[bufsize]; + xdrOut.writeFixedOpaque(buffer); + + // Send the request to the server + testRequest(xdrOut, serverPort); + + // Verify the server rejected the request. + assertEquals(0, resultSize); + } + + private static int startRpcServer(boolean allowInsecurePorts) { + Random rand = new Random(); + int serverPort = 30000 + rand.nextInt(10000); + int retries = 10; // A few retries in case initial choice is in use. + + while (true) { + try { + RpcProgram program = new TestFrameDecoder.TestRpcProgram("TestRpcProgram", + "localhost", serverPort, 100000, 1, 2, allowInsecurePorts); + SimpleTcpServer tcpServer = new SimpleTcpServer(serverPort, program, 1); + tcpServer.run(); + break; // Successfully bound a port, break out. + } catch (ChannelException ce) { + if (retries-- > 0) { + serverPort += rand.nextInt(20); // Port in use? Try another. + } else { + throw ce; // Out of retries. + } + } + } + return serverPort; + } static void createPortmapXDRheader(XDR xdr_out, int procedure) { // Make this a method diff --git a/hadoop-common-project/hadoop-nfs/src/test/resources/log4j.properties b/hadoop-common-project/hadoop-nfs/src/test/resources/log4j.properties new file mode 100644 index 0000000000..1a6baaec65 --- /dev/null +++ b/hadoop-common-project/hadoop-nfs/src/test/resources/log4j.properties @@ -0,0 +1,18 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# log4j configuration used during build and unit tests + +log4j.rootLogger=info,stdout +log4j.threshhold=ALL +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/Mountd.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/Mountd.java index 6310a8bc1a..84349bfe32 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/Mountd.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/Mountd.java @@ -32,14 +32,14 @@ import org.apache.hadoop.mount.MountdBase; */ public class Mountd extends MountdBase { - public Mountd(Configuration config, DatagramSocket registrationSocket) - throws IOException { - super(new RpcProgramMountd(config, registrationSocket)); + public Mountd(Configuration config, DatagramSocket registrationSocket, + boolean allowInsecurePorts) throws IOException { + super(new RpcProgramMountd(config, registrationSocket, allowInsecurePorts)); } public static void main(String[] args) throws IOException { Configuration config = new Configuration(); - Mountd mountd = new Mountd(config, null); + Mountd mountd = new Mountd(config, null, true); mountd.start(true); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java index 9398e7906b..29b227aecb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java @@ -79,11 +79,11 @@ public class RpcProgramMountd extends RpcProgram implements MountInterface { private final NfsExports hostsMatcher; - public RpcProgramMountd(Configuration config, - DatagramSocket registrationSocket) throws IOException { + public RpcProgramMountd(Configuration config, DatagramSocket registrationSocket, + boolean allowInsecurePorts) throws IOException { // Note that RPC cache is not enabled super("mountd", "localhost", config.getInt("nfs3.mountd.port", PORT), - PROGRAM, VERSION_1, VERSION_3, registrationSocket); + PROGRAM, VERSION_1, VERSION_3, registrationSocket, allowInsecurePorts); exports = new ArrayList(); exports.add(config.get(Nfs3Constant.EXPORT_POINT, Nfs3Constant.EXPORT_POINT_DEFAULT)); diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3.java index 0fc9f06ae1..5cad60728d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.net.DatagramSocket; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.nfs.mount.Mountd; import org.apache.hadoop.nfs.nfs3.Nfs3Base; import org.apache.hadoop.util.StringUtils; @@ -41,12 +42,13 @@ public class Nfs3 extends Nfs3Base { } public Nfs3(Configuration conf) throws IOException { - this(conf, null); + this(conf, null, true); } - public Nfs3(Configuration conf, DatagramSocket registrationSocket) throws IOException { - super(new RpcProgramNfs3(conf, registrationSocket), conf); - mountd = new Mountd(conf, registrationSocket); + public Nfs3(Configuration conf, DatagramSocket registrationSocket, + boolean allowInsecurePorts) throws IOException { + super(new RpcProgramNfs3(conf, registrationSocket, allowInsecurePorts), conf); + mountd = new Mountd(conf, registrationSocket, allowInsecurePorts); } public Mountd getMountd() { @@ -61,8 +63,13 @@ public class Nfs3 extends Nfs3Base { static void startService(String[] args, DatagramSocket registrationSocket) throws IOException { - StringUtils.startupShutdownMessage(Nfs3.class, args, LOG); - final Nfs3 nfsServer = new Nfs3(new Configuration(), registrationSocket); + StringUtils.startupShutdownMessage(Nfs3.class, args, LOG); + Configuration conf = new Configuration(); + boolean allowInsecurePorts = conf.getBoolean( + DFSConfigKeys.DFS_NFS_ALLOW_INSECURE_PORTS_KEY, + DFSConfigKeys.DFS_NFS_ALLOW_INSECURE_PORTS_DEFAULT); + final Nfs3 nfsServer = new Nfs3(new Configuration(), registrationSocket, + allowInsecurePorts); nfsServer.startServiceInternal(true); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java index 3007de1754..f78598f13a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java @@ -166,11 +166,12 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { private final RpcCallCache rpcCallCache; - public RpcProgramNfs3(Configuration config, DatagramSocket registrationSocket) - throws IOException { + public RpcProgramNfs3(Configuration config, DatagramSocket registrationSocket, + boolean allowInsecurePorts) throws IOException { super("NFS3", "localhost", config.getInt(Nfs3Constant.NFS3_SERVER_PORT, Nfs3Constant.NFS3_SERVER_PORT_DEFAULT), Nfs3Constant.PROGRAM, - Nfs3Constant.VERSION, Nfs3Constant.VERSION, registrationSocket); + Nfs3Constant.VERSION, Nfs3Constant.VERSION, registrationSocket, + allowInsecurePorts); config.set(FsPermission.UMASK_LABEL, "000"); iug = new IdUserGroup(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index e6f1c4edd3..417b2f8b33 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -273,6 +273,9 @@ Release 2.5.0 - UNRELEASED HDFS-6334. Client failover proxy provider for IP failover based NN HA. (kihwal) + HDFS-6406. Add capability for NFS gateway to reject connections from + unprivileged ports. (atm) + IMPROVEMENTS HDFS-6007. Update documentation about short-circuit local reads (iwasakims diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 86268275f8..a6643ec8bc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -631,9 +631,12 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE = "dfs.client.hedged.read.threadpool.size"; - public static final int DEFAULT_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE = 0; - public static final String DFS_NFS_KEYTAB_FILE_KEY = "dfs.nfs.keytab.file"; - public static final String DFS_NFS_KERBEROS_PRINCIPAL_KEY = "dfs.nfs.kerberos.principal"; - public static final String DFS_NFS_REGISTRATION_PORT_KEY = "dfs.nfs.registration.port"; - public static final int DFS_NFS_REGISTRATION_PORT_DEFAULT = 40; // Currently unassigned. + public static final int DEFAULT_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE = 0; + public static final String DFS_NFS_KEYTAB_FILE_KEY = "dfs.nfs.keytab.file"; + public static final String DFS_NFS_KERBEROS_PRINCIPAL_KEY = "dfs.nfs.kerberos.principal"; + public static final String DFS_NFS_REGISTRATION_PORT_KEY = "dfs.nfs.registration.port"; + public static final int DFS_NFS_REGISTRATION_PORT_DEFAULT = 40; // Currently unassigned. + public static final String DFS_NFS_ALLOW_INSECURE_PORTS_KEY = "dfs.nfs.allow.insecure.ports"; + public static final boolean DFS_NFS_ALLOW_INSECURE_PORTS_DEFAULT = true; + } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index e1ae7deb89..3aea2def8c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -1317,6 +1317,17 @@ + + dfs.nfs.allow.insecure.ports + true + + When set to false, client connections originating from unprivileged ports + (those above 1023) will be rejected. This is to ensure that clients + connecting to this NFS Gateway must have had root privilege on the machine + where they're connecting from. + + + dfs.webhdfs.enabled true @@ -1895,4 +1906,4 @@ - \ No newline at end of file +