HDFS-13617. Allow wrapping NN QOP into token in encrypted message. Contributed by Chen Liang

This commit is contained in:
Chen Liang 2019-02-13 12:40:31 -08:00
parent 29b411d5f0
commit 024c87291c
9 changed files with 322 additions and 15 deletions

View File

@ -1711,6 +1711,7 @@ public class Connection {
IpcConnectionContextProto connectionContext;
String protocolName;
SaslServer saslServer;
private String establishedQOP;
private AuthMethod authMethod;
private AuthProtocol authProtocol;
private boolean saslContextEstablished;
@ -1788,14 +1789,7 @@ public InetAddress getHostInetAddress() {
}
public String getEstablishedQOP() {
// In practice, saslServer should not be null when this is
// called. If it is null, it must be either some
// configuration mistake or it is called from unit test.
if (saslServer == null) {
LOG.warn("SASL server should not be null!");
return null;
}
return (String)saslServer.getNegotiatedProperty(Sasl.QOP);
return establishedQOP;
}
public void setLastContact(long lastContact) {
@ -1956,6 +1950,7 @@ private void saslProcess(RpcSaslProto saslMessage)
// do NOT enable wrapping until the last auth response is sent
if (saslContextEstablished) {
String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
establishedQOP = qop;
// SASL wrapping is only used if the connection has a QOP, and
// the value is not auth. ex. auth-int & auth-priv
useWrap = (qop != null && !"auth".equalsIgnoreCase(qop));

View File

@ -56,6 +56,7 @@ public class Token<T extends TokenIdentifier> implements Writable {
private Text kind;
private Text service;
private TokenRenewer renewer;
private byte[] dnHandshakeSecret;
/**
* Construct a token given a token identifier and a secret manager for the
@ -68,6 +69,7 @@ public Token(T id, SecretManager<T> mgr) {
identifier = id.getBytes();
kind = id.getKind();
service = new Text();
dnHandshakeSecret = new byte[0];
}
/**
@ -82,6 +84,7 @@ public Token(byte[] identifier, byte[] password, Text kind, Text service) {
this.password = (password == null)? new byte[0] : password;
this.kind = (kind == null)? new Text() : kind;
this.service = (service == null)? new Text() : service;
this.dnHandshakeSecret = new byte[0];
}
/**
@ -92,6 +95,7 @@ public Token() {
password = new byte[0];
kind = new Text();
service = new Text();
dnHandshakeSecret = new byte[0];
}
/**
@ -103,6 +107,7 @@ public Token(Token<T> other) {
this.password = other.password.clone();
this.kind = new Text(other.kind);
this.service = new Text(other.service);
this.dnHandshakeSecret = other.dnHandshakeSecret.clone();
}
public Token<T> copyToken() {
@ -118,6 +123,7 @@ public Token(TokenProto tokenPB) {
this.password = tokenPB.getPassword().toByteArray();
this.kind = new Text(tokenPB.getKindBytes().toByteArray());
this.service = new Text(tokenPB.getServiceBytes().toByteArray());
this.dnHandshakeSecret = new byte[0];
}
/**
@ -143,6 +149,14 @@ public byte[] getIdentifier() {
return identifier;
}
public byte[] getDnHandshakeSecret() {
return dnHandshakeSecret;
}
public void setDNHandshakeSecret(byte[] secret) {
this.dnHandshakeSecret = secret;
}
private static Class<? extends TokenIdentifier>
getClassForIdentifier(Text kind) {
Class<? extends TokenIdentifier> cls = null;
@ -337,6 +351,11 @@ public void readFields(DataInput in) throws IOException {
in.readFully(password);
kind.readFields(in);
service.readFields(in);
len = WritableUtils.readVInt(in);
if (dnHandshakeSecret == null || dnHandshakeSecret.length != len) {
dnHandshakeSecret = new byte[len];
}
in.readFully(dnHandshakeSecret);
}
@Override
@ -347,6 +366,8 @@ public void write(DataOutput out) throws IOException {
out.write(password);
kind.write(out);
service.write(out);
WritableUtils.writeVInt(out, dnHandshakeSecret.length);
out.write(dnHandshakeSecret);
}
/**

View File

@ -36,6 +36,7 @@ message TokenProto {
required bytes password = 2;
required string kind = 3;
required string service = 4;
optional bytes handshakeSecret = 5;
}
message CredentialsKVProto {

View File

@ -349,11 +349,16 @@ public static ExtendedBlockProto convert(final ExtendedBlock b) {
}
public static TokenProto convert(Token<?> tok) {
return TokenProto.newBuilder().
TokenProto.Builder builder = TokenProto.newBuilder().
setIdentifier(getByteString(tok.getIdentifier())).
setPassword(getByteString(tok.getPassword())).
setKindBytes(getFixedByteString(tok.getKind())).
setServiceBytes(getFixedByteString(tok.getService())).build();
setServiceBytes(getFixedByteString(tok.getService()));
if (tok.getDnHandshakeSecret() != null) {
builder.setHandshakeSecret(
ByteString.copyFrom(tok.getDnHandshakeSecret()));
}
return builder.build();
}
public static ShortCircuitShmIdProto convert(ShmId shmId) {
@ -826,9 +831,14 @@ public static StorageType[] convertStorageTypes(
public static Token<BlockTokenIdentifier> convert(
TokenProto blockToken) {
return new Token<>(blockToken.getIdentifier()
Token<BlockTokenIdentifier> token =
new Token<>(blockToken.getIdentifier()
.toByteArray(), blockToken.getPassword().toByteArray(), new Text(
blockToken.getKind()), new Text(blockToken.getService()));
if (blockToken.hasHandshakeSecret()) {
token.setDNHandshakeSecret(blockToken.getHandshakeSecret().toByteArray());
}
return token;
}
// DatanodeId

View File

@ -1285,6 +1285,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY =
HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY;
public static final String DFS_NAMENODE_SEND_QOP_ENABLED =
"dfs.namenode.send.qop.enabled";
public static final boolean DFS_NAMENODE_SEND_QOP_ENABLED_DEFAULT = false;
// dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry
@Deprecated
public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY

View File

@ -534,6 +534,22 @@ public byte[] retrieveDataEncryptionKey(int keyId, byte[] nonce)
return createPassword(nonce, key.getKey());
}
/**
* Encrypt the given message with the current block key, using the current
* block key.
*
* @param message the message to be encrypted.
* @return the secret created by encrypting the given message.
*/
public byte[] secretGen(byte[] message) {
return createPassword(message, currentKey.getKey());
}
@VisibleForTesting
public BlockKey getCurrentKey() {
return currentKey;
}
@VisibleForTesting
public synchronized void setKeyUpdateIntervalForTesting(long millis) {
this.keyUpdateInterval = millis;

View File

@ -27,10 +27,13 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SEND_QOP_ENABLED;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SEND_QOP_ENABLED_DEFAULT;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_PATH_DEPTH;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_PATH_LENGTH;
import static org.apache.hadoop.util.Time.now;
import com.google.common.base.Charsets;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetSocketAddress;
@ -145,6 +148,8 @@
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.ReconfigurationProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.ReconfigurationProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
@ -264,6 +269,8 @@ public class NameNodeRpcServer implements NamenodeProtocols {
private final String defaultECPolicyName;
private final boolean shouldSendQOP;
public NameNodeRpcServer(Configuration conf, NameNode nn)
throws IOException {
this.nn = nn;
@ -546,6 +553,8 @@ public NameNodeRpcServer(Configuration conf, NameNode nn)
this.clientRpcServer.addAuxiliaryListener(auxiliaryPort);
}
}
this.shouldSendQOP = conf.getBoolean(
DFS_NAMENODE_SEND_QOP_ENABLED, DFS_NAMENODE_SEND_QOP_ENABLED_DEFAULT);
}
/** Allow access to the lifeline RPC server for testing */
@ -751,8 +760,14 @@ public LocatedBlocks getBlockLocations(String src,
throws IOException {
checkNNStartup();
metrics.incrGetBlockLocations();
return namesystem.getBlockLocations(getClientMachine(),
src, offset, length);
LocatedBlocks locatedBlocks =
namesystem.getBlockLocations(getClientMachine(), src, offset, length);
if (shouldSendQOP) {
for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
wrapEstablishedQOP(lb, getEstablishedClientQOP());
}
}
return locatedBlocks;
}
@Override // ClientProtocol
@ -825,6 +840,9 @@ public LastBlockWithStatus append(String src, String clientName,
RetryCache.setState(cacheEntry, success, info);
}
metrics.incrFilesAppended();
if (shouldSendQOP) {
wrapEstablishedQOP(info.getLastBlock(), getEstablishedClientQOP());
}
return info;
}
@ -893,6 +911,9 @@ public LocatedBlock addBlock(String src, String clientName,
if (locatedBlock != null) {
metrics.incrAddBlockOps();
}
if (shouldSendQOP) {
wrapEstablishedQOP(locatedBlock, getEstablishedClientQOP());
}
return locatedBlock;
}
@ -923,8 +944,13 @@ public LocatedBlock getAdditionalDatanode(final String src,
excludeSet.add(node);
}
}
return namesystem.getAdditionalDatanode(src, fileId, blk, existings,
existingStorageIDs, excludeSet, numAdditionalNodes, clientName);
LocatedBlock locatedBlock = namesystem.getAdditionalDatanode(src, fileId,
blk, existings, existingStorageIDs, excludeSet, numAdditionalNodes,
clientName);
if (shouldSendQOP) {
wrapEstablishedQOP(locatedBlock, getEstablishedClientQOP());
}
return locatedBlock;
}
/**
* The client needs to give up on the block.
@ -1839,6 +1865,17 @@ private static String getClientMachine() {
return clientMachine;
}
/**
* Return the QOP of the client that the current handler thread
* is handling. Assuming the negotiation is done at this point,
* otherwise returns null.
*
* @return the established QOP of this client.
*/
private static String getEstablishedClientQOP() {
return Server.getEstablishedQOP();
}
@Override
public DataEncryptionKey getDataEncryptionKey() throws IOException {
checkNNStartup();
@ -2589,4 +2626,26 @@ public Long getNextSPSPath() throws IOException {
}
return namesystem.getBlockManager().getSPSManager().getNextPathId();
}
/**
* Wrapping the QOP information into the LocatedBlock instance.
* The wrapped QOP will be used by DataNode, i.e. DataNode will simply use
* this QOP to accept client calls, because this this QOP is viewed
* as the QOP that NameNode has accepted.
*
* @param locatedBlock the LocatedBlock instance
* @param qop the QOP to wrap in
* @throws RuntimeException
*/
private void wrapEstablishedQOP(LocatedBlock locatedBlock, String qop) {
if (qop == null || locatedBlock == null) {
return;
}
BlockTokenSecretManager btsm = namesystem.getBlockManager()
.getBlockTokenSecretManager();
Token<BlockTokenIdentifier> token = locatedBlock.getBlockToken();
byte[] secret = btsm.secretGen(qop.getBytes(Charsets.UTF_8));
token.setDNHandshakeSecret(secret);
}
}

View File

@ -5263,4 +5263,15 @@
Empty list indicates that auxiliary ports are disabled.
</description>
</property>
<property>
<name>dfs.namenode.send.qop.enabled</name>
<value>false</value>
<description>
A boolean specifies whether NameNode should encrypt the established QOP
and include it in block token. The encrypted QOP will be used by DataNode
as target QOP, overwriting DataNode configuration. This ensures DataNode
will use exactly the same QOP NameNode and client has already agreed on.
</description>
</property>
</configuration>

View File

@ -0,0 +1,190 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import javax.crypto.Mac;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferTestCase;
import org.apache.hadoop.hdfs.security.token.block.BlockKey;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.security.TestPermission;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
import static org.junit.Assert.*;
/**
* This tests enabling NN sending the established QOP back to client,
* in encrypted message, using block access token key.
*/
@RunWith(Parameterized.class)
public class TestBlockTokenWrappingQOP extends SaslDataTransferTestCase {
public static final Log LOG = LogFactory.getLog(TestPermission.class);
private HdfsConfiguration conf;
private MiniDFSCluster cluster;
private String encryptionAlgorithm;
private DistributedFileSystem dfs;
private String configKey;
private String qopValue;
@Parameterized.Parameters
public static Collection<Object[]> qopSettings() {
// if configured with privacy, the negotiated QOP should auth-conf
// similarly for the other two
return Arrays.asList(new Object[][] {
{"privacy", "auth-conf"},
{"integrity", "auth-int"},
{"authentication", "auth"}
});
}
public TestBlockTokenWrappingQOP(String configKey, String qopValue) {
this.configKey = configKey;
this.qopValue = qopValue;
}
@Before
public void setup() throws Exception {
conf = createSecureConfig(this.configKey);
conf.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
conf.setBoolean(DFS_NAMENODE_SEND_QOP_ENABLED, true);
conf.set(HADOOP_RPC_PROTECTION, this.configKey);
cluster = null;
encryptionAlgorithm = "HmacSHA1";
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
cluster.waitActive();
}
@After
public void tearDown() throws Exception {
if (cluster != null) {
cluster.shutdown();
}
}
@Test
public void testAddBlockWrappingQOP() throws Exception {
final String src = "/testAddBlockWrappingQOP";
final Path path = new Path(src);
dfs = cluster.getFileSystem();
dfs.create(path);
DFSClient client = dfs.getClient();
String clientName = client.getClientName();
LocatedBlock lb = client.namenode.addBlock(src, clientName, null, null,
HdfsConstants.GRANDFATHER_INODE_ID, null, null);
byte[] secret = lb.getBlockToken().getDnHandshakeSecret();
BlockKey currentKey = cluster.getNamesystem().getBlockManager()
.getBlockTokenSecretManager().getCurrentKey();
String decrypted = decryptMessage(secret, currentKey,
encryptionAlgorithm);
assertEquals(this.qopValue, decrypted);
}
@Test
public void testAppendWrappingQOP() throws Exception {
final String src = "/testAppendWrappingQOP";
final Path path = new Path(src);
dfs = cluster.getFileSystem();
FSDataOutputStream out = dfs.create(path);
// NameNode append call returns a last block instance. If there is nothing
// it returns as a null. So write something, so that lastBlock has
// something
out.write(0);
out.close();
DFSClient client = dfs.getClient();
String clientName = client.getClientName();
LastBlockWithStatus lastBlock = client.namenode.append(src, clientName,
new EnumSetWritable<>(EnumSet.of(CreateFlag.APPEND)));
byte[] secret = lastBlock.getLastBlock().getBlockToken()
.getDnHandshakeSecret();
BlockKey currentKey = cluster.getNamesystem().getBlockManager()
.getBlockTokenSecretManager().getCurrentKey();
String decrypted = decryptMessage(secret, currentKey,
encryptionAlgorithm);
assertEquals(this.qopValue, decrypted);
}
@Test
public void testGetBlockLocationWrappingQOP() throws Exception {
final String src = "/testGetBlockLocationWrappingQOP";
final Path path = new Path(src);
dfs = cluster.getFileSystem();
FSDataOutputStream out = dfs.create(path);
// if the file is empty, there will be no blocks returned. Write something
// so that getBlockLocations actually returns some block.
out.write(0);
out.close();
FileStatus status = dfs.getFileStatus(path);
DFSClient client = dfs.getClient();
LocatedBlocks lbs = client.namenode.getBlockLocations(
src, 0, status.getLen());
assertTrue(lbs.getLocatedBlocks().size() > 0);
BlockKey currentKey = cluster.getNamesystem().getBlockManager()
.getBlockTokenSecretManager().getCurrentKey();
for (LocatedBlock lb : lbs.getLocatedBlocks()) {
byte[] secret = lb.getBlockToken().getDnHandshakeSecret();
String decrypted = decryptMessage(secret, currentKey,
encryptionAlgorithm);
assertEquals(this.qopValue, decrypted);
}
}
private String decryptMessage(byte[] secret, BlockKey key,
String algorithm) throws Exception {
String[] qops = {"auth", "auth-conf", "auth-int"};
Mac mac = Mac.getInstance(algorithm);
mac.init(key.getKey());
for (String qop : qops) {
byte[] encrypted = mac.doFinal(qop.getBytes());
if (Arrays.equals(encrypted, secret)) {
return qop;
}
}
return null;
}
}