HDFS-9002. Move o.a.h.hdfs.net/*Peer classes to hdfs-client. Contributed by Mingliang Liu.
This commit is contained in:
parent
c2d2c1802a
commit
ed78b14ebc
@ -14,6 +14,7 @@
|
||||
<Class name="org.apache.hadoop.hdfs.protocol.SnapshotDiffReport$DiffReportEntry"/>
|
||||
<Class name="org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus"/>
|
||||
<Class name="org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport"/>
|
||||
<Class name="org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslResponseWithNegotiatedCipherOption"/>
|
||||
</Or>
|
||||
<Bug pattern="EI_EXPOSE_REP,EI_EXPOSE_REP2" />
|
||||
</Match>
|
||||
|
@ -27,16 +27,24 @@
|
||||
import org.apache.hadoop.fs.BlockLocation;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||
import org.apache.hadoop.hdfs.net.BasicInetPeer;
|
||||
import org.apache.hadoop.hdfs.net.NioInetPeer;
|
||||
import org.apache.hadoop.hdfs.net.Peer;
|
||||
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
|
||||
import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolTranslatorPB;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.util.IOUtilsClient;
|
||||
import org.apache.hadoop.hdfs.web.WebHdfsConstants;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.net.NodeBase;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@ -46,8 +54,10 @@
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.Socket;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.nio.channels.SocketChannel;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
@ -523,4 +533,58 @@ public static KeyProvider createKeyProvider(
|
||||
}
|
||||
return keyProvider;
|
||||
}
|
||||
|
||||
public static Peer peerFromSocket(Socket socket)
|
||||
throws IOException {
|
||||
Peer peer = null;
|
||||
boolean success = false;
|
||||
try {
|
||||
// TCP_NODELAY is crucial here because of bad interactions between
|
||||
// Nagle's Algorithm and Delayed ACKs. With connection keepalive
|
||||
// between the client and DN, the conversation looks like:
|
||||
// 1. Client -> DN: Read block X
|
||||
// 2. DN -> Client: data for block X
|
||||
// 3. Client -> DN: Status OK (successful read)
|
||||
// 4. Client -> DN: Read block Y
|
||||
// The fact that step #3 and #4 are both in the client->DN direction
|
||||
// triggers Nagling. If the DN is using delayed ACKs, this results
|
||||
// in a delay of 40ms or more.
|
||||
//
|
||||
// TCP_NODELAY disables nagling and thus avoids this performance
|
||||
// disaster.
|
||||
socket.setTcpNoDelay(true);
|
||||
SocketChannel channel = socket.getChannel();
|
||||
if (channel == null) {
|
||||
peer = new BasicInetPeer(socket);
|
||||
} else {
|
||||
peer = new NioInetPeer(socket);
|
||||
}
|
||||
success = true;
|
||||
return peer;
|
||||
} finally {
|
||||
if (!success) {
|
||||
if (peer != null) peer.close();
|
||||
socket.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static Peer peerFromSocketAndKey(
|
||||
SaslDataTransferClient saslClient, Socket s,
|
||||
DataEncryptionKeyFactory keyFactory,
|
||||
Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
|
||||
throws IOException {
|
||||
Peer peer = null;
|
||||
boolean success = false;
|
||||
try {
|
||||
peer = peerFromSocket(s);
|
||||
peer = saslClient.peerSend(peer, keyFactory, blockToken, datanodeId);
|
||||
success = true;
|
||||
return peer;
|
||||
} finally {
|
||||
if (!success) {
|
||||
IOUtilsClient.cleanup(null, peer);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -126,6 +126,20 @@ public interface HdfsClientConfigKeys {
|
||||
long DFS_DATANODE_READAHEAD_BYTES_DEFAULT = 4 * 1024 * 1024; // 4MB
|
||||
String DFS_ENCRYPTION_KEY_PROVIDER_URI = "dfs.encryption.key.provider.uri";
|
||||
|
||||
String DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY =
|
||||
"dfs.encrypt.data.transfer.cipher.suites";
|
||||
|
||||
String DFS_DATA_TRANSFER_PROTECTION_KEY = "dfs.data.transfer.protection";
|
||||
String DFS_DATA_TRANSFER_PROTECTION_DEFAULT = "";
|
||||
String DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY =
|
||||
"dfs.data.transfer.saslproperties.resolver.class";
|
||||
|
||||
String DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_KEY =
|
||||
"dfs.encrypt.data.transfer.cipher.key.bitlength";
|
||||
int DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_DEFAULT = 128;
|
||||
|
||||
String DFS_TRUSTEDCHANNEL_RESOLVER_CLASS = "dfs.trustedchannel.resolver.class";
|
||||
|
||||
String REPLICA_ACCESSOR_BUILDER_CLASSES_KEY =
|
||||
PREFIX + "replica.accessor.builder.classes";
|
||||
|
||||
|
@ -30,7 +30,7 @@
|
||||
* that has no associated Channel.
|
||||
*
|
||||
*/
|
||||
class BasicInetPeer implements Peer {
|
||||
public class BasicInetPeer implements Peer {
|
||||
private final Socket socket;
|
||||
private final OutputStream out;
|
||||
private final InputStream in;
|
@ -31,7 +31,7 @@
|
||||
* Represents a peer that we communicate with by using non-blocking I/O
|
||||
* on a Socket.
|
||||
*/
|
||||
class NioInetPeer implements Peer {
|
||||
public class NioInetPeer implements Peer {
|
||||
private final Socket socket;
|
||||
|
||||
/**
|
||||
@ -46,7 +46,7 @@ class NioInetPeer implements Peer {
|
||||
|
||||
private final boolean isLocal;
|
||||
|
||||
NioInetPeer(Socket socket) throws IOException {
|
||||
public NioInetPeer(Socket socket) throws IOException {
|
||||
this.socket = socket;
|
||||
this.in = new SocketInputStream(socket.getChannel(), 0);
|
||||
this.out = new SocketOutputStream(socket.getChannel(), 0);
|
@ -57,7 +57,7 @@ public interface Peer extends Closeable {
|
||||
* Set the write timeout on this peer.
|
||||
*
|
||||
* Note: this is not honored for BasicInetPeer.
|
||||
* See {@link BasicSocketPeer#setWriteTimeout} for details.
|
||||
* See {@link BasicInetPeer#setWriteTimeout} for details.
|
||||
*
|
||||
* @param timeoutMs The timeout in milliseconds.
|
||||
*/
|
||||
|
@ -21,7 +21,7 @@
|
||||
|
||||
import org.apache.hadoop.conf.Configurable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
|
||||
/**
|
||||
@ -45,7 +45,7 @@ public class TrustedChannelResolver implements Configurable {
|
||||
public static TrustedChannelResolver getInstance(Configuration conf) {
|
||||
Class<? extends TrustedChannelResolver> clazz =
|
||||
conf.getClass(
|
||||
DFSConfigKeys.DFS_TRUSTEDCHANNEL_RESOLVER_CLASS,
|
||||
HdfsClientConfigKeys.DFS_TRUSTEDCHANNEL_RESOLVER_CLASS,
|
||||
TrustedChannelResolver.class, TrustedChannelResolver.class);
|
||||
return ReflectionUtils.newInstance(clazz, conf);
|
||||
}
|
@ -19,11 +19,11 @@
|
||||
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_RPC_PROTECTION;
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY;
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_KEY;
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
|
||||
import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed;
|
||||
|
||||
import java.io.IOException;
|
||||
@ -50,7 +50,7 @@
|
||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CipherOptionProto;
|
||||
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
|
||||
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
|
||||
import org.apache.hadoop.security.SaslPropertiesResolver;
|
||||
import org.apache.hadoop.security.SaslRpcServer.QualityOfProtection;
|
||||
import org.slf4j.Logger;
|
||||
@ -240,7 +240,7 @@ public static byte[] readSaslMessageAndNegotiationCipherOptions(
|
||||
List<CipherOptionProto> optionProtos = proto.getCipherOptionList();
|
||||
if (optionProtos != null) {
|
||||
for (CipherOptionProto optionProto : optionProtos) {
|
||||
cipherOptions.add(PBHelper.convert(optionProto));
|
||||
cipherOptions.add(PBHelperClient.convert(optionProto));
|
||||
}
|
||||
}
|
||||
return proto.getPayload().toByteArray();
|
||||
@ -309,7 +309,7 @@ public static void sendSaslMessageAndNegotiatedCipherOption(
|
||||
builder.setPayload(ByteString.copyFrom(payload));
|
||||
}
|
||||
if (option != null) {
|
||||
builder.addCipherOption(PBHelper.convert(option));
|
||||
builder.addCipherOption(PBHelperClient.convert(option));
|
||||
}
|
||||
|
||||
DataTransferEncryptorMessageProto proto = builder.build();
|
||||
@ -392,7 +392,7 @@ public static void sendSaslMessageAndNegotiationCipherOptions(
|
||||
builder.setPayload(ByteString.copyFrom(payload));
|
||||
}
|
||||
if (options != null) {
|
||||
builder.addAllCipherOption(PBHelper.convertCipherOptions(options));
|
||||
builder.addAllCipherOption(PBHelperClient.convertCipherOptions(options));
|
||||
}
|
||||
|
||||
DataTransferEncryptorMessageProto proto = builder.build();
|
||||
@ -419,7 +419,7 @@ public static void sendSaslMessageAndNegotiationCipherOptions(
|
||||
throw new IOException(proto.getMessage());
|
||||
} else {
|
||||
byte[] response = proto.getPayload().toByteArray();
|
||||
List<CipherOption> options = PBHelper.convertCipherOptionProtos(
|
||||
List<CipherOption> options = PBHelperClient.convertCipherOptionProtos(
|
||||
proto.getCipherOptionList());
|
||||
CipherOption option = null;
|
||||
if (options != null && !options.isEmpty()) {
|
@ -17,7 +17,7 @@
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
|
||||
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
|
||||
import static org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil.*;
|
||||
|
||||
import java.io.DataInputStream;
|
@ -20,6 +20,8 @@
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.protobuf.ByteString;
|
||||
import com.google.protobuf.CodedInputStream;
|
||||
import org.apache.hadoop.crypto.CipherOption;
|
||||
import org.apache.hadoop.crypto.CipherSuite;
|
||||
import org.apache.hadoop.fs.StorageType;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
@ -264,4 +266,104 @@ public static InputStream vintPrefixed(final InputStream input)
|
||||
assert size >= 0;
|
||||
return new ExactSizeInputStream(input, size);
|
||||
}
|
||||
|
||||
public static CipherOption convert(HdfsProtos.CipherOptionProto proto) {
|
||||
if (proto != null) {
|
||||
CipherSuite suite = null;
|
||||
if (proto.getSuite() != null) {
|
||||
suite = convert(proto.getSuite());
|
||||
}
|
||||
byte[] inKey = null;
|
||||
if (proto.getInKey() != null) {
|
||||
inKey = proto.getInKey().toByteArray();
|
||||
}
|
||||
byte[] inIv = null;
|
||||
if (proto.getInIv() != null) {
|
||||
inIv = proto.getInIv().toByteArray();
|
||||
}
|
||||
byte[] outKey = null;
|
||||
if (proto.getOutKey() != null) {
|
||||
outKey = proto.getOutKey().toByteArray();
|
||||
}
|
||||
byte[] outIv = null;
|
||||
if (proto.getOutIv() != null) {
|
||||
outIv = proto.getOutIv().toByteArray();
|
||||
}
|
||||
return new CipherOption(suite, inKey, inIv, outKey, outIv);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public static CipherSuite convert(HdfsProtos.CipherSuiteProto proto) {
|
||||
switch (proto) {
|
||||
case AES_CTR_NOPADDING:
|
||||
return CipherSuite.AES_CTR_NOPADDING;
|
||||
default:
|
||||
// Set to UNKNOWN and stash the unknown enum value
|
||||
CipherSuite suite = CipherSuite.UNKNOWN;
|
||||
suite.setUnknownValue(proto.getNumber());
|
||||
return suite;
|
||||
}
|
||||
}
|
||||
|
||||
public static HdfsProtos.CipherOptionProto convert(CipherOption option) {
|
||||
if (option != null) {
|
||||
HdfsProtos.CipherOptionProto.Builder builder = HdfsProtos.CipherOptionProto.
|
||||
newBuilder();
|
||||
if (option.getCipherSuite() != null) {
|
||||
builder.setSuite(convert(option.getCipherSuite()));
|
||||
}
|
||||
if (option.getInKey() != null) {
|
||||
builder.setInKey(ByteString.copyFrom(option.getInKey()));
|
||||
}
|
||||
if (option.getInIv() != null) {
|
||||
builder.setInIv(ByteString.copyFrom(option.getInIv()));
|
||||
}
|
||||
if (option.getOutKey() != null) {
|
||||
builder.setOutKey(ByteString.copyFrom(option.getOutKey()));
|
||||
}
|
||||
if (option.getOutIv() != null) {
|
||||
builder.setOutIv(ByteString.copyFrom(option.getOutIv()));
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public static HdfsProtos.CipherSuiteProto convert(CipherSuite suite) {
|
||||
switch (suite) {
|
||||
case UNKNOWN:
|
||||
return HdfsProtos.CipherSuiteProto.UNKNOWN;
|
||||
case AES_CTR_NOPADDING:
|
||||
return HdfsProtos.CipherSuiteProto.AES_CTR_NOPADDING;
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
public static List<HdfsProtos.CipherOptionProto> convertCipherOptions(
|
||||
List<CipherOption> options) {
|
||||
if (options != null) {
|
||||
List<HdfsProtos.CipherOptionProto> protos =
|
||||
Lists.newArrayListWithCapacity(options.size());
|
||||
for (CipherOption option : options) {
|
||||
protos.add(convert(option));
|
||||
}
|
||||
return protos;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public static List<CipherOption> convertCipherOptionProtos(
|
||||
List<HdfsProtos.CipherOptionProto> protos) {
|
||||
if (protos != null) {
|
||||
List<CipherOption> options =
|
||||
Lists.newArrayListWithCapacity(protos.size());
|
||||
for (HdfsProtos.CipherOptionProto proto : protos) {
|
||||
options.add(convert(proto));
|
||||
}
|
||||
return options;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
@ -883,6 +883,9 @@ Release 2.8.0 - UNRELEASED
|
||||
HDFS-8890. Allow admin to specify which blockpools the balancer should run
|
||||
on. (Chris Trezzo via mingma)
|
||||
|
||||
HDFS-9002. Move o.a.h.hdfs.net/*Peer classes to hdfs-client.
|
||||
(Mingliang Liu via wheat9)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
||||
|
@ -99,7 +99,6 @@
|
||||
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
|
||||
import org.apache.hadoop.hdfs.client.impl.LeaseRenewer;
|
||||
import org.apache.hadoop.hdfs.net.Peer;
|
||||
import org.apache.hadoop.hdfs.net.TcpPeerServer;
|
||||
import org.apache.hadoop.hdfs.protocol.AclException;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
|
||||
@ -3018,7 +3017,7 @@ public Peer newConnectedPeer(InetSocketAddress addr,
|
||||
try {
|
||||
sock = socketFactory.createSocket();
|
||||
NetUtils.connect(sock, addr, getRandomLocalInterfaceAddr(), socketTimeout);
|
||||
peer = TcpPeerServer.peerFromSocketAndKey(saslClient, sock, this,
|
||||
peer = DFSUtilClient.peerFromSocketAndKey(saslClient, sock, this,
|
||||
blockToken, datanodeId);
|
||||
peer.setReadTimeout(socketTimeout);
|
||||
peer.setWriteTimeout(socketTimeout);
|
||||
|
@ -599,14 +599,28 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||
// Security-related configs
|
||||
public static final String DFS_ENCRYPT_DATA_TRANSFER_KEY = "dfs.encrypt.data.transfer";
|
||||
public static final boolean DFS_ENCRYPT_DATA_TRANSFER_DEFAULT = false;
|
||||
public static final String DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_KEY = "dfs.encrypt.data.transfer.cipher.key.bitlength";
|
||||
public static final int DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_DEFAULT = 128;
|
||||
public static final String DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY = "dfs.encrypt.data.transfer.cipher.suites";
|
||||
@Deprecated
|
||||
public static final String DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_KEY =
|
||||
HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_KEY;
|
||||
@Deprecated
|
||||
public static final int DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_DEFAULT =
|
||||
HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_DEFAULT;
|
||||
@Deprecated
|
||||
public static final String DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY =
|
||||
HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
|
||||
public static final String DFS_DATA_ENCRYPTION_ALGORITHM_KEY = "dfs.encrypt.data.transfer.algorithm";
|
||||
public static final String DFS_TRUSTEDCHANNEL_RESOLVER_CLASS = "dfs.trustedchannel.resolver.class";
|
||||
public static final String DFS_DATA_TRANSFER_PROTECTION_KEY = "dfs.data.transfer.protection";
|
||||
public static final String DFS_DATA_TRANSFER_PROTECTION_DEFAULT = "";
|
||||
public static final String DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY = "dfs.data.transfer.saslproperties.resolver.class";
|
||||
@Deprecated
|
||||
public static final String DFS_TRUSTEDCHANNEL_RESOLVER_CLASS =
|
||||
HdfsClientConfigKeys.DFS_TRUSTEDCHANNEL_RESOLVER_CLASS;
|
||||
@Deprecated
|
||||
public static final String DFS_DATA_TRANSFER_PROTECTION_KEY =
|
||||
HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
|
||||
@Deprecated
|
||||
public static final String DFS_DATA_TRANSFER_PROTECTION_DEFAULT =
|
||||
HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_DEFAULT;
|
||||
@Deprecated
|
||||
public static final String DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY =
|
||||
HdfsClientConfigKeys.DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY;
|
||||
public static final int DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES_DEFAULT = 100;
|
||||
public static final String DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES = "dfs.namenode.list.encryption.zones.num.responses";
|
||||
public static final String DFS_ENCRYPTION_KEY_PROVIDER_URI =
|
||||
|
@ -20,22 +20,15 @@
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.ServerSocket;
|
||||
import java.net.Socket;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.nio.channels.ServerSocketChannel;
|
||||
import java.nio.channels.SocketChannel;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.DFSUtilClient;
|
||||
import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.ipc.Server;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
public class TcpPeerServer implements PeerServer {
|
||||
@ -43,60 +36,6 @@ public class TcpPeerServer implements PeerServer {
|
||||
|
||||
private final ServerSocket serverSocket;
|
||||
|
||||
public static Peer peerFromSocket(Socket socket)
|
||||
throws IOException {
|
||||
Peer peer = null;
|
||||
boolean success = false;
|
||||
try {
|
||||
// TCP_NODELAY is crucial here because of bad interactions between
|
||||
// Nagle's Algorithm and Delayed ACKs. With connection keepalive
|
||||
// between the client and DN, the conversation looks like:
|
||||
// 1. Client -> DN: Read block X
|
||||
// 2. DN -> Client: data for block X
|
||||
// 3. Client -> DN: Status OK (successful read)
|
||||
// 4. Client -> DN: Read block Y
|
||||
// The fact that step #3 and #4 are both in the client->DN direction
|
||||
// triggers Nagling. If the DN is using delayed ACKs, this results
|
||||
// in a delay of 40ms or more.
|
||||
//
|
||||
// TCP_NODELAY disables nagling and thus avoids this performance
|
||||
// disaster.
|
||||
socket.setTcpNoDelay(true);
|
||||
SocketChannel channel = socket.getChannel();
|
||||
if (channel == null) {
|
||||
peer = new BasicInetPeer(socket);
|
||||
} else {
|
||||
peer = new NioInetPeer(socket);
|
||||
}
|
||||
success = true;
|
||||
return peer;
|
||||
} finally {
|
||||
if (!success) {
|
||||
if (peer != null) peer.close();
|
||||
socket.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static Peer peerFromSocketAndKey(
|
||||
SaslDataTransferClient saslClient, Socket s,
|
||||
DataEncryptionKeyFactory keyFactory,
|
||||
Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
|
||||
throws IOException {
|
||||
Peer peer = null;
|
||||
boolean success = false;
|
||||
try {
|
||||
peer = peerFromSocket(s);
|
||||
peer = saslClient.peerSend(peer, keyFactory, blockToken, datanodeId);
|
||||
success = true;
|
||||
return peer;
|
||||
} finally {
|
||||
if (!success) {
|
||||
IOUtils.cleanup(null, peer);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a non-secure TcpPeerServer.
|
||||
*
|
||||
@ -136,7 +75,7 @@ public void setReceiveBufferSize(int size) throws IOException {
|
||||
|
||||
@Override
|
||||
public Peer accept() throws IOException, SocketTimeoutException {
|
||||
Peer peer = peerFromSocket(serverSocket.accept());
|
||||
Peer peer = DFSUtilClient.peerFromSocket(serverSocket.accept());
|
||||
return peer;
|
||||
}
|
||||
|
||||
|
@ -17,7 +17,7 @@
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
|
||||
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
|
||||
import static org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil.*;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
|
@ -20,7 +20,6 @@
|
||||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
import static org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos
|
||||
.EncryptionZoneProto;
|
||||
import static org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CipherSuiteProto;
|
||||
import static org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CryptoProtocolVersionProto;
|
||||
|
||||
import java.io.IOException;
|
||||
@ -2315,7 +2314,7 @@ public static EncryptionZoneProto convert(EncryptionZone zone) {
|
||||
return EncryptionZoneProto.newBuilder()
|
||||
.setId(zone.getId())
|
||||
.setPath(zone.getPath())
|
||||
.setSuite(convert(zone.getSuite()))
|
||||
.setSuite(PBHelperClient.convert(zone.getSuite()))
|
||||
.setCryptoProtocolVersion(convert(zone.getVersion()))
|
||||
.setKeyName(zone.getKeyName())
|
||||
.build();
|
||||
@ -2323,7 +2322,7 @@ public static EncryptionZoneProto convert(EncryptionZone zone) {
|
||||
|
||||
public static EncryptionZone convert(EncryptionZoneProto proto) {
|
||||
return new EncryptionZone(proto.getId(), proto.getPath(),
|
||||
convert(proto.getSuite()), convert(proto.getCryptoProtocolVersion()),
|
||||
PBHelperClient.convert(proto.getSuite()), convert(proto.getCryptoProtocolVersion()),
|
||||
proto.getKeyName());
|
||||
}
|
||||
|
||||
@ -2625,106 +2624,6 @@ public static GetEditsFromTxidResponseProto convertEditsResponse(EventBatchList
|
||||
builder.build()).build();
|
||||
}
|
||||
|
||||
public static CipherOptionProto convert(CipherOption option) {
|
||||
if (option != null) {
|
||||
CipherOptionProto.Builder builder = CipherOptionProto.
|
||||
newBuilder();
|
||||
if (option.getCipherSuite() != null) {
|
||||
builder.setSuite(convert(option.getCipherSuite()));
|
||||
}
|
||||
if (option.getInKey() != null) {
|
||||
builder.setInKey(ByteString.copyFrom(option.getInKey()));
|
||||
}
|
||||
if (option.getInIv() != null) {
|
||||
builder.setInIv(ByteString.copyFrom(option.getInIv()));
|
||||
}
|
||||
if (option.getOutKey() != null) {
|
||||
builder.setOutKey(ByteString.copyFrom(option.getOutKey()));
|
||||
}
|
||||
if (option.getOutIv() != null) {
|
||||
builder.setOutIv(ByteString.copyFrom(option.getOutIv()));
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public static CipherOption convert(CipherOptionProto proto) {
|
||||
if (proto != null) {
|
||||
CipherSuite suite = null;
|
||||
if (proto.getSuite() != null) {
|
||||
suite = convert(proto.getSuite());
|
||||
}
|
||||
byte[] inKey = null;
|
||||
if (proto.getInKey() != null) {
|
||||
inKey = proto.getInKey().toByteArray();
|
||||
}
|
||||
byte[] inIv = null;
|
||||
if (proto.getInIv() != null) {
|
||||
inIv = proto.getInIv().toByteArray();
|
||||
}
|
||||
byte[] outKey = null;
|
||||
if (proto.getOutKey() != null) {
|
||||
outKey = proto.getOutKey().toByteArray();
|
||||
}
|
||||
byte[] outIv = null;
|
||||
if (proto.getOutIv() != null) {
|
||||
outIv = proto.getOutIv().toByteArray();
|
||||
}
|
||||
return new CipherOption(suite, inKey, inIv, outKey, outIv);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public static List<CipherOptionProto> convertCipherOptions(
|
||||
List<CipherOption> options) {
|
||||
if (options != null) {
|
||||
List<CipherOptionProto> protos =
|
||||
Lists.newArrayListWithCapacity(options.size());
|
||||
for (CipherOption option : options) {
|
||||
protos.add(convert(option));
|
||||
}
|
||||
return protos;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public static List<CipherOption> convertCipherOptionProtos(
|
||||
List<CipherOptionProto> protos) {
|
||||
if (protos != null) {
|
||||
List<CipherOption> options =
|
||||
Lists.newArrayListWithCapacity(protos.size());
|
||||
for (CipherOptionProto proto : protos) {
|
||||
options.add(convert(proto));
|
||||
}
|
||||
return options;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public static CipherSuiteProto convert(CipherSuite suite) {
|
||||
switch (suite) {
|
||||
case UNKNOWN:
|
||||
return CipherSuiteProto.UNKNOWN;
|
||||
case AES_CTR_NOPADDING:
|
||||
return CipherSuiteProto.AES_CTR_NOPADDING;
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
public static CipherSuite convert(CipherSuiteProto proto) {
|
||||
switch (proto) {
|
||||
case AES_CTR_NOPADDING:
|
||||
return CipherSuite.AES_CTR_NOPADDING;
|
||||
default:
|
||||
// Set to UNKNOWN and stash the unknown enum value
|
||||
CipherSuite suite = CipherSuite.UNKNOWN;
|
||||
suite.setUnknownValue(proto.getNumber());
|
||||
return suite;
|
||||
}
|
||||
}
|
||||
|
||||
public static List<CryptoProtocolVersionProto> convert(
|
||||
CryptoProtocolVersion[] versions) {
|
||||
List<CryptoProtocolVersionProto> protos =
|
||||
@ -2776,7 +2675,7 @@ public static HdfsProtos.FileEncryptionInfoProto convert(
|
||||
return null;
|
||||
}
|
||||
return HdfsProtos.FileEncryptionInfoProto.newBuilder()
|
||||
.setSuite(convert(info.getCipherSuite()))
|
||||
.setSuite(PBHelperClient.convert(info.getCipherSuite()))
|
||||
.setCryptoProtocolVersion(convert(info.getCryptoProtocolVersion()))
|
||||
.setKey(getByteString(info.getEncryptedDataEncryptionKey()))
|
||||
.setIv(getByteString(info.getIV()))
|
||||
@ -2803,7 +2702,7 @@ public static HdfsProtos.ZoneEncryptionInfoProto convert(
|
||||
return null;
|
||||
}
|
||||
return HdfsProtos.ZoneEncryptionInfoProto.newBuilder()
|
||||
.setSuite(convert(suite))
|
||||
.setSuite(PBHelperClient.convert(suite))
|
||||
.setCryptoProtocolVersion(convert(version))
|
||||
.setKeyName(keyName)
|
||||
.build();
|
||||
@ -2814,7 +2713,7 @@ public static FileEncryptionInfo convert(
|
||||
if (proto == null) {
|
||||
return null;
|
||||
}
|
||||
CipherSuite suite = convert(proto.getSuite());
|
||||
CipherSuite suite = PBHelperClient.convert(proto.getSuite());
|
||||
CryptoProtocolVersion version = convert(proto.getCryptoProtocolVersion());
|
||||
byte[] key = proto.getKey().toByteArray();
|
||||
byte[] iv = proto.getIv().toByteArray();
|
||||
|
@ -30,6 +30,7 @@
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
|
||||
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
|
||||
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
|
||||
import java.io.IOException;
|
||||
@ -282,9 +283,8 @@ static INode unprotectedSetXAttrs(
|
||||
final HdfsProtos.ZoneEncryptionInfoProto ezProto =
|
||||
HdfsProtos.ZoneEncryptionInfoProto.parseFrom(xattr.getValue());
|
||||
fsd.ezManager.addEncryptionZone(inode.getId(),
|
||||
PBHelper.convert(ezProto.getSuite()),
|
||||
PBHelper.convert(
|
||||
ezProto.getCryptoProtocolVersion()),
|
||||
PBHelperClient.convert(ezProto.getSuite()),
|
||||
PBHelper.convert(ezProto.getCryptoProtocolVersion()),
|
||||
ezProto.getKeyName());
|
||||
}
|
||||
|
||||
|
@ -49,6 +49,7 @@
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
|
||||
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
|
||||
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
|
||||
@ -1004,7 +1005,7 @@ public final void addToInodeMap(INode inode) {
|
||||
HdfsProtos.ZoneEncryptionInfoProto.parseFrom(
|
||||
xattr.getValue());
|
||||
ezManager.unprotectedAddEncryptionZone(inode.getId(),
|
||||
PBHelper.convert(ezProto.getSuite()),
|
||||
PBHelperClient.convert(ezProto.getSuite()),
|
||||
PBHelper.convert(ezProto.getCryptoProtocolVersion()),
|
||||
ezProto.getKeyName());
|
||||
} catch (InvalidProtocolBufferException e) {
|
||||
|
@ -45,11 +45,11 @@
|
||||
import org.apache.hadoop.hdfs.BlockReaderFactory;
|
||||
import org.apache.hadoop.hdfs.DFSClient;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSUtilClient;
|
||||
import org.apache.hadoop.hdfs.RemotePeerFactory;
|
||||
import org.apache.hadoop.fs.StorageType;
|
||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||
import org.apache.hadoop.hdfs.net.Peer;
|
||||
import org.apache.hadoop.hdfs.net.TcpPeerServer;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
@ -884,7 +884,7 @@ public Peer newConnectedPeer(InetSocketAddress addr,
|
||||
try {
|
||||
s.connect(addr, HdfsConstants.READ_TIMEOUT);
|
||||
s.setSoTimeout(HdfsConstants.READ_TIMEOUT);
|
||||
peer = TcpPeerServer.peerFromSocketAndKey(
|
||||
peer = DFSUtilClient.peerFromSocketAndKey(
|
||||
dfs.getSaslDataTransferClient(), s, NamenodeFsck.this,
|
||||
blockToken, datanodeId);
|
||||
} finally {
|
||||
|
@ -32,7 +32,6 @@
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.net.Peer;
|
||||
import org.apache.hadoop.hdfs.net.TcpPeerServer;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
@ -204,7 +203,7 @@ public Peer newConnectedPeer(InetSocketAddress addr,
|
||||
try {
|
||||
sock.connect(addr, HdfsConstants.READ_TIMEOUT);
|
||||
sock.setSoTimeout(HdfsConstants.READ_TIMEOUT);
|
||||
peer = TcpPeerServer.peerFromSocket(sock);
|
||||
peer = DFSUtilClient.peerFromSocket(sock);
|
||||
} finally {
|
||||
if (peer == null) {
|
||||
IOUtils.closeQuietly(sock);
|
||||
|
@ -21,13 +21,13 @@
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY;
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_DEFAULT;
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY;
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY;
|
||||
|
@ -77,7 +77,7 @@ private void setEncryptionConfigKeys(Configuration conf) {
|
||||
conf.setBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY, true);
|
||||
conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
|
||||
if (resolverClazz != null){
|
||||
conf.set(DFSConfigKeys.DFS_TRUSTEDCHANNEL_RESOLVER_CLASS, resolverClazz);
|
||||
conf.set(HdfsClientConfigKeys.DFS_TRUSTEDCHANNEL_RESOLVER_CLASS, resolverClazz);
|
||||
}
|
||||
}
|
||||
|
||||
@ -209,7 +209,7 @@ public void testEncryptedReadWithAES() throws IOException {
|
||||
MiniDFSCluster cluster = null;
|
||||
try {
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY,
|
||||
conf.set(HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY,
|
||||
"AES/CTR/NoPadding");
|
||||
cluster = new MiniDFSCluster.Builder(conf).build();
|
||||
|
||||
|
@ -18,8 +18,8 @@
|
||||
package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
|
||||
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY;
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
|
||||
|
@ -17,7 +17,7 @@
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
|
||||
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTP_POLICY_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_KEY;
|
||||
import static org.junit.Assert.assertArrayEquals;
|
||||
|
@ -20,8 +20,8 @@
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY;
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
|
||||
|
@ -41,12 +41,12 @@
|
||||
import org.apache.hadoop.hdfs.DFSClient;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.DFSUtilClient;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.RemotePeerFactory;
|
||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
|
||||
import org.apache.hadoop.hdfs.net.Peer;
|
||||
import org.apache.hadoop.hdfs.net.TcpPeerServer;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
@ -171,7 +171,7 @@ public Peer newConnectedPeer(InetSocketAddress addr,
|
||||
try {
|
||||
sock.connect(addr, HdfsConstants.READ_TIMEOUT);
|
||||
sock.setSoTimeout(HdfsConstants.READ_TIMEOUT);
|
||||
peer = TcpPeerServer.peerFromSocket(sock);
|
||||
peer = DFSUtilClient.peerFromSocket(sock);
|
||||
} finally {
|
||||
if (peer == null) {
|
||||
IOUtils.closeSocket(sock);
|
||||
|
@ -43,12 +43,12 @@
|
||||
import org.apache.hadoop.hdfs.ClientContext;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.DFSUtilClient;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.RemotePeerFactory;
|
||||
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
|
||||
import org.apache.hadoop.hdfs.net.Peer;
|
||||
import org.apache.hadoop.hdfs.net.TcpPeerServer;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||
@ -525,7 +525,7 @@ public Peer newConnectedPeer(InetSocketAddress addr,
|
||||
try {
|
||||
sock.connect(addr, HdfsConstants.READ_TIMEOUT);
|
||||
sock.setSoTimeout(HdfsConstants.READ_TIMEOUT);
|
||||
peer = TcpPeerServer.peerFromSocket(sock);
|
||||
peer = DFSUtilClient.peerFromSocket(sock);
|
||||
} finally {
|
||||
if (peer == null) {
|
||||
IOUtils.closeSocket(sock);
|
||||
|
Loading…
Reference in New Issue
Block a user