diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index dca9551e9e..23c6351dbf 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -1711,6 +1711,7 @@ public class Connection { IpcConnectionContextProto connectionContext; String protocolName; SaslServer saslServer; + private String establishedQOP; private AuthMethod authMethod; private AuthProtocol authProtocol; private boolean saslContextEstablished; @@ -1788,14 +1789,7 @@ public InetAddress getHostInetAddress() { } public String getEstablishedQOP() { - // In practice, saslServer should not be null when this is - // called. If it is null, it must be either some - // configuration mistake or it is called from unit test. - if (saslServer == null) { - LOG.warn("SASL server should not be null!"); - return null; - } - return (String)saslServer.getNegotiatedProperty(Sasl.QOP); + return establishedQOP; } public void setLastContact(long lastContact) { @@ -1956,6 +1950,7 @@ private void saslProcess(RpcSaslProto saslMessage) // do NOT enable wrapping until the last auth response is sent if (saslContextEstablished) { String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP); + establishedQOP = qop; // SASL wrapping is only used if the connection has a QOP, and // the value is not auth. ex. auth-int & auth-priv useWrap = (qop != null && !"auth".equalsIgnoreCase(qop)); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/Token.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/Token.java index edc1e1d5aa..94481a2773 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/Token.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/Token.java @@ -56,6 +56,7 @@ public class Token implements Writable { private Text kind; private Text service; private TokenRenewer renewer; + private byte[] dnHandshakeSecret; /** * Construct a token given a token identifier and a secret manager for the @@ -68,6 +69,7 @@ public Token(T id, SecretManager mgr) { identifier = id.getBytes(); kind = id.getKind(); service = new Text(); + dnHandshakeSecret = new byte[0]; } /** @@ -82,6 +84,7 @@ public Token(byte[] identifier, byte[] password, Text kind, Text service) { this.password = (password == null)? new byte[0] : password; this.kind = (kind == null)? new Text() : kind; this.service = (service == null)? new Text() : service; + this.dnHandshakeSecret = new byte[0]; } /** @@ -92,6 +95,7 @@ public Token() { password = new byte[0]; kind = new Text(); service = new Text(); + dnHandshakeSecret = new byte[0]; } /** @@ -103,6 +107,7 @@ public Token(Token other) { this.password = other.password.clone(); this.kind = new Text(other.kind); this.service = new Text(other.service); + this.dnHandshakeSecret = other.dnHandshakeSecret.clone(); } public Token copyToken() { @@ -118,6 +123,7 @@ public Token(TokenProto tokenPB) { this.password = tokenPB.getPassword().toByteArray(); this.kind = new Text(tokenPB.getKindBytes().toByteArray()); this.service = new Text(tokenPB.getServiceBytes().toByteArray()); + this.dnHandshakeSecret = new byte[0]; } /** @@ -143,6 +149,14 @@ public byte[] getIdentifier() { return identifier; } + public byte[] getDnHandshakeSecret() { + return dnHandshakeSecret; + } + + public void setDNHandshakeSecret(byte[] secret) { + this.dnHandshakeSecret = secret; + } + private static Class getClassForIdentifier(Text kind) { Class cls = null; @@ -337,6 +351,11 @@ public void readFields(DataInput in) throws IOException { in.readFully(password); kind.readFields(in); service.readFields(in); + len = WritableUtils.readVInt(in); + if (dnHandshakeSecret == null || dnHandshakeSecret.length != len) { + dnHandshakeSecret = new byte[len]; + } + in.readFully(dnHandshakeSecret); } @Override @@ -347,6 +366,8 @@ public void write(DataOutput out) throws IOException { out.write(password); kind.write(out); service.write(out); + WritableUtils.writeVInt(out, dnHandshakeSecret.length); + out.write(dnHandshakeSecret); } /** diff --git a/hadoop-common-project/hadoop-common/src/main/proto/Security.proto b/hadoop-common-project/hadoop-common/src/main/proto/Security.proto index 037a8781a9..4cf452023e 100644 --- a/hadoop-common-project/hadoop-common/src/main/proto/Security.proto +++ b/hadoop-common-project/hadoop-common/src/main/proto/Security.proto @@ -36,6 +36,7 @@ message TokenProto { required bytes password = 2; required string kind = 3; required string service = 4; + optional bytes handshakeSecret = 5; } message CredentialsKVProto { diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java index 4b966bb529..829d3ef640 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java @@ -349,11 +349,16 @@ public static ExtendedBlockProto convert(final ExtendedBlock b) { } public static TokenProto convert(Token tok) { - return TokenProto.newBuilder(). + TokenProto.Builder builder = TokenProto.newBuilder(). setIdentifier(getByteString(tok.getIdentifier())). setPassword(getByteString(tok.getPassword())). setKindBytes(getFixedByteString(tok.getKind())). - setServiceBytes(getFixedByteString(tok.getService())).build(); + setServiceBytes(getFixedByteString(tok.getService())); + if (tok.getDnHandshakeSecret() != null) { + builder.setHandshakeSecret( + ByteString.copyFrom(tok.getDnHandshakeSecret())); + } + return builder.build(); } public static ShortCircuitShmIdProto convert(ShmId shmId) { @@ -826,9 +831,14 @@ public static StorageType[] convertStorageTypes( public static Token convert( TokenProto blockToken) { - return new Token<>(blockToken.getIdentifier() + Token token = + new Token<>(blockToken.getIdentifier() .toByteArray(), blockToken.getPassword().toByteArray(), new Text( blockToken.getKind()), new Text(blockToken.getService())); + if (blockToken.hasHandshakeSecret()) { + token.setDNHandshakeSecret(blockToken.getHandshakeSecret().toByteArray()); + } + return token; } // DatanodeId diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 3bdff94b86..fedfc5a7dc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -1285,6 +1285,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY = HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY; + public static final String DFS_NAMENODE_SEND_QOP_ENABLED = + "dfs.namenode.send.qop.enabled"; + public static final boolean DFS_NAMENODE_SEND_QOP_ENABLED_DEFAULT = false; + // dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry @Deprecated public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java index 52bc52da21..001544cde7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java @@ -534,6 +534,22 @@ public byte[] retrieveDataEncryptionKey(int keyId, byte[] nonce) return createPassword(nonce, key.getKey()); } + /** + * Encrypt the given message with the current block key, using the current + * block key. + * + * @param message the message to be encrypted. + * @return the secret created by encrypting the given message. + */ + public byte[] secretGen(byte[] message) { + return createPassword(message, currentKey.getKey()); + } + + @VisibleForTesting + public BlockKey getCurrentKey() { + return currentKey; + } + @VisibleForTesting public synchronized void setKeyUpdateIntervalForTesting(long millis) { this.keyUpdateInterval = millis; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 98c07bd623..0da8099f10 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -27,10 +27,13 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SEND_QOP_ENABLED; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SEND_QOP_ENABLED_DEFAULT; import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_PATH_DEPTH; import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_PATH_LENGTH; import static org.apache.hadoop.util.Time.now; +import com.google.common.base.Charsets; import java.io.FileNotFoundException; import java.io.IOException; import java.net.InetSocketAddress; @@ -145,6 +148,8 @@ import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolServerSideTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.ReconfigurationProtocolPB; import org.apache.hadoop.hdfs.protocolPB.ReconfigurationProtocolServerSideTranslatorPB; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager; import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; @@ -264,6 +269,8 @@ public class NameNodeRpcServer implements NamenodeProtocols { private final String defaultECPolicyName; + private final boolean shouldSendQOP; + public NameNodeRpcServer(Configuration conf, NameNode nn) throws IOException { this.nn = nn; @@ -546,6 +553,8 @@ public NameNodeRpcServer(Configuration conf, NameNode nn) this.clientRpcServer.addAuxiliaryListener(auxiliaryPort); } } + this.shouldSendQOP = conf.getBoolean( + DFS_NAMENODE_SEND_QOP_ENABLED, DFS_NAMENODE_SEND_QOP_ENABLED_DEFAULT); } /** Allow access to the lifeline RPC server for testing */ @@ -751,8 +760,14 @@ public LocatedBlocks getBlockLocations(String src, throws IOException { checkNNStartup(); metrics.incrGetBlockLocations(); - return namesystem.getBlockLocations(getClientMachine(), - src, offset, length); + LocatedBlocks locatedBlocks = + namesystem.getBlockLocations(getClientMachine(), src, offset, length); + if (shouldSendQOP) { + for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) { + wrapEstablishedQOP(lb, getEstablishedClientQOP()); + } + } + return locatedBlocks; } @Override // ClientProtocol @@ -825,6 +840,9 @@ public LastBlockWithStatus append(String src, String clientName, RetryCache.setState(cacheEntry, success, info); } metrics.incrFilesAppended(); + if (shouldSendQOP) { + wrapEstablishedQOP(info.getLastBlock(), getEstablishedClientQOP()); + } return info; } @@ -893,6 +911,9 @@ public LocatedBlock addBlock(String src, String clientName, if (locatedBlock != null) { metrics.incrAddBlockOps(); } + if (shouldSendQOP) { + wrapEstablishedQOP(locatedBlock, getEstablishedClientQOP()); + } return locatedBlock; } @@ -923,8 +944,13 @@ public LocatedBlock getAdditionalDatanode(final String src, excludeSet.add(node); } } - return namesystem.getAdditionalDatanode(src, fileId, blk, existings, - existingStorageIDs, excludeSet, numAdditionalNodes, clientName); + LocatedBlock locatedBlock = namesystem.getAdditionalDatanode(src, fileId, + blk, existings, existingStorageIDs, excludeSet, numAdditionalNodes, + clientName); + if (shouldSendQOP) { + wrapEstablishedQOP(locatedBlock, getEstablishedClientQOP()); + } + return locatedBlock; } /** * The client needs to give up on the block. @@ -1839,6 +1865,17 @@ private static String getClientMachine() { return clientMachine; } + /** + * Return the QOP of the client that the current handler thread + * is handling. Assuming the negotiation is done at this point, + * otherwise returns null. + * + * @return the established QOP of this client. + */ + private static String getEstablishedClientQOP() { + return Server.getEstablishedQOP(); + } + @Override public DataEncryptionKey getDataEncryptionKey() throws IOException { checkNNStartup(); @@ -2589,4 +2626,26 @@ public Long getNextSPSPath() throws IOException { } return namesystem.getBlockManager().getSPSManager().getNextPathId(); } + + + /** + * Wrapping the QOP information into the LocatedBlock instance. + * The wrapped QOP will be used by DataNode, i.e. DataNode will simply use + * this QOP to accept client calls, because this this QOP is viewed + * as the QOP that NameNode has accepted. + * + * @param locatedBlock the LocatedBlock instance + * @param qop the QOP to wrap in + * @throws RuntimeException + */ + private void wrapEstablishedQOP(LocatedBlock locatedBlock, String qop) { + if (qop == null || locatedBlock == null) { + return; + } + BlockTokenSecretManager btsm = namesystem.getBlockManager() + .getBlockTokenSecretManager(); + Token token = locatedBlock.getBlockToken(); + byte[] secret = btsm.secretGen(qop.getBytes(Charsets.UTF_8)); + token.setDNHandshakeSecret(secret); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 7704cd5af0..57dc2796c4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -5263,4 +5263,15 @@ Empty list indicates that auxiliary ports are disabled. + + + dfs.namenode.send.qop.enabled + false + + A boolean specifies whether NameNode should encrypt the established QOP + and include it in block token. The encrypted QOP will be used by DataNode + as target QOP, overwriting DataNode configuration. This ensures DataNode + will use exactly the same QOP NameNode and client has already agreed on. + + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockTokenWrappingQOP.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockTokenWrappingQOP.java new file mode 100644 index 0000000000..ea7ab97eb0 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockTokenWrappingQOP.java @@ -0,0 +1,190 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import java.util.Arrays; +import java.util.Collection; +import java.util.EnumSet; +import javax.crypto.Mac; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferTestCase; +import org.apache.hadoop.hdfs.security.token.block.BlockKey; +import org.apache.hadoop.io.EnumSetWritable; +import org.apache.hadoop.security.TestPermission; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.*; +import static org.junit.Assert.*; + + +/** + * This tests enabling NN sending the established QOP back to client, + * in encrypted message, using block access token key. + */ +@RunWith(Parameterized.class) +public class TestBlockTokenWrappingQOP extends SaslDataTransferTestCase { + public static final Log LOG = LogFactory.getLog(TestPermission.class); + + private HdfsConfiguration conf; + private MiniDFSCluster cluster; + private String encryptionAlgorithm; + private DistributedFileSystem dfs; + + private String configKey; + private String qopValue; + + @Parameterized.Parameters + public static Collection qopSettings() { + // if configured with privacy, the negotiated QOP should auth-conf + // similarly for the other two + return Arrays.asList(new Object[][] { + {"privacy", "auth-conf"}, + {"integrity", "auth-int"}, + {"authentication", "auth"} + }); + } + + public TestBlockTokenWrappingQOP(String configKey, String qopValue) { + this.configKey = configKey; + this.qopValue = qopValue; + } + + @Before + public void setup() throws Exception { + conf = createSecureConfig(this.configKey); + conf.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true); + conf.setBoolean(DFS_NAMENODE_SEND_QOP_ENABLED, true); + conf.set(HADOOP_RPC_PROTECTION, this.configKey); + cluster = null; + encryptionAlgorithm = "HmacSHA1"; + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); + cluster.waitActive(); + } + + @After + public void tearDown() throws Exception { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test + public void testAddBlockWrappingQOP() throws Exception { + final String src = "/testAddBlockWrappingQOP"; + final Path path = new Path(src); + + dfs = cluster.getFileSystem(); + dfs.create(path); + + DFSClient client = dfs.getClient(); + String clientName = client.getClientName(); + + LocatedBlock lb = client.namenode.addBlock(src, clientName, null, null, + HdfsConstants.GRANDFATHER_INODE_ID, null, null); + byte[] secret = lb.getBlockToken().getDnHandshakeSecret(); + BlockKey currentKey = cluster.getNamesystem().getBlockManager() + .getBlockTokenSecretManager().getCurrentKey(); + String decrypted = decryptMessage(secret, currentKey, + encryptionAlgorithm); + assertEquals(this.qopValue, decrypted); + } + + @Test + public void testAppendWrappingQOP() throws Exception { + final String src = "/testAppendWrappingQOP"; + final Path path = new Path(src); + + dfs = cluster.getFileSystem(); + FSDataOutputStream out = dfs.create(path); + // NameNode append call returns a last block instance. If there is nothing + // it returns as a null. So write something, so that lastBlock has + // something + out.write(0); + out.close(); + + DFSClient client = dfs.getClient(); + String clientName = client.getClientName(); + + LastBlockWithStatus lastBlock = client.namenode.append(src, clientName, + new EnumSetWritable<>(EnumSet.of(CreateFlag.APPEND))); + + byte[] secret = lastBlock.getLastBlock().getBlockToken() + .getDnHandshakeSecret(); + BlockKey currentKey = cluster.getNamesystem().getBlockManager() + .getBlockTokenSecretManager().getCurrentKey(); + String decrypted = decryptMessage(secret, currentKey, + encryptionAlgorithm); + assertEquals(this.qopValue, decrypted); + } + + @Test + public void testGetBlockLocationWrappingQOP() throws Exception { + final String src = "/testGetBlockLocationWrappingQOP"; + final Path path = new Path(src); + + dfs = cluster.getFileSystem(); + FSDataOutputStream out = dfs.create(path); + // if the file is empty, there will be no blocks returned. Write something + // so that getBlockLocations actually returns some block. + out.write(0); + out.close(); + + FileStatus status = dfs.getFileStatus(path); + DFSClient client = dfs.getClient(); + LocatedBlocks lbs = client.namenode.getBlockLocations( + src, 0, status.getLen()); + + assertTrue(lbs.getLocatedBlocks().size() > 0); + + BlockKey currentKey = cluster.getNamesystem().getBlockManager() + .getBlockTokenSecretManager().getCurrentKey(); + for (LocatedBlock lb : lbs.getLocatedBlocks()) { + byte[] secret = lb.getBlockToken().getDnHandshakeSecret(); + String decrypted = decryptMessage(secret, currentKey, + encryptionAlgorithm); + assertEquals(this.qopValue, decrypted); + } + } + + private String decryptMessage(byte[] secret, BlockKey key, + String algorithm) throws Exception { + String[] qops = {"auth", "auth-conf", "auth-int"}; + Mac mac = Mac.getInstance(algorithm); + mac.init(key.getKey()); + for (String qop : qops) { + byte[] encrypted = mac.doFinal(qop.getBytes()); + if (Arrays.equals(encrypted, secret)) { + return qop; + } + } + return null; + } +}