HDFS-14611. Move handshake secret field from Token to BlockAccessToken. Contributed by Chen Liang.
This commit is contained in:
parent
6a3433bffd
commit
8fb5ca3f40
@ -56,7 +56,6 @@ public class Token<T extends TokenIdentifier> implements Writable {
|
||||
private Text kind;
|
||||
private Text service;
|
||||
private TokenRenewer renewer;
|
||||
private byte[] dnHandshakeSecret;
|
||||
|
||||
/**
|
||||
* Construct a token given a token identifier and a secret manager for the
|
||||
@ -69,7 +68,14 @@ public Token(T id, SecretManager<T> mgr) {
|
||||
identifier = id.getBytes();
|
||||
kind = id.getKind();
|
||||
service = new Text();
|
||||
dnHandshakeSecret = new byte[0];
|
||||
}
|
||||
|
||||
public void setID(byte[] bytes) {
|
||||
identifier = bytes;
|
||||
}
|
||||
|
||||
public void setPassword(byte[] newPassword) {
|
||||
password = newPassword;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -84,7 +90,6 @@ public Token(byte[] identifier, byte[] password, Text kind, Text service) {
|
||||
this.password = (password == null)? new byte[0] : password;
|
||||
this.kind = (kind == null)? new Text() : kind;
|
||||
this.service = (service == null)? new Text() : service;
|
||||
this.dnHandshakeSecret = new byte[0];
|
||||
}
|
||||
|
||||
/**
|
||||
@ -95,7 +100,6 @@ public Token() {
|
||||
password = new byte[0];
|
||||
kind = new Text();
|
||||
service = new Text();
|
||||
dnHandshakeSecret = new byte[0];
|
||||
}
|
||||
|
||||
/**
|
||||
@ -107,7 +111,6 @@ public Token(Token<T> other) {
|
||||
this.password = other.password.clone();
|
||||
this.kind = new Text(other.kind);
|
||||
this.service = new Text(other.service);
|
||||
this.dnHandshakeSecret = other.dnHandshakeSecret.clone();
|
||||
}
|
||||
|
||||
public Token<T> copyToken() {
|
||||
@ -123,7 +126,6 @@ public Token(TokenProto tokenPB) {
|
||||
this.password = tokenPB.getPassword().toByteArray();
|
||||
this.kind = new Text(tokenPB.getKindBytes().toByteArray());
|
||||
this.service = new Text(tokenPB.getServiceBytes().toByteArray());
|
||||
this.dnHandshakeSecret = new byte[0];
|
||||
}
|
||||
|
||||
/**
|
||||
@ -149,14 +151,6 @@ public byte[] getIdentifier() {
|
||||
return identifier;
|
||||
}
|
||||
|
||||
public byte[] getDnHandshakeSecret() {
|
||||
return dnHandshakeSecret;
|
||||
}
|
||||
|
||||
public void setDNHandshakeSecret(byte[] secret) {
|
||||
this.dnHandshakeSecret = secret;
|
||||
}
|
||||
|
||||
private static Class<? extends TokenIdentifier>
|
||||
getClassForIdentifier(Text kind) {
|
||||
Class<? extends TokenIdentifier> cls = null;
|
||||
@ -351,11 +345,6 @@ public void readFields(DataInput in) throws IOException {
|
||||
in.readFully(password);
|
||||
kind.readFields(in);
|
||||
service.readFields(in);
|
||||
len = WritableUtils.readVInt(in);
|
||||
if (dnHandshakeSecret == null || dnHandshakeSecret.length != len) {
|
||||
dnHandshakeSecret = new byte[len];
|
||||
}
|
||||
in.readFully(dnHandshakeSecret);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -366,8 +355,6 @@ public void write(DataOutput out) throws IOException {
|
||||
out.write(password);
|
||||
kind.write(out);
|
||||
service.write(out);
|
||||
WritableUtils.writeVInt(out, dnHandshakeSecret.length);
|
||||
out.write(dnHandshakeSecret);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -36,7 +36,6 @@ message TokenProto {
|
||||
required bytes password = 2;
|
||||
required string kind = 3;
|
||||
required string service = 4;
|
||||
optional bytes handshakeSecret = 5;
|
||||
}
|
||||
|
||||
message CredentialsKVProto {
|
||||
|
@ -320,9 +320,7 @@ private IOStreamPair getEncryptedStreams(InetAddress addr,
|
||||
if (secretKey != null) {
|
||||
LOG.debug("DataNode overwriting downstream QOP" +
|
||||
saslProps.get(Sasl.QOP));
|
||||
byte[] newSecret = SecretManager.createPassword(saslProps.get(Sasl.QOP)
|
||||
.getBytes(Charsets.UTF_8), secretKey);
|
||||
accessToken.setDNHandshakeSecret(newSecret);
|
||||
updateToken(accessToken, secretKey, saslProps);
|
||||
}
|
||||
|
||||
LOG.debug("Client using encryption algorithm {}",
|
||||
@ -438,9 +436,7 @@ private IOStreamPair getSaslStreams(InetAddress addr,
|
||||
}
|
||||
LOG.debug("DataNode overwriting downstream QOP " +
|
||||
saslProps.get(Sasl.QOP));
|
||||
byte[] newSecret = SecretManager.createPassword(
|
||||
saslProps.get(Sasl.QOP).getBytes(Charsets.UTF_8), secretKey);
|
||||
accessToken.setDNHandshakeSecret(newSecret);
|
||||
updateToken(accessToken, secretKey, saslProps);
|
||||
}
|
||||
targetQOP = saslProps.get(Sasl.QOP);
|
||||
String userName = buildUserName(accessToken);
|
||||
@ -451,6 +447,18 @@ private IOStreamPair getSaslStreams(InetAddress addr,
|
||||
saslProps, callbackHandler, accessToken);
|
||||
}
|
||||
|
||||
private void updateToken(Token<BlockTokenIdentifier> accessToken,
|
||||
SecretKey secretKey, Map<String, String> saslProps)
|
||||
throws IOException {
|
||||
byte[] newSecret = saslProps.get(Sasl.QOP).getBytes(Charsets.UTF_8);
|
||||
BlockTokenIdentifier bkid = accessToken.decodeIdentifier();
|
||||
bkid.setHandshakeMsg(newSecret);
|
||||
byte[] bkidBytes = bkid.getBytes();
|
||||
accessToken.setPassword(
|
||||
SecretManager.createPassword(bkidBytes, secretKey));
|
||||
accessToken.setID(bkidBytes);
|
||||
}
|
||||
|
||||
/**
|
||||
* Builds the client's user name for the general-purpose handshake, consisting
|
||||
* of the base64-encoded serialized block access token identifier. Note that
|
||||
@ -507,20 +515,29 @@ private IOStreamPair doSaslHandshake(InetAddress addr,
|
||||
try {
|
||||
// Start of handshake - "initial response" in SASL terminology.
|
||||
// The handshake secret can be null, this happens when client is running
|
||||
// a new version but the cluster does not have this feature. In which case
|
||||
// there will be no encrypted secret sent from NN.
|
||||
byte[] handshakeSecret = accessToken.getDnHandshakeSecret();
|
||||
if (handshakeSecret == null || handshakeSecret.length == 0) {
|
||||
LOG.debug("Handshake secret is null, sending without "
|
||||
+ "handshake secret.");
|
||||
sendSaslMessage(out, new byte[0]);
|
||||
// a new version but the cluster does not have this feature.
|
||||
// In which case there will be no encrypted secret sent from NN.
|
||||
BlockTokenIdentifier blockTokenIdentifier =
|
||||
accessToken.decodeIdentifier();
|
||||
if (blockTokenIdentifier != null) {
|
||||
byte[] handshakeSecret =
|
||||
accessToken.decodeIdentifier().getHandshakeMsg();
|
||||
if (handshakeSecret == null || handshakeSecret.length == 0) {
|
||||
LOG.debug("Handshake secret is null, "
|
||||
+ "sending without handshake secret.");
|
||||
sendSaslMessage(out, new byte[0]);
|
||||
} else {
|
||||
LOG.debug("Sending handshake secret.");
|
||||
BlockTokenIdentifier identifier = new BlockTokenIdentifier();
|
||||
identifier.readFields(new DataInputStream(
|
||||
new ByteArrayInputStream(accessToken.getIdentifier())));
|
||||
String bpid = identifier.getBlockPoolId();
|
||||
sendSaslMessageHandshakeSecret(out, new byte[0],
|
||||
handshakeSecret, bpid);
|
||||
}
|
||||
} else {
|
||||
LOG.debug("Sending handshake secret.");
|
||||
BlockTokenIdentifier identifier = new BlockTokenIdentifier();
|
||||
identifier.readFields(new DataInputStream(
|
||||
new ByteArrayInputStream(accessToken.getIdentifier())));
|
||||
String bpid = identifier.getBlockPoolId();
|
||||
sendSaslMessageHandshakeSecret(out, new byte[0], handshakeSecret, bpid);
|
||||
LOG.debug("Block token id is null, sending without handshake secret.");
|
||||
sendSaslMessage(out, new byte[0]);
|
||||
}
|
||||
|
||||
// step 1
|
||||
|
@ -354,10 +354,6 @@ public static TokenProto convert(Token<?> tok) {
|
||||
setPassword(getByteString(tok.getPassword())).
|
||||
setKindBytes(getFixedByteString(tok.getKind())).
|
||||
setServiceBytes(getFixedByteString(tok.getService()));
|
||||
if (tok.getDnHandshakeSecret() != null) {
|
||||
builder.setHandshakeSecret(
|
||||
ByteString.copyFrom(tok.getDnHandshakeSecret()));
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
@ -779,6 +775,11 @@ public static BlockTokenSecretProto convert(
|
||||
for (String storageId : blockTokenSecret.getStorageIds()) {
|
||||
builder.addStorageIds(storageId);
|
||||
}
|
||||
|
||||
byte[] handshake = blockTokenSecret.getHandshakeMsg();
|
||||
if (handshake != null && handshake.length > 0) {
|
||||
builder.setHandshakeSecret(getByteString(handshake));
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
@ -835,9 +836,6 @@ public static Token<BlockTokenIdentifier> convert(
|
||||
new Token<>(blockToken.getIdentifier()
|
||||
.toByteArray(), blockToken.getPassword().toByteArray(), new Text(
|
||||
blockToken.getKind()), new Text(blockToken.getService()));
|
||||
if (blockToken.hasHandshakeSecret()) {
|
||||
token.setDNHandshakeSecret(blockToken.getHandshakeSecret().toByteArray());
|
||||
}
|
||||
return token;
|
||||
}
|
||||
|
||||
|
@ -21,6 +21,7 @@
|
||||
import java.io.DataInput;
|
||||
import java.io.DataInputStream;
|
||||
import java.io.DataOutput;
|
||||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.EnumSet;
|
||||
@ -55,6 +56,7 @@ public enum AccessMode {
|
||||
private StorageType[] storageTypes;
|
||||
private String[] storageIds;
|
||||
private boolean useProto;
|
||||
private byte[] handshakeMsg;
|
||||
|
||||
private byte [] cache;
|
||||
|
||||
@ -76,6 +78,7 @@ public BlockTokenIdentifier(String userId, String bpid, long blockId,
|
||||
this.storageIds = Optional.ofNullable(storageIds)
|
||||
.orElse(new String[0]);
|
||||
this.useProto = useProto;
|
||||
this.handshakeMsg = new byte[0];
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -134,6 +137,14 @@ public String[] getStorageIds(){
|
||||
return storageIds;
|
||||
}
|
||||
|
||||
public byte[] getHandshakeMsg() {
|
||||
return handshakeMsg;
|
||||
}
|
||||
|
||||
public void setHandshakeMsg(byte[] bytes) {
|
||||
handshakeMsg = bytes;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "block_token_identifier (expiryDate=" + this.getExpiryDate()
|
||||
@ -241,6 +252,16 @@ void readFieldsLegacy(DataInput in) throws IOException {
|
||||
storageIds = readStorageIds;
|
||||
|
||||
useProto = false;
|
||||
|
||||
try {
|
||||
int handshakeMsgLen = WritableUtils.readVInt(in);
|
||||
if (handshakeMsgLen != 0) {
|
||||
handshakeMsg = new byte[handshakeMsgLen];
|
||||
in.readFully(handshakeMsg);
|
||||
}
|
||||
} catch (EOFException eof) {
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
@ -271,6 +292,13 @@ void readFieldsProtobuf(DataInput in) throws IOException {
|
||||
storageIds = blockTokenSecretProto.getStorageIdsList().stream()
|
||||
.toArray(String[]::new);
|
||||
useProto = true;
|
||||
|
||||
if(blockTokenSecretProto.hasHandshakeSecret()) {
|
||||
handshakeMsg = blockTokenSecretProto
|
||||
.getHandshakeSecret().toByteArray();
|
||||
} else {
|
||||
handshakeMsg = new byte[0];
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -301,6 +329,10 @@ void writeLegacy(DataOutput out) throws IOException {
|
||||
for (String id: storageIds) {
|
||||
WritableUtils.writeString(out, id);
|
||||
}
|
||||
if (handshakeMsg != null && handshakeMsg.length > 0) {
|
||||
WritableUtils.writeVInt(out, handshakeMsg.length);
|
||||
out.write(handshakeMsg);
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
|
@ -671,4 +671,5 @@ message BlockTokenSecretProto {
|
||||
repeated AccessModeProto modes = 6;
|
||||
repeated StorageTypeProto storageTypes = 7;
|
||||
repeated string storageIds = 8;
|
||||
optional bytes handshakeSecret = 9;
|
||||
}
|
||||
|
@ -28,11 +28,9 @@
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import javax.crypto.SecretKey;
|
||||
import javax.security.auth.callback.Callback;
|
||||
import javax.security.auth.callback.CallbackHandler;
|
||||
import javax.security.auth.callback.NameCallback;
|
||||
@ -52,15 +50,12 @@
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockKey;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DNConf;
|
||||
import org.apache.hadoop.security.SaslPropertiesResolver;
|
||||
import org.apache.hadoop.security.SaslRpcServer;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.SecretManager;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@ -348,21 +343,6 @@ private BlockTokenIdentifier deserializeIdentifier(String str)
|
||||
return identifier;
|
||||
}
|
||||
|
||||
private String examineSecret(byte[] secret, String bpid) {
|
||||
BlockKey blockKey = blockPoolTokenSecretManager.get(bpid).getCurrentKey();
|
||||
SecretKey secretKey = blockKey.getKey();
|
||||
for (SaslRpcServer.QualityOfProtection qop :
|
||||
SaslRpcServer.QualityOfProtection.values()) {
|
||||
String qopString = qop.getSaslQop();
|
||||
byte[] data = qopString.getBytes(Charsets.UTF_8);
|
||||
byte[] encryptedData = SecretManager.createPassword(data, secretKey);
|
||||
if (Arrays.equals(encryptedData, secret)) {
|
||||
return qopString;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public String getNegotiatedQOP() {
|
||||
return negotiatedQOP;
|
||||
@ -399,12 +379,8 @@ private IOStreamPair doSaslHandshake(Peer peer, OutputStream underlyingOut,
|
||||
if (secret != null || bpid != null) {
|
||||
// sanity check, if one is null, the other must also not be null
|
||||
assert(secret != null && bpid != null);
|
||||
String qop = examineSecret(secret, bpid);
|
||||
if (qop != null) {
|
||||
saslProps.put(Sasl.QOP, qop);
|
||||
} else {
|
||||
LOG.error("Unable to match secret to a QOP!");
|
||||
}
|
||||
String qop = new String(secret, Charsets.UTF_8);
|
||||
saslProps.put(Sasl.QOP, qop);
|
||||
}
|
||||
SaslParticipant sasl = SaslParticipant.createServerSaslParticipant(
|
||||
saslProps, callbackHandler);
|
||||
|
@ -18,6 +18,7 @@
|
||||
|
||||
package org.apache.hadoop.hdfs.security.token.block;
|
||||
|
||||
import com.google.common.base.Charsets;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.DataInputStream;
|
||||
import java.io.IOException;
|
||||
@ -29,6 +30,7 @@
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.commons.lang3.ArrayUtils;
|
||||
import org.apache.hadoop.ipc.Server;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
@ -94,6 +96,8 @@ public class BlockTokenSecretManager extends
|
||||
|
||||
private final boolean useProto;
|
||||
|
||||
private final boolean shouldWrapQOP;
|
||||
|
||||
private final SecureRandom nonceGenerator = new SecureRandom();
|
||||
|
||||
/**
|
||||
@ -112,7 +116,25 @@ public BlockTokenSecretManager(long keyUpdateInterval,
|
||||
long tokenLifetime, String blockPoolId, String encryptionAlgorithm,
|
||||
boolean useProto) {
|
||||
this(false, keyUpdateInterval, tokenLifetime, blockPoolId,
|
||||
encryptionAlgorithm, 0, 1, useProto);
|
||||
encryptionAlgorithm, 0, 1, useProto, false);
|
||||
}
|
||||
|
||||
public BlockTokenSecretManager(long keyUpdateInterval,
|
||||
long tokenLifetime, int nnIndex, int numNNs, String blockPoolId,
|
||||
String encryptionAlgorithm, boolean useProto) {
|
||||
this(keyUpdateInterval, tokenLifetime, nnIndex, numNNs,
|
||||
blockPoolId, encryptionAlgorithm, useProto, false);
|
||||
}
|
||||
|
||||
public BlockTokenSecretManager(long keyUpdateInterval,
|
||||
long tokenLifetime, int nnIndex, int numNNs, String blockPoolId,
|
||||
String encryptionAlgorithm, boolean useProto, boolean shouldWrapQOP) {
|
||||
this(true, keyUpdateInterval, tokenLifetime, blockPoolId,
|
||||
encryptionAlgorithm, nnIndex, numNNs, useProto, shouldWrapQOP);
|
||||
Preconditions.checkArgument(nnIndex >= 0);
|
||||
Preconditions.checkArgument(numNNs > 0);
|
||||
setSerialNo(new SecureRandom().nextInt());
|
||||
generateKeys();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -125,21 +147,11 @@ public BlockTokenSecretManager(long keyUpdateInterval,
|
||||
* @param encryptionAlgorithm encryption algorithm to use
|
||||
* @param numNNs number of namenodes possible
|
||||
* @param useProto should we use new protobuf style tokens
|
||||
* @param shouldWrapQOP should wrap QOP in the block access token
|
||||
*/
|
||||
public BlockTokenSecretManager(long keyUpdateInterval,
|
||||
long tokenLifetime, int nnIndex, int numNNs, String blockPoolId,
|
||||
String encryptionAlgorithm, boolean useProto) {
|
||||
this(true, keyUpdateInterval, tokenLifetime, blockPoolId,
|
||||
encryptionAlgorithm, nnIndex, numNNs, useProto);
|
||||
Preconditions.checkArgument(nnIndex >= 0);
|
||||
Preconditions.checkArgument(numNNs > 0);
|
||||
setSerialNo(new SecureRandom().nextInt());
|
||||
generateKeys();
|
||||
}
|
||||
|
||||
private BlockTokenSecretManager(boolean isMaster, long keyUpdateInterval,
|
||||
long tokenLifetime, String blockPoolId, String encryptionAlgorithm,
|
||||
int nnIndex, int numNNs, boolean useProto) {
|
||||
int nnIndex, int numNNs, boolean useProto, boolean shouldWrapQOP) {
|
||||
this.nnIndex = nnIndex;
|
||||
this.isMaster = isMaster;
|
||||
this.keyUpdateInterval = keyUpdateInterval;
|
||||
@ -148,6 +160,7 @@ private BlockTokenSecretManager(boolean isMaster, long keyUpdateInterval,
|
||||
this.blockPoolId = blockPoolId;
|
||||
this.encryptionAlgorithm = encryptionAlgorithm;
|
||||
this.useProto = useProto;
|
||||
this.shouldWrapQOP = shouldWrapQOP;
|
||||
this.timer = new Timer();
|
||||
generateKeys();
|
||||
}
|
||||
@ -277,10 +290,16 @@ public Token<BlockTokenIdentifier> generateToken(ExtendedBlock block,
|
||||
/** Generate a block token for a specified user */
|
||||
public Token<BlockTokenIdentifier> generateToken(String userId,
|
||||
ExtendedBlock block, EnumSet<BlockTokenIdentifier.AccessMode> modes,
|
||||
StorageType[] storageTypes, String[] storageIds) throws IOException {
|
||||
StorageType[] storageTypes, String[] storageIds) {
|
||||
BlockTokenIdentifier id = new BlockTokenIdentifier(userId, block
|
||||
.getBlockPoolId(), block.getBlockId(), modes, storageTypes,
|
||||
storageIds, useProto);
|
||||
if (shouldWrapQOP) {
|
||||
String qop = Server.getEstablishedQOP();
|
||||
if (qop != null) {
|
||||
id.setHandshakeMsg(qop.getBytes(Charsets.UTF_8));
|
||||
}
|
||||
}
|
||||
return new Token<BlockTokenIdentifier>(id, this);
|
||||
}
|
||||
|
||||
@ -543,18 +562,6 @@ public byte[] retrieveDataEncryptionKey(int keyId, byte[] nonce)
|
||||
return createPassword(nonce, key.getKey());
|
||||
}
|
||||
|
||||
/**
|
||||
* Encrypt the given message with the current block key, using the current
|
||||
* block key.
|
||||
*
|
||||
* @param message the message to be encrypted.
|
||||
* @return the secret created by encrypting the given message.
|
||||
*/
|
||||
public byte[] secretGen(byte[] message) {
|
||||
return createPassword(message, currentKey.getKey());
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public BlockKey getCurrentKey() {
|
||||
return currentKey;
|
||||
}
|
||||
|
@ -17,6 +17,7 @@
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
|
||||
import static org.apache.hadoop.hdfs.protocol.BlockType.CONTIGUOUS;
|
||||
import static org.apache.hadoop.hdfs.protocol.BlockType.STRIPED;
|
||||
import static org.apache.hadoop.util.ExitUtil.terminate;
|
||||
@ -625,6 +626,9 @@ private static BlockTokenSecretManager createBlockTokenSecretManager(
|
||||
DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_PROTOBUF_ENABLE,
|
||||
DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_PROTOBUF_ENABLE_DEFAULT);
|
||||
|
||||
boolean shouldWrapQOP = conf.getBoolean(
|
||||
DFS_NAMENODE_SEND_QOP_ENABLED, DFS_NAMENODE_SEND_QOP_ENABLED_DEFAULT);
|
||||
|
||||
if (isHaEnabled) {
|
||||
// figure out which index we are of the nns
|
||||
Collection<String> nnIds = DFSUtilClient.getNameNodeIds(conf, nsId);
|
||||
@ -638,11 +642,11 @@ private static BlockTokenSecretManager createBlockTokenSecretManager(
|
||||
}
|
||||
return new BlockTokenSecretManager(updateMin * 60 * 1000L,
|
||||
lifetimeMin * 60 * 1000L, nnIndex, nnIds.size(), null,
|
||||
encryptionAlgorithm, shouldWriteProtobufToken);
|
||||
encryptionAlgorithm, shouldWriteProtobufToken, shouldWrapQOP);
|
||||
} else {
|
||||
return new BlockTokenSecretManager(updateMin*60*1000L,
|
||||
lifetimeMin*60*1000L, 0, 1, null, encryptionAlgorithm,
|
||||
shouldWriteProtobufToken);
|
||||
shouldWriteProtobufToken, shouldWrapQOP);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -27,13 +27,10 @@
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SEND_QOP_ENABLED;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SEND_QOP_ENABLED_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_PATH_DEPTH;
|
||||
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_PATH_LENGTH;
|
||||
import static org.apache.hadoop.util.Time.now;
|
||||
|
||||
import com.google.common.base.Charsets;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
@ -147,8 +144,6 @@
|
||||
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolServerSideTranslatorPB;
|
||||
import org.apache.hadoop.hdfs.protocolPB.ReconfigurationProtocolPB;
|
||||
import org.apache.hadoop.hdfs.protocolPB.ReconfigurationProtocolServerSideTranslatorPB;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
|
||||
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
|
||||
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||
@ -269,8 +264,6 @@ public class NameNodeRpcServer implements NamenodeProtocols {
|
||||
|
||||
private final String defaultECPolicyName;
|
||||
|
||||
private final boolean shouldSendQOP;
|
||||
|
||||
public NameNodeRpcServer(Configuration conf, NameNode nn)
|
||||
throws IOException {
|
||||
this.nn = nn;
|
||||
@ -553,8 +546,6 @@ public NameNodeRpcServer(Configuration conf, NameNode nn)
|
||||
this.clientRpcServer.addAuxiliaryListener(auxiliaryPort);
|
||||
}
|
||||
}
|
||||
this.shouldSendQOP = conf.getBoolean(
|
||||
DFS_NAMENODE_SEND_QOP_ENABLED, DFS_NAMENODE_SEND_QOP_ENABLED_DEFAULT);
|
||||
}
|
||||
|
||||
/** Allow access to the lifeline RPC server for testing */
|
||||
@ -762,11 +753,6 @@ public LocatedBlocks getBlockLocations(String src,
|
||||
metrics.incrGetBlockLocations();
|
||||
LocatedBlocks locatedBlocks =
|
||||
namesystem.getBlockLocations(getClientMachine(), src, offset, length);
|
||||
if (shouldSendQOP) {
|
||||
for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
|
||||
wrapEstablishedQOP(lb, getEstablishedClientQOP());
|
||||
}
|
||||
}
|
||||
return locatedBlocks;
|
||||
}
|
||||
|
||||
@ -841,9 +827,6 @@ public LastBlockWithStatus append(String src, String clientName,
|
||||
RetryCache.setState(cacheEntry, success, info);
|
||||
}
|
||||
metrics.incrFilesAppended();
|
||||
if (shouldSendQOP) {
|
||||
wrapEstablishedQOP(info.getLastBlock(), getEstablishedClientQOP());
|
||||
}
|
||||
return info;
|
||||
}
|
||||
|
||||
@ -912,9 +895,6 @@ public LocatedBlock addBlock(String src, String clientName,
|
||||
if (locatedBlock != null) {
|
||||
metrics.incrAddBlockOps();
|
||||
}
|
||||
if (shouldSendQOP) {
|
||||
wrapEstablishedQOP(locatedBlock, getEstablishedClientQOP());
|
||||
}
|
||||
return locatedBlock;
|
||||
}
|
||||
|
||||
@ -948,9 +928,6 @@ public LocatedBlock getAdditionalDatanode(final String src,
|
||||
LocatedBlock locatedBlock = namesystem.getAdditionalDatanode(src, fileId,
|
||||
blk, existings, existingStorageIDs, excludeSet, numAdditionalNodes,
|
||||
clientName);
|
||||
if (shouldSendQOP) {
|
||||
wrapEstablishedQOP(locatedBlock, getEstablishedClientQOP());
|
||||
}
|
||||
return locatedBlock;
|
||||
}
|
||||
/**
|
||||
@ -1877,7 +1854,7 @@ private static String getClientMachine() {
|
||||
*
|
||||
* @return the established QOP of this client.
|
||||
*/
|
||||
private static String getEstablishedClientQOP() {
|
||||
public static String getEstablishedClientQOP() {
|
||||
return Server.getEstablishedQOP();
|
||||
}
|
||||
|
||||
@ -2631,26 +2608,4 @@ public Long getNextSPSPath() throws IOException {
|
||||
}
|
||||
return namesystem.getBlockManager().getSPSManager().getNextPathId();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Wrapping the QOP information into the LocatedBlock instance.
|
||||
* The wrapped QOP will be used by DataNode, i.e. DataNode will simply use
|
||||
* this QOP to accept client calls, because this this QOP is viewed
|
||||
* as the QOP that NameNode has accepted.
|
||||
*
|
||||
* @param locatedBlock the LocatedBlock instance
|
||||
* @param qop the QOP to wrap in
|
||||
* @throws RuntimeException
|
||||
*/
|
||||
private void wrapEstablishedQOP(LocatedBlock locatedBlock, String qop) {
|
||||
if (qop == null || locatedBlock == null) {
|
||||
return;
|
||||
}
|
||||
BlockTokenSecretManager btsm = namesystem.getBlockManager()
|
||||
.getBlockTokenSecretManager();
|
||||
Token<BlockTokenIdentifier> token = locatedBlock.getBlockToken();
|
||||
byte[] secret = btsm.secretGen(qop.getBytes(Charsets.UTF_8));
|
||||
token.setDNHandshakeSecret(secret);
|
||||
}
|
||||
}
|
||||
|
@ -20,7 +20,6 @@
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.EnumSet;
|
||||
import javax.crypto.Mac;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.fs.CreateFlag;
|
||||
@ -32,7 +31,6 @@
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferTestCase;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockKey;
|
||||
import org.apache.hadoop.io.EnumSetWritable;
|
||||
import org.apache.hadoop.security.TestPermission;
|
||||
import org.junit.After;
|
||||
@ -55,7 +53,6 @@ public class TestBlockTokenWrappingQOP extends SaslDataTransferTestCase {
|
||||
|
||||
private HdfsConfiguration conf;
|
||||
private MiniDFSCluster cluster;
|
||||
private String encryptionAlgorithm;
|
||||
private DistributedFileSystem dfs;
|
||||
|
||||
private String configKey;
|
||||
@ -84,7 +81,6 @@ public void setup() throws Exception {
|
||||
conf.setBoolean(DFS_NAMENODE_SEND_QOP_ENABLED, true);
|
||||
conf.set(HADOOP_RPC_PROTECTION, this.configKey);
|
||||
cluster = null;
|
||||
encryptionAlgorithm = "HmacSHA1";
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
|
||||
cluster.waitActive();
|
||||
}
|
||||
@ -109,12 +105,8 @@ public void testAddBlockWrappingQOP() throws Exception {
|
||||
|
||||
LocatedBlock lb = client.namenode.addBlock(src, clientName, null, null,
|
||||
HdfsConstants.GRANDFATHER_INODE_ID, null, null);
|
||||
byte[] secret = lb.getBlockToken().getDnHandshakeSecret();
|
||||
BlockKey currentKey = cluster.getNamesystem().getBlockManager()
|
||||
.getBlockTokenSecretManager().getCurrentKey();
|
||||
String decrypted = decryptMessage(secret, currentKey,
|
||||
encryptionAlgorithm);
|
||||
assertEquals(this.qopValue, decrypted);
|
||||
byte[] secret = lb.getBlockToken().decodeIdentifier().getHandshakeMsg();
|
||||
assertEquals(this.qopValue, new String(secret));
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -137,12 +129,8 @@ public void testAppendWrappingQOP() throws Exception {
|
||||
new EnumSetWritable<>(EnumSet.of(CreateFlag.APPEND)));
|
||||
|
||||
byte[] secret = lastBlock.getLastBlock().getBlockToken()
|
||||
.getDnHandshakeSecret();
|
||||
BlockKey currentKey = cluster.getNamesystem().getBlockManager()
|
||||
.getBlockTokenSecretManager().getCurrentKey();
|
||||
String decrypted = decryptMessage(secret, currentKey,
|
||||
encryptionAlgorithm);
|
||||
assertEquals(this.qopValue, decrypted);
|
||||
.decodeIdentifier().getHandshakeMsg();
|
||||
assertEquals(this.qopValue, new String(secret));
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -164,27 +152,10 @@ public void testGetBlockLocationWrappingQOP() throws Exception {
|
||||
|
||||
assertTrue(lbs.getLocatedBlocks().size() > 0);
|
||||
|
||||
BlockKey currentKey = cluster.getNamesystem().getBlockManager()
|
||||
.getBlockTokenSecretManager().getCurrentKey();
|
||||
for (LocatedBlock lb : lbs.getLocatedBlocks()) {
|
||||
byte[] secret = lb.getBlockToken().getDnHandshakeSecret();
|
||||
String decrypted = decryptMessage(secret, currentKey,
|
||||
encryptionAlgorithm);
|
||||
assertEquals(this.qopValue, decrypted);
|
||||
byte[] secret = lb.getBlockToken()
|
||||
.decodeIdentifier().getHandshakeMsg();
|
||||
assertEquals(this.qopValue, new String(secret));
|
||||
}
|
||||
}
|
||||
|
||||
private String decryptMessage(byte[] secret, BlockKey key,
|
||||
String algorithm) throws Exception {
|
||||
String[] qops = {"auth", "auth-conf", "auth-int"};
|
||||
Mac mac = Mac.getInstance(algorithm);
|
||||
mac.init(key.getKey());
|
||||
for (String qop : qops) {
|
||||
byte[] encrypted = mac.doFinal(qop.getBytes());
|
||||
if (Arrays.equals(encrypted, secret)) {
|
||||
return qop;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user