HDFS-13699. Add DFSClient sending handshake token to DataNode, and allow DataNode overwrite downstream QOP. Contributed by Chen Liang.
This commit is contained in:
parent
5379d85d8e
commit
626fec652b
@ -167,7 +167,7 @@ protected SecretKey generateSecret() {
|
||||
* @param key the secret key
|
||||
* @return the bytes of the generated password
|
||||
*/
|
||||
protected static byte[] createPassword(byte[] identifier,
|
||||
public static byte[] createPassword(byte[] identifier,
|
||||
SecretKey key) {
|
||||
Mac mac = threadLocalMac.get();
|
||||
try {
|
||||
|
@ -157,6 +157,9 @@ public interface HdfsClientConfigKeys {
|
||||
String DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY =
|
||||
"dfs.encrypt.data.transfer.cipher.suites";
|
||||
|
||||
String DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_NEW_QOP_KEY =
|
||||
"dfs.encrypt.data.overwrite.downstream.new.qop";
|
||||
|
||||
String DFS_DATA_TRANSFER_PROTECTION_KEY = "dfs.data.transfer.protection";
|
||||
String DFS_DATA_TRANSFER_PROTECTION_DEFAULT = "";
|
||||
String DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY =
|
||||
|
@ -49,6 +49,7 @@
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.HandshakeSecretProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CipherOptionProto;
|
||||
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
|
||||
import org.apache.hadoop.security.SaslPropertiesResolver;
|
||||
@ -249,6 +250,51 @@ public static byte[] readSaslMessageAndNegotiationCipherOptions(
|
||||
}
|
||||
}
|
||||
|
||||
static class SaslMessageWithHandshake {
|
||||
private final byte[] payload;
|
||||
private final byte[] secret;
|
||||
private final String bpid;
|
||||
|
||||
SaslMessageWithHandshake(byte[] payload, byte[] secret, String bpid) {
|
||||
this.payload = payload;
|
||||
this.secret = secret;
|
||||
this.bpid = bpid;
|
||||
}
|
||||
|
||||
byte[] getPayload() {
|
||||
return payload;
|
||||
}
|
||||
|
||||
byte[] getSecret() {
|
||||
return secret;
|
||||
}
|
||||
|
||||
String getBpid() {
|
||||
return bpid;
|
||||
}
|
||||
}
|
||||
|
||||
public static SaslMessageWithHandshake readSaslMessageWithHandshakeSecret(
|
||||
InputStream in) throws IOException {
|
||||
DataTransferEncryptorMessageProto proto =
|
||||
DataTransferEncryptorMessageProto.parseFrom(vintPrefixed(in));
|
||||
if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) {
|
||||
throw new InvalidEncryptionKeyException(proto.getMessage());
|
||||
} else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) {
|
||||
throw new IOException(proto.getMessage());
|
||||
} else {
|
||||
byte[] payload = proto.getPayload().toByteArray();
|
||||
byte[] secret = null;
|
||||
String bpid = null;
|
||||
if (proto.hasHandshakeSecret()) {
|
||||
HandshakeSecretProto handshakeSecret = proto.getHandshakeSecret();
|
||||
secret = handshakeSecret.getSecret().toByteArray();
|
||||
bpid = handshakeSecret.getBpid();
|
||||
}
|
||||
return new SaslMessageWithHandshake(payload, secret, bpid);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Negotiate a cipher option which server supports.
|
||||
*
|
||||
@ -375,6 +421,12 @@ public static void sendSaslMessage(OutputStream out, byte[] payload)
|
||||
sendSaslMessage(out, DataTransferEncryptorStatus.SUCCESS, payload, null);
|
||||
}
|
||||
|
||||
public static void sendSaslMessageHandshakeSecret(OutputStream out,
|
||||
byte[] payload, byte[] secret, String bpid) throws IOException {
|
||||
sendSaslMessageHandshakeSecret(out, DataTransferEncryptorStatus.SUCCESS,
|
||||
payload, null, secret, bpid);
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a SASL negotiation message and negotiation cipher options to server.
|
||||
*
|
||||
@ -497,6 +549,13 @@ public static CipherOption unwrap(CipherOption option, SaslParticipant sasl)
|
||||
public static void sendSaslMessage(OutputStream out,
|
||||
DataTransferEncryptorStatus status, byte[] payload, String message)
|
||||
throws IOException {
|
||||
sendSaslMessage(out, status, payload, message, null);
|
||||
}
|
||||
|
||||
public static void sendSaslMessage(OutputStream out,
|
||||
DataTransferEncryptorStatus status, byte[] payload, String message,
|
||||
HandshakeSecretProto handshakeSecret)
|
||||
throws IOException {
|
||||
DataTransferEncryptorMessageProto.Builder builder =
|
||||
DataTransferEncryptorMessageProto.newBuilder();
|
||||
|
||||
@ -507,12 +566,25 @@ public static void sendSaslMessage(OutputStream out,
|
||||
if (message != null) {
|
||||
builder.setMessage(message);
|
||||
}
|
||||
if (handshakeSecret != null) {
|
||||
builder.setHandshakeSecret(handshakeSecret);
|
||||
}
|
||||
|
||||
DataTransferEncryptorMessageProto proto = builder.build();
|
||||
proto.writeDelimitedTo(out);
|
||||
out.flush();
|
||||
}
|
||||
|
||||
public static void sendSaslMessageHandshakeSecret(OutputStream out,
|
||||
DataTransferEncryptorStatus status, byte[] payload, String message,
|
||||
byte[] secret, String bpid) throws IOException {
|
||||
HandshakeSecretProto.Builder builder =
|
||||
HandshakeSecretProto.newBuilder();
|
||||
builder.setSecret(ByteString.copyFrom(secret));
|
||||
builder.setBpid(bpid);
|
||||
sendSaslMessage(out, status, payload, message, builder.build());
|
||||
}
|
||||
|
||||
/**
|
||||
* There is no reason to instantiate this class.
|
||||
*/
|
||||
|
@ -18,8 +18,11 @@
|
||||
package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
|
||||
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_NEW_QOP_KEY;
|
||||
import static org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil.*;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.DataInputStream;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.IOException;
|
||||
@ -31,6 +34,7 @@
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import javax.crypto.SecretKey;
|
||||
import javax.security.auth.callback.Callback;
|
||||
import javax.security.auth.callback.CallbackHandler;
|
||||
import javax.security.auth.callback.NameCallback;
|
||||
@ -39,6 +43,7 @@
|
||||
import javax.security.sasl.RealmCallback;
|
||||
import javax.security.sasl.RealmChoiceCallback;
|
||||
|
||||
import javax.security.sasl.Sasl;
|
||||
import org.apache.commons.codec.binary.Base64;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
@ -54,6 +59,7 @@
|
||||
import org.apache.hadoop.security.SaslPropertiesResolver;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.SecretManager;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@ -83,6 +89,10 @@ public class SaslDataTransferClient {
|
||||
private final SaslPropertiesResolver saslPropsResolver;
|
||||
private final TrustedChannelResolver trustedChannelResolver;
|
||||
|
||||
// Store the most recent successfully negotiated QOP,
|
||||
// for testing purpose only
|
||||
private String targetQOP;
|
||||
|
||||
/**
|
||||
* Creates a new SaslDataTransferClient. This constructor is used in cases
|
||||
* where it is not relevant to track if a secure client did a fallback to
|
||||
@ -140,7 +150,7 @@ public IOStreamPair newSocketSend(Socket socket, OutputStream underlyingOut,
|
||||
DataEncryptionKey encryptionKey = !trustedChannelResolver.isTrusted() ?
|
||||
encryptionKeyFactory.newDataEncryptionKey() : null;
|
||||
IOStreamPair ios = send(socket.getInetAddress(), underlyingOut,
|
||||
underlyingIn, encryptionKey, accessToken, datanodeId);
|
||||
underlyingIn, encryptionKey, accessToken, datanodeId, null);
|
||||
return ios != null ? ios : new IOStreamPair(underlyingIn, underlyingOut);
|
||||
}
|
||||
|
||||
@ -180,8 +190,19 @@ public IOStreamPair socketSend(Socket socket, OutputStream underlyingOut,
|
||||
InputStream underlyingIn, DataEncryptionKeyFactory encryptionKeyFactory,
|
||||
Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId)
|
||||
throws IOException {
|
||||
return socketSend(socket, underlyingOut, underlyingIn, encryptionKeyFactory,
|
||||
accessToken, datanodeId, null);
|
||||
}
|
||||
|
||||
public IOStreamPair socketSend(
|
||||
Socket socket, OutputStream underlyingOut, InputStream underlyingIn,
|
||||
DataEncryptionKeyFactory encryptionKeyFactory,
|
||||
Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId,
|
||||
SecretKey secretKey)
|
||||
throws IOException {
|
||||
IOStreamPair ios = checkTrustAndSend(socket.getInetAddress(), underlyingOut,
|
||||
underlyingIn, encryptionKeyFactory, accessToken, datanodeId);
|
||||
underlyingIn, encryptionKeyFactory, accessToken, datanodeId,
|
||||
secretKey);
|
||||
return ios != null ? ios : new IOStreamPair(underlyingIn, underlyingOut);
|
||||
}
|
||||
|
||||
@ -203,17 +224,26 @@ private IOStreamPair checkTrustAndSend(InetAddress addr,
|
||||
DataEncryptionKeyFactory encryptionKeyFactory,
|
||||
Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId)
|
||||
throws IOException {
|
||||
return checkTrustAndSend(addr, underlyingOut, underlyingIn,
|
||||
encryptionKeyFactory, accessToken, datanodeId, null);
|
||||
}
|
||||
|
||||
private IOStreamPair checkTrustAndSend(
|
||||
InetAddress addr, OutputStream underlyingOut, InputStream underlyingIn,
|
||||
DataEncryptionKeyFactory encryptionKeyFactory,
|
||||
Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId,
|
||||
SecretKey secretKey)
|
||||
throws IOException {
|
||||
boolean localTrusted = trustedChannelResolver.isTrusted();
|
||||
boolean remoteTrusted = trustedChannelResolver.isTrusted(addr);
|
||||
LOG.debug("SASL encryption trust check: localHostTrusted = {}, "
|
||||
LOG.info("SASL encryption trust check: localHostTrusted = {}, "
|
||||
+ "remoteHostTrusted = {}", localTrusted, remoteTrusted);
|
||||
|
||||
if (!localTrusted || !remoteTrusted) {
|
||||
// The encryption key factory only returns a key if encryption is enabled.
|
||||
DataEncryptionKey encryptionKey = encryptionKeyFactory
|
||||
.newDataEncryptionKey();
|
||||
DataEncryptionKey encryptionKey =
|
||||
encryptionKeyFactory.newDataEncryptionKey();
|
||||
return send(addr, underlyingOut, underlyingIn, encryptionKey, accessToken,
|
||||
datanodeId);
|
||||
datanodeId, secretKey);
|
||||
} else {
|
||||
LOG.debug(
|
||||
"SASL client skipping handshake on trusted connection for addr = {}, "
|
||||
@ -237,13 +267,14 @@ private IOStreamPair checkTrustAndSend(InetAddress addr,
|
||||
*/
|
||||
private IOStreamPair send(InetAddress addr, OutputStream underlyingOut,
|
||||
InputStream underlyingIn, DataEncryptionKey encryptionKey,
|
||||
Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId)
|
||||
Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId,
|
||||
SecretKey secretKey)
|
||||
throws IOException {
|
||||
if (encryptionKey != null) {
|
||||
LOG.debug("SASL client doing encrypted handshake for addr = {}, "
|
||||
+ "datanodeId = {}", addr, datanodeId);
|
||||
return getEncryptedStreams(addr, underlyingOut, underlyingIn,
|
||||
encryptionKey);
|
||||
encryptionKey, accessToken, secretKey);
|
||||
} else if (!UserGroupInformation.isSecurityEnabled()) {
|
||||
LOG.debug("SASL client skipping handshake in unsecured configuration for "
|
||||
+ "addr = {}, datanodeId = {}", addr, datanodeId);
|
||||
@ -264,7 +295,8 @@ private IOStreamPair send(InetAddress addr, OutputStream underlyingOut,
|
||||
LOG.debug(
|
||||
"SASL client doing general handshake for addr = {}, datanodeId = {}",
|
||||
addr, datanodeId);
|
||||
return getSaslStreams(addr, underlyingOut, underlyingIn, accessToken);
|
||||
return getSaslStreams(addr, underlyingOut, underlyingIn,
|
||||
accessToken, secretKey);
|
||||
} else {
|
||||
// It's a secured cluster using non-privileged ports, but no SASL. The
|
||||
// only way this can happen is if the DataNode has
|
||||
@ -287,11 +319,20 @@ private IOStreamPair send(InetAddress addr, OutputStream underlyingOut,
|
||||
* @throws IOException for any error
|
||||
*/
|
||||
private IOStreamPair getEncryptedStreams(InetAddress addr,
|
||||
OutputStream underlyingOut,
|
||||
InputStream underlyingIn, DataEncryptionKey encryptionKey)
|
||||
OutputStream underlyingOut, InputStream underlyingIn,
|
||||
DataEncryptionKey encryptionKey,
|
||||
Token<BlockTokenIdentifier> accessToken,
|
||||
SecretKey secretKey)
|
||||
throws IOException {
|
||||
Map<String, String> saslProps = createSaslPropertiesForEncryption(
|
||||
encryptionKey.encryptionAlgorithm);
|
||||
if (secretKey != null) {
|
||||
LOG.debug("DataNode overwriting downstream QOP" +
|
||||
saslProps.get(Sasl.QOP));
|
||||
byte[] newSecret = SecretManager.createPassword(saslProps.get(Sasl.QOP)
|
||||
.getBytes(Charsets.UTF_8), secretKey);
|
||||
accessToken.setDNHandshakeSecret(newSecret);
|
||||
}
|
||||
|
||||
LOG.debug("Client using encryption algorithm {}",
|
||||
encryptionKey.encryptionAlgorithm);
|
||||
@ -301,7 +342,7 @@ private IOStreamPair getEncryptedStreams(InetAddress addr,
|
||||
CallbackHandler callbackHandler = new SaslClientCallbackHandler(userName,
|
||||
password);
|
||||
return doSaslHandshake(addr, underlyingOut, underlyingIn, userName,
|
||||
saslProps, callbackHandler);
|
||||
saslProps, callbackHandler, accessToken);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -370,6 +411,11 @@ public void handle(Callback[] callbacks) throws IOException,
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public String getTargetQOP() {
|
||||
return targetQOP;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends client SASL negotiation for general-purpose handshake.
|
||||
*
|
||||
@ -382,16 +428,36 @@ public void handle(Callback[] callbacks) throws IOException,
|
||||
*/
|
||||
private IOStreamPair getSaslStreams(InetAddress addr,
|
||||
OutputStream underlyingOut, InputStream underlyingIn,
|
||||
Token<BlockTokenIdentifier> accessToken)
|
||||
Token<BlockTokenIdentifier> accessToken,
|
||||
SecretKey secretKey)
|
||||
throws IOException {
|
||||
Map<String, String> saslProps = saslPropsResolver.getClientProperties(addr);
|
||||
|
||||
// secretKey != null only happens when this is called by DN
|
||||
// sending to downstream DN. If called from client, this will be null,
|
||||
// as there is no key for client to generate mac instance.
|
||||
// So that, if a different QOP is desired for inter-DN communication,
|
||||
// the check below will use new QOP to create a secret, which includes
|
||||
// the new QOP.
|
||||
if (secretKey != null) {
|
||||
String newQOP = conf
|
||||
.get(DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_NEW_QOP_KEY);
|
||||
if (newQOP != null) {
|
||||
saslProps.put(Sasl.QOP, newQOP);
|
||||
}
|
||||
LOG.debug("DataNode overwriting downstream QOP " +
|
||||
saslProps.get(Sasl.QOP));
|
||||
byte[] newSecret = SecretManager.createPassword(
|
||||
saslProps.get(Sasl.QOP).getBytes(Charsets.UTF_8), secretKey);
|
||||
accessToken.setDNHandshakeSecret(newSecret);
|
||||
}
|
||||
targetQOP = saslProps.get(Sasl.QOP);
|
||||
String userName = buildUserName(accessToken);
|
||||
char[] password = buildClientPassword(accessToken);
|
||||
CallbackHandler callbackHandler = new SaslClientCallbackHandler(userName,
|
||||
password);
|
||||
return doSaslHandshake(addr, underlyingOut, underlyingIn, userName,
|
||||
saslProps, callbackHandler);
|
||||
saslProps, callbackHandler, accessToken);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -435,8 +501,8 @@ private char[] buildClientPassword(Token<BlockTokenIdentifier> blockToken) {
|
||||
*/
|
||||
private IOStreamPair doSaslHandshake(InetAddress addr,
|
||||
OutputStream underlyingOut, InputStream underlyingIn, String userName,
|
||||
Map<String, String> saslProps,
|
||||
CallbackHandler callbackHandler) throws IOException {
|
||||
Map<String, String> saslProps, CallbackHandler callbackHandler,
|
||||
Token<BlockTokenIdentifier> accessToken) throws IOException {
|
||||
|
||||
DataOutputStream out = new DataOutputStream(underlyingOut);
|
||||
DataInputStream in = new DataInputStream(underlyingIn);
|
||||
@ -449,7 +515,22 @@ private IOStreamPair doSaslHandshake(InetAddress addr,
|
||||
|
||||
try {
|
||||
// Start of handshake - "initial response" in SASL terminology.
|
||||
sendSaslMessage(out, new byte[0]);
|
||||
// The handshake secret can be null, this happens when client is running
|
||||
// a new version but the cluster does not have this feature. In which case
|
||||
// there will be no encrypted secret sent from NN.
|
||||
byte[] handshakeSecret = accessToken.getDnHandshakeSecret();
|
||||
if (handshakeSecret == null || handshakeSecret.length == 0) {
|
||||
LOG.debug("Handshake secret is null, sending without "
|
||||
+ "handshake secret.");
|
||||
sendSaslMessage(out, new byte[0]);
|
||||
} else {
|
||||
LOG.debug("Sending handshake secret.");
|
||||
BlockTokenIdentifier identifier = new BlockTokenIdentifier();
|
||||
identifier.readFields(new DataInputStream(
|
||||
new ByteArrayInputStream(accessToken.getIdentifier())));
|
||||
String bpid = identifier.getBlockPoolId();
|
||||
sendSaslMessageHandshakeSecret(out, new byte[0], handshakeSecret, bpid);
|
||||
}
|
||||
|
||||
// step 1
|
||||
byte[] remoteResponse = readSaslMessage(in);
|
||||
|
@ -43,6 +43,12 @@ message DataTransferEncryptorMessageProto {
|
||||
optional bytes payload = 2;
|
||||
optional string message = 3;
|
||||
repeated CipherOptionProto cipherOption = 4;
|
||||
optional HandshakeSecretProto handshakeSecret = 5;
|
||||
}
|
||||
|
||||
message HandshakeSecretProto {
|
||||
required bytes secret = 1;
|
||||
required string bpid = 2;
|
||||
}
|
||||
|
||||
message BaseHeaderProto {
|
||||
|
@ -1027,6 +1027,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||
|
||||
// Security-related configs
|
||||
public static final String DFS_ENCRYPT_DATA_TRANSFER_KEY = "dfs.encrypt.data.transfer";
|
||||
public static final String DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_KEY =
|
||||
"dfs.encrypt.data.overwrite.downstream.derived.qop";
|
||||
public static final boolean DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_DEFAULT =
|
||||
false;
|
||||
public static final boolean DFS_ENCRYPT_DATA_TRANSFER_DEFAULT = false;
|
||||
public static final String DFS_XFRAME_OPTION_ENABLED = "dfs.xframe.enabled";
|
||||
public static final boolean DFS_XFRAME_OPTION_ENABLED_DEFAULT = true;
|
||||
|
@ -21,15 +21,18 @@
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
|
||||
import static org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil.*;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.DataInputStream;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import javax.crypto.SecretKey;
|
||||
import javax.security.auth.callback.Callback;
|
||||
import javax.security.auth.callback.CallbackHandler;
|
||||
import javax.security.auth.callback.NameCallback;
|
||||
@ -37,6 +40,7 @@
|
||||
import javax.security.auth.callback.UnsupportedCallbackException;
|
||||
import javax.security.sasl.AuthorizeCallback;
|
||||
import javax.security.sasl.RealmCallback;
|
||||
import javax.security.sasl.Sasl;
|
||||
import javax.security.sasl.SaslException;
|
||||
|
||||
import org.apache.commons.codec.binary.Base64;
|
||||
@ -48,12 +52,15 @@
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockKey;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DNConf;
|
||||
import org.apache.hadoop.security.SaslPropertiesResolver;
|
||||
import org.apache.hadoop.security.SaslRpcServer;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.SecretManager;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@ -78,6 +85,10 @@ public class SaslDataTransferServer {
|
||||
private final BlockPoolTokenSecretManager blockPoolTokenSecretManager;
|
||||
private final DNConf dnConf;
|
||||
|
||||
// Store the most recent successfully negotiated QOP,
|
||||
// for testing purpose only
|
||||
private String negotiatedQOP;
|
||||
|
||||
/**
|
||||
* Creates a new SaslDataTransferServer.
|
||||
*
|
||||
@ -337,6 +348,26 @@ private BlockTokenIdentifier deserializeIdentifier(String str)
|
||||
return identifier;
|
||||
}
|
||||
|
||||
private String examineSecret(byte[] secret, String bpid) {
|
||||
BlockKey blockKey = blockPoolTokenSecretManager.get(bpid).getCurrentKey();
|
||||
SecretKey secretKey = blockKey.getKey();
|
||||
for (SaslRpcServer.QualityOfProtection qop :
|
||||
SaslRpcServer.QualityOfProtection.values()) {
|
||||
String qopString = qop.getSaslQop();
|
||||
byte[] data = qopString.getBytes(Charsets.UTF_8);
|
||||
byte[] encryptedData = SecretManager.createPassword(data, secretKey);
|
||||
if (Arrays.equals(encryptedData, secret)) {
|
||||
return qopString;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public String getNegotiatedQOP() {
|
||||
return negotiatedQOP;
|
||||
}
|
||||
|
||||
/**
|
||||
* This method actually executes the server-side SASL handshake.
|
||||
*
|
||||
@ -355,9 +386,6 @@ private IOStreamPair doSaslHandshake(Peer peer, OutputStream underlyingOut,
|
||||
DataInputStream in = new DataInputStream(underlyingIn);
|
||||
DataOutputStream out = new DataOutputStream(underlyingOut);
|
||||
|
||||
SaslParticipant sasl = SaslParticipant.createServerSaslParticipant(saslProps,
|
||||
callbackHandler);
|
||||
|
||||
int magicNumber = in.readInt();
|
||||
if (magicNumber != SASL_TRANSFER_MAGIC_NUMBER) {
|
||||
throw new InvalidMagicNumberException(magicNumber,
|
||||
@ -365,7 +393,23 @@ private IOStreamPair doSaslHandshake(Peer peer, OutputStream underlyingOut,
|
||||
}
|
||||
try {
|
||||
// step 1
|
||||
byte[] remoteResponse = readSaslMessage(in);
|
||||
SaslMessageWithHandshake message = readSaslMessageWithHandshakeSecret(in);
|
||||
byte[] secret = message.getSecret();
|
||||
String bpid = message.getBpid();
|
||||
if (secret != null || bpid != null) {
|
||||
// sanity check, if one is null, the other must also not be null
|
||||
assert(secret != null && bpid != null);
|
||||
String qop = examineSecret(secret, bpid);
|
||||
if (qop != null) {
|
||||
saslProps.put(Sasl.QOP, qop);
|
||||
} else {
|
||||
LOG.error("Unable to match secret to a QOP!");
|
||||
}
|
||||
}
|
||||
SaslParticipant sasl = SaslParticipant.createServerSaslParticipant(
|
||||
saslProps, callbackHandler);
|
||||
|
||||
byte[] remoteResponse = message.getPayload();
|
||||
byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse);
|
||||
sendSaslMessage(out, localResponse);
|
||||
|
||||
@ -379,6 +423,7 @@ private IOStreamPair doSaslHandshake(Peer peer, OutputStream underlyingOut,
|
||||
checkSaslComplete(sasl, saslProps);
|
||||
|
||||
CipherOption cipherOption = null;
|
||||
negotiatedQOP = sasl.getNegotiatedQop();
|
||||
if (sasl.isNegotiatedQopPrivacy()) {
|
||||
// Negotiate a cipher option
|
||||
Configuration conf = dnConf.getConf();
|
||||
|
@ -37,6 +37,8 @@
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_KEY;
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
|
||||
@ -95,6 +97,7 @@ public class DNConf {
|
||||
final boolean syncOnClose;
|
||||
final boolean encryptDataTransfer;
|
||||
final boolean connectToDnViaHostname;
|
||||
final boolean overwriteDownstreamDerivedQOP;
|
||||
|
||||
final long readaheadLength;
|
||||
final long heartBeatInterval;
|
||||
@ -248,6 +251,9 @@ public DNConf(final Configurable dn) {
|
||||
this.encryptDataTransfer = getConf().getBoolean(
|
||||
DFS_ENCRYPT_DATA_TRANSFER_KEY,
|
||||
DFS_ENCRYPT_DATA_TRANSFER_DEFAULT);
|
||||
this.overwriteDownstreamDerivedQOP = getConf().getBoolean(
|
||||
DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_KEY,
|
||||
DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_DEFAULT);
|
||||
this.encryptionAlgorithm = getConf().get(DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
|
||||
this.trustedChannelResolver = TrustedChannelResolver.getInstance(getConf());
|
||||
this.saslPropsResolver = DataTransferSaslUtil.getSaslPropertiesResolver(
|
||||
|
@ -1796,7 +1796,12 @@ public DataXceiverServer getXferServer() {
|
||||
public int getXferPort() {
|
||||
return streamingAddr.getPort();
|
||||
}
|
||||
|
||||
|
||||
@VisibleForTesting
|
||||
public SaslDataTransferServer getSaslServer() {
|
||||
return saslServer;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return name useful for logging
|
||||
*/
|
||||
|
@ -20,6 +20,7 @@
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.protobuf.ByteString;
|
||||
import javax.crypto.SecretKey;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.hadoop.fs.StorageType;
|
||||
import org.apache.hadoop.hdfs.DFSUtilClient;
|
||||
@ -47,6 +48,7 @@
|
||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
|
||||
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockKey;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.server.datanode.BlockChecksumHelper.BlockChecksumComputer;
|
||||
import org.apache.hadoop.hdfs.server.datanode.BlockChecksumHelper.AbstractBlockChecksumComputer;
|
||||
@ -798,8 +800,16 @@ public void writeBlock(final ExtendedBlock block,
|
||||
InputStream unbufMirrorIn = NetUtils.getInputStream(mirrorSock);
|
||||
DataEncryptionKeyFactory keyFactory =
|
||||
datanode.getDataEncryptionKeyFactoryForBlock(block);
|
||||
IOStreamPair saslStreams = datanode.saslClient.socketSend(mirrorSock,
|
||||
unbufMirrorOut, unbufMirrorIn, keyFactory, blockToken, targets[0]);
|
||||
SecretKey secretKey = null;
|
||||
if (dnConf.overwriteDownstreamDerivedQOP) {
|
||||
String bpid = block.getBlockPoolId();
|
||||
BlockKey blockKey = datanode.blockPoolTokenSecretManager
|
||||
.get(bpid).getCurrentKey();
|
||||
secretKey = blockKey.getKey();
|
||||
}
|
||||
IOStreamPair saslStreams = datanode.saslClient.socketSend(
|
||||
mirrorSock, unbufMirrorOut, unbufMirrorIn, keyFactory,
|
||||
blockToken, targets[0], secretKey);
|
||||
unbufMirrorOut = saslStreams.out;
|
||||
unbufMirrorIn = saslStreams.in;
|
||||
mirrorOut = new DataOutputStream(new BufferedOutputStream(unbufMirrorOut,
|
||||
|
@ -5351,4 +5351,26 @@
|
||||
will use exactly the same QOP NameNode and client has already agreed on.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.encrypt.data.overwrite.downstream.derived.qop</name>
|
||||
<value>false</value>
|
||||
<description>
|
||||
A boolean specifies whether DN should overwrite the downstream
|
||||
QOP in a write pipeline. This is used in the case where client
|
||||
talks to first DN with a QOP, but inter-DN communication needs to be
|
||||
using a different QOP. If set to false, the default behaviour is that
|
||||
inter-DN communication will use the same QOP as client-DN connection.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.encrypt.data.overwrite.downstream.new.qop</name>
|
||||
<value></value>
|
||||
<description>
|
||||
When dfs.datanode.overwrite.downstream.derived.qop is set to true,
|
||||
this configuration specifies the new QOP to be used to overwrite
|
||||
inter-DN QOP.
|
||||
</description>
|
||||
</property>
|
||||
</configuration>
|
||||
|
@ -37,7 +37,7 @@
|
||||
*/
|
||||
public class TestHAAuxiliaryPort {
|
||||
@Test
|
||||
public void testTest() throws Exception {
|
||||
public void testHAAuxiliaryPort() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY, "0,0");
|
||||
conf.set(DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY + ".ha-nn-uri-0.nn1",
|
||||
|
@ -0,0 +1,219 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs;
|
||||
|
||||
import java.net.URI;
|
||||
import java.util.ArrayList;
|
||||
import org.apache.hadoop.fs.BlockLocation;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FileSystemTestHelper;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferServer;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferTestCase;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_RPC_PROTECTION;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SEND_QOP_ENABLED;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY;
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_NEW_QOP_KEY;
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
|
||||
/**
|
||||
* This test tests access NameNode on different port with different
|
||||
* configured QOP.
|
||||
*/
|
||||
public class TestMultipleNNPortQOP extends SaslDataTransferTestCase {
|
||||
|
||||
private static final Path PATH1 = new Path("/file1");
|
||||
private static final Path PATH2 = new Path("/file2");
|
||||
private static final Path PATH3 = new Path("/file3");
|
||||
private static final int BLOCK_SIZE = 4096;
|
||||
private static final int NUM_BLOCKS = 3;
|
||||
|
||||
private static HdfsConfiguration clusterConf;
|
||||
|
||||
@Before
|
||||
public void setup() throws Exception {
|
||||
clusterConf = createSecureConfig(
|
||||
"authentication,integrity,privacy");
|
||||
clusterConf.set(DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY,
|
||||
"12000,12100,12200");
|
||||
// explicitly setting service rpc for datanode. This because
|
||||
// DFSUtil.getNNServiceRpcAddressesForCluster looks up client facing port
|
||||
// and service port at the same time, and if no setting for service
|
||||
// rpc, it would return client port, in this case, it will be the
|
||||
// auxiliary port for data node. Which is not what auxiliary is for.
|
||||
// setting service rpc port to avoid this.
|
||||
clusterConf.set(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, "localhost:9020");
|
||||
clusterConf.set(
|
||||
CommonConfigurationKeys.HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS,
|
||||
"org.apache.hadoop.security.IngressPortBasedResolver");
|
||||
clusterConf.set("ingress.port.sasl.configured.ports", "12000,12100,12200");
|
||||
clusterConf.set("ingress.port.sasl.prop.12000", "authentication");
|
||||
clusterConf.set("ingress.port.sasl.prop.12100", "integrity");
|
||||
clusterConf.set("ingress.port.sasl.prop.12200", "privacy");
|
||||
clusterConf.setBoolean(DFS_NAMENODE_SEND_QOP_ENABLED, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test accessing NameNode from three different ports.
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testMultipleNNPort() throws Exception {
|
||||
MiniDFSCluster cluster = null;
|
||||
try {
|
||||
cluster = new MiniDFSCluster.Builder(clusterConf)
|
||||
.numDataNodes(3).build();
|
||||
|
||||
cluster.waitActive();
|
||||
HdfsConfiguration clientConf = new HdfsConfiguration(clusterConf);
|
||||
clientConf.unset(
|
||||
CommonConfigurationKeys.HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS);
|
||||
ArrayList<DataNode> dataNodes = cluster.getDataNodes();
|
||||
|
||||
URI currentURI = cluster.getURI();
|
||||
URI uriAuthPort = new URI(currentURI.getScheme() +
|
||||
"://" + currentURI.getHost() + ":12000");
|
||||
URI uriIntegrityPort = new URI(currentURI.getScheme() +
|
||||
"://" + currentURI.getHost() + ":12100");
|
||||
URI uriPrivacyPort = new URI(currentURI.getScheme() +
|
||||
"://" + currentURI.getHost() + ":12200");
|
||||
|
||||
clientConf.set(HADOOP_RPC_PROTECTION, "privacy");
|
||||
FileSystem fsPrivacy = FileSystem.get(uriPrivacyPort, clientConf);
|
||||
doTest(fsPrivacy, PATH1);
|
||||
for (DataNode dn : dataNodes) {
|
||||
SaslDataTransferServer saslServer = dn.getSaslServer();
|
||||
assertEquals("auth-conf", saslServer.getNegotiatedQOP());
|
||||
}
|
||||
|
||||
clientConf.set(HADOOP_RPC_PROTECTION, "integrity");
|
||||
FileSystem fsIntegrity = FileSystem.get(uriIntegrityPort, clientConf);
|
||||
doTest(fsIntegrity, PATH2);
|
||||
for (DataNode dn : dataNodes) {
|
||||
SaslDataTransferServer saslServer = dn.getSaslServer();
|
||||
assertEquals("auth-int", saslServer.getNegotiatedQOP());
|
||||
}
|
||||
|
||||
clientConf.set(HADOOP_RPC_PROTECTION, "authentication");
|
||||
FileSystem fsAuth = FileSystem.get(uriAuthPort, clientConf);
|
||||
doTest(fsAuth, PATH3);
|
||||
for (DataNode dn : dataNodes) {
|
||||
SaslDataTransferServer saslServer = dn.getSaslServer();
|
||||
assertEquals("auth", saslServer.getNegotiatedQOP());
|
||||
}
|
||||
} finally {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test accessing NameNode from three different ports, tests
|
||||
* overwriting downstream DN in the pipeline.
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testMultipleNNPortOverwriteDownStream() throws Exception {
|
||||
clusterConf.set(DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_NEW_QOP_KEY, "auth");
|
||||
clusterConf.setBoolean(
|
||||
DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_KEY, true);
|
||||
MiniDFSCluster cluster = null;
|
||||
try {
|
||||
cluster =
|
||||
new MiniDFSCluster.Builder(clusterConf).numDataNodes(3).build();
|
||||
cluster.waitActive();
|
||||
HdfsConfiguration clientConf = new HdfsConfiguration(clusterConf);
|
||||
clientConf.unset(
|
||||
CommonConfigurationKeys.HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS);
|
||||
ArrayList<DataNode> dataNodes = cluster.getDataNodes();
|
||||
|
||||
URI currentURI = cluster.getURI();
|
||||
URI uriAuthPort =
|
||||
new URI(currentURI.getScheme() + "://" +
|
||||
currentURI.getHost() + ":12000");
|
||||
URI uriIntegrityPort =
|
||||
new URI(currentURI.getScheme() + "://" +
|
||||
currentURI.getHost() + ":12100");
|
||||
URI uriPrivacyPort =
|
||||
new URI(currentURI.getScheme() + "://" +
|
||||
currentURI.getHost() + ":12200");
|
||||
|
||||
clientConf.set(HADOOP_RPC_PROTECTION, "privacy");
|
||||
FileSystem fsPrivacy = FileSystem.get(uriPrivacyPort, clientConf);
|
||||
doTest(fsPrivacy, PATH1);
|
||||
// add a wait so that data has reached not only first DN,
|
||||
// but also the rest
|
||||
Thread.sleep(100);
|
||||
for (int i = 0; i < 2; i++) {
|
||||
DataNode dn = dataNodes.get(i);
|
||||
SaslDataTransferClient saslClient = dn.getSaslClient();
|
||||
assertEquals("auth", saslClient.getTargetQOP());
|
||||
}
|
||||
|
||||
clientConf.set(HADOOP_RPC_PROTECTION, "integrity");
|
||||
FileSystem fsIntegrity = FileSystem.get(uriIntegrityPort, clientConf);
|
||||
doTest(fsIntegrity, PATH2);
|
||||
Thread.sleep(100);
|
||||
for (int i = 0; i < 2; i++) {
|
||||
DataNode dn = dataNodes.get(i);
|
||||
SaslDataTransferClient saslClient = dn.getSaslClient();
|
||||
assertEquals("auth", saslClient.getTargetQOP());
|
||||
}
|
||||
|
||||
clientConf.set(HADOOP_RPC_PROTECTION, "authentication");
|
||||
FileSystem fsAuth = FileSystem.get(uriAuthPort, clientConf);
|
||||
doTest(fsAuth, PATH3);
|
||||
Thread.sleep(100);
|
||||
for (int i = 0; i < 3; i++) {
|
||||
DataNode dn = dataNodes.get(i);
|
||||
SaslDataTransferServer saslServer = dn.getSaslServer();
|
||||
assertEquals("auth", saslServer.getNegotiatedQOP());
|
||||
}
|
||||
} finally {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void doTest(FileSystem fs, Path path) throws Exception {
|
||||
FileSystemTestHelper.createFile(fs, path, NUM_BLOCKS, BLOCK_SIZE);
|
||||
assertArrayEquals(FileSystemTestHelper.getFileData(NUM_BLOCKS, BLOCK_SIZE),
|
||||
DFSTestUtil.readFile(fs, path).getBytes("UTF-8"));
|
||||
BlockLocation[] blockLocations = fs.getFileBlockLocations(path, 0,
|
||||
Long.MAX_VALUE);
|
||||
assertNotNull(blockLocations);
|
||||
assertEquals(NUM_BLOCKS, blockLocations.length);
|
||||
for (BlockLocation blockLocation: blockLocations) {
|
||||
assertNotNull(blockLocation.getHosts());
|
||||
assertEquals(3, blockLocation.getHosts().length);
|
||||
}
|
||||
}
|
||||
}
|
@ -92,7 +92,7 @@ public NullDataNode(Configuration conf, OutputStream out, int port) throws
|
||||
doReturn(pair).when(saslClient).socketSend(
|
||||
any(Socket.class), any(OutputStream.class), any(InputStream.class),
|
||||
any(DataEncryptionKeyFactory.class), any(),
|
||||
any(DatanodeID.class));
|
||||
any(DatanodeID.class), any());
|
||||
doReturn(mock(ReplicaHandler.class)).when(data).createTemporary(
|
||||
any(StorageType.class), any(String.class), any(ExtendedBlock.class),
|
||||
anyBoolean());
|
||||
|
Loading…
Reference in New Issue
Block a user