HDFS-10223. peerFromSocketAndKey performs SASL exchange before setting connection timeouts (cmccabe)
This commit is contained in:
parent
60e4116bf1
commit
37e23ce45c
@ -2734,9 +2734,7 @@ public Peer newConnectedPeer(InetSocketAddress addr,
|
|||||||
NetUtils.connect(sock, addr, getRandomLocalInterfaceAddr(),
|
NetUtils.connect(sock, addr, getRandomLocalInterfaceAddr(),
|
||||||
socketTimeout);
|
socketTimeout);
|
||||||
peer = DFSUtilClient.peerFromSocketAndKey(saslClient, sock, this,
|
peer = DFSUtilClient.peerFromSocketAndKey(saslClient, sock, this,
|
||||||
blockToken, datanodeId);
|
blockToken, datanodeId, socketTimeout);
|
||||||
peer.setReadTimeout(socketTimeout);
|
|
||||||
peer.setWriteTimeout(socketTimeout);
|
|
||||||
success = true;
|
success = true;
|
||||||
return peer;
|
return peer;
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -587,12 +587,14 @@ public static Peer peerFromSocket(Socket socket)
|
|||||||
public static Peer peerFromSocketAndKey(
|
public static Peer peerFromSocketAndKey(
|
||||||
SaslDataTransferClient saslClient, Socket s,
|
SaslDataTransferClient saslClient, Socket s,
|
||||||
DataEncryptionKeyFactory keyFactory,
|
DataEncryptionKeyFactory keyFactory,
|
||||||
Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
|
Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId,
|
||||||
throws IOException {
|
int socketTimeoutMs) throws IOException {
|
||||||
Peer peer = null;
|
Peer peer = null;
|
||||||
boolean success = false;
|
boolean success = false;
|
||||||
try {
|
try {
|
||||||
peer = peerFromSocket(s);
|
peer = peerFromSocket(s);
|
||||||
|
peer.setReadTimeout(socketTimeoutMs);
|
||||||
|
peer.setWriteTimeout(socketTimeoutMs);
|
||||||
peer = saslClient.peerSend(peer, keyFactory, blockToken, datanodeId);
|
peer = saslClient.peerSend(peer, keyFactory, blockToken, datanodeId);
|
||||||
success = true;
|
success = true;
|
||||||
return peer;
|
return peer;
|
||||||
|
@ -875,8 +875,7 @@ private Peer newConnectedPeer(ExtendedBlock b, InetSocketAddress addr,
|
|||||||
NetUtils.connect(sock, addr, socketTimeout);
|
NetUtils.connect(sock, addr, socketTimeout);
|
||||||
peer = DFSUtilClient.peerFromSocketAndKey(datanode.getSaslClient(),
|
peer = DFSUtilClient.peerFromSocketAndKey(datanode.getSaslClient(),
|
||||||
sock, datanode.getDataEncryptionKeyFactoryForBlock(b),
|
sock, datanode.getDataEncryptionKeyFactoryForBlock(b),
|
||||||
blockToken, datanodeId);
|
blockToken, datanodeId, socketTimeout);
|
||||||
peer.setReadTimeout(socketTimeout);
|
|
||||||
success = true;
|
success = true;
|
||||||
return peer;
|
return peer;
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -952,7 +952,7 @@ public Peer newConnectedPeer(InetSocketAddress addr,
|
|||||||
s.setSoTimeout(HdfsConstants.READ_TIMEOUT);
|
s.setSoTimeout(HdfsConstants.READ_TIMEOUT);
|
||||||
peer = DFSUtilClient.peerFromSocketAndKey(
|
peer = DFSUtilClient.peerFromSocketAndKey(
|
||||||
dfs.getSaslDataTransferClient(), s, NamenodeFsck.this,
|
dfs.getSaslDataTransferClient(), s, NamenodeFsck.this,
|
||||||
blockToken, datanodeId);
|
blockToken, datanodeId, HdfsConstants.READ_TIMEOUT);
|
||||||
} finally {
|
} finally {
|
||||||
if (peer == null) {
|
if (peer == null) {
|
||||||
IOUtils.closeQuietly(s);
|
IOUtils.closeQuietly(s);
|
||||||
|
@ -25,6 +25,10 @@
|
|||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.net.ServerSocket;
|
||||||
|
import java.net.Socket;
|
||||||
|
import java.net.SocketTimeoutException;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.fs.BlockLocation;
|
import org.apache.hadoop.fs.BlockLocation;
|
||||||
@ -32,12 +36,18 @@
|
|||||||
import org.apache.hadoop.fs.FileSystemTestHelper;
|
import org.apache.hadoop.fs.FileSystemTestHelper;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
|
import org.apache.hadoop.hdfs.DFSUtilClient;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
|
import org.apache.hadoop.hdfs.net.Peer;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
|
||||||
|
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
import org.apache.hadoop.http.HttpConfig;
|
import org.apache.hadoop.http.HttpConfig;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
|
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
@ -197,4 +207,42 @@ private void startCluster(HdfsConfiguration conf) throws IOException {
|
|||||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verifies that peerFromSocketAndKey honors socket read timeouts.
|
||||||
|
*/
|
||||||
|
@Test(timeout=60000)
|
||||||
|
public void TestPeerFromSocketAndKeyReadTimeout() throws Exception {
|
||||||
|
HdfsConfiguration conf = createSecureConfig(
|
||||||
|
"authentication,integrity,privacy");
|
||||||
|
AtomicBoolean fallbackToSimpleAuth = new AtomicBoolean(false);
|
||||||
|
SaslDataTransferClient saslClient = new SaslDataTransferClient(
|
||||||
|
conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf),
|
||||||
|
TrustedChannelResolver.getInstance(conf), fallbackToSimpleAuth);
|
||||||
|
DatanodeID fakeDatanodeId = new DatanodeID("127.0.0.1", "localhost",
|
||||||
|
"beefbeef-beef-beef-beef-beefbeefbeef", 1, 2, 3, 4);
|
||||||
|
DataEncryptionKeyFactory dataEncKeyFactory =
|
||||||
|
new DataEncryptionKeyFactory() {
|
||||||
|
@Override
|
||||||
|
public DataEncryptionKey newDataEncryptionKey() {
|
||||||
|
return new DataEncryptionKey(123, "456", new byte[8],
|
||||||
|
new byte[8], 1234567, "fakeAlgorithm");
|
||||||
|
}
|
||||||
|
};
|
||||||
|
ServerSocket serverSocket = null;
|
||||||
|
Socket socket = null;
|
||||||
|
try {
|
||||||
|
serverSocket = new ServerSocket(0, -1);
|
||||||
|
socket = new Socket(serverSocket.getInetAddress(),
|
||||||
|
serverSocket.getLocalPort());
|
||||||
|
Peer peer = DFSUtilClient.peerFromSocketAndKey(saslClient, socket,
|
||||||
|
dataEncKeyFactory, new Token(), fakeDatanodeId, 1);
|
||||||
|
peer.close();
|
||||||
|
Assert.fail("Expected DFSClient#peerFromSocketAndKey to time out.");
|
||||||
|
} catch (SocketTimeoutException e) {
|
||||||
|
GenericTestUtils.assertExceptionContains("Read timed out", e);
|
||||||
|
} finally {
|
||||||
|
IOUtils.cleanup(null, socket, serverSocket);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user