HDFS-10609. Uncaught InvalidEncryptionKeyException during pipeline recovery may abort downstream applications. Contributed by Wei-Chiu Chuang.
This commit is contained in:
parent
fa397e74fe
commit
3ae652f821
@ -1710,6 +1710,11 @@ public DataEncryptionKey newDataEncryptionKey() throws IOException {
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public DataEncryptionKey getEncryptionKey() {
|
||||
return encryptionKey;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the checksum of the whole file or a range of the file. Note that the
|
||||
* range always starts from the beginning of the file. The file can be
|
||||
|
@ -116,6 +116,89 @@
|
||||
class DataStreamer extends Daemon {
|
||||
static final Logger LOG = LoggerFactory.getLogger(DataStreamer.class);
|
||||
|
||||
private class RefetchEncryptionKeyPolicy {
|
||||
private int fetchEncryptionKeyTimes = 0;
|
||||
private InvalidEncryptionKeyException lastException;
|
||||
private final DatanodeInfo src;
|
||||
|
||||
RefetchEncryptionKeyPolicy(DatanodeInfo src) {
|
||||
this.src = src;
|
||||
}
|
||||
boolean continueRetryingOrThrow() throws InvalidEncryptionKeyException {
|
||||
if (fetchEncryptionKeyTimes >= 2) {
|
||||
// hit the same exception twice connecting to the node, so
|
||||
// throw the exception and exclude the node.
|
||||
throw lastException;
|
||||
}
|
||||
// Don't exclude this node just yet.
|
||||
// Try again with a new encryption key.
|
||||
LOG.info("Will fetch a new encryption key and retry, "
|
||||
+ "encryption key was invalid when connecting to "
|
||||
+ this.src + ": ", lastException);
|
||||
// The encryption key used is invalid.
|
||||
dfsClient.clearDataEncryptionKey();
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Record a connection exception.
|
||||
* @param e
|
||||
* @throws InvalidEncryptionKeyException
|
||||
*/
|
||||
void recordFailure(final InvalidEncryptionKeyException e)
|
||||
throws InvalidEncryptionKeyException {
|
||||
fetchEncryptionKeyTimes++;
|
||||
lastException = e;
|
||||
}
|
||||
}
|
||||
|
||||
private class StreamerStreams implements java.io.Closeable {
|
||||
private Socket sock = null;
|
||||
private DataOutputStream out = null;
|
||||
private DataInputStream in = null;
|
||||
|
||||
StreamerStreams(final DatanodeInfo src,
|
||||
final long writeTimeout, final long readTimeout,
|
||||
final Token<BlockTokenIdentifier> blockToken)
|
||||
throws IOException {
|
||||
sock = createSocketForPipeline(src, 2, dfsClient);
|
||||
|
||||
OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
|
||||
InputStream unbufIn = NetUtils.getInputStream(sock, readTimeout);
|
||||
IOStreamPair saslStreams = dfsClient.saslClient
|
||||
.socketSend(sock, unbufOut, unbufIn, dfsClient, blockToken, src);
|
||||
unbufOut = saslStreams.out;
|
||||
unbufIn = saslStreams.in;
|
||||
out = new DataOutputStream(new BufferedOutputStream(unbufOut,
|
||||
DFSUtilClient.getSmallBufferSize(dfsClient.getConfiguration())));
|
||||
in = new DataInputStream(unbufIn);
|
||||
}
|
||||
|
||||
void sendTransferBlock(final DatanodeInfo[] targets,
|
||||
final StorageType[] targetStorageTypes,
|
||||
final Token<BlockTokenIdentifier> blockToken) throws IOException {
|
||||
//send the TRANSFER_BLOCK request
|
||||
new Sender(out)
|
||||
.transferBlock(block, blockToken, dfsClient.clientName, targets,
|
||||
targetStorageTypes);
|
||||
out.flush();
|
||||
//ack
|
||||
BlockOpResponseProto transferResponse = BlockOpResponseProto
|
||||
.parseFrom(PBHelperClient.vintPrefixed(in));
|
||||
if (SUCCESS != transferResponse.getStatus()) {
|
||||
throw new IOException("Failed to add a datanode. Response status: "
|
||||
+ transferResponse.getStatus());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
IOUtils.closeStream(in);
|
||||
IOUtils.closeStream(out);
|
||||
IOUtils.closeSocket(sock);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a socket for a write pipeline
|
||||
*
|
||||
@ -1270,50 +1353,39 @@ private void addDatanode2ExistingPipeline() throws IOException {
|
||||
new IOException("Failed to add a node");
|
||||
}
|
||||
|
||||
private long computeTransferWriteTimeout() {
|
||||
return dfsClient.getDatanodeWriteTimeout(2);
|
||||
}
|
||||
private long computeTransferReadTimeout() {
|
||||
// transfer timeout multiplier based on the transfer size
|
||||
// One per 200 packets = 12.8MB. Minimum is 2.
|
||||
int multi = 2
|
||||
+ (int) (bytesSent / dfsClient.getConf().getWritePacketSize()) / 200;
|
||||
return dfsClient.getDatanodeReadTimeout(multi);
|
||||
}
|
||||
|
||||
private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets,
|
||||
final StorageType[] targetStorageTypes,
|
||||
final Token<BlockTokenIdentifier> blockToken)
|
||||
throws IOException {
|
||||
//transfer replica to the new datanode
|
||||
Socket sock = null;
|
||||
DataOutputStream out = null;
|
||||
DataInputStream in = null;
|
||||
try {
|
||||
sock = createSocketForPipeline(src, 2, dfsClient);
|
||||
final long writeTimeout = dfsClient.getDatanodeWriteTimeout(2);
|
||||
RefetchEncryptionKeyPolicy policy = new RefetchEncryptionKeyPolicy(src);
|
||||
do {
|
||||
StreamerStreams streams = null;
|
||||
try {
|
||||
final long writeTimeout = computeTransferWriteTimeout();
|
||||
final long readTimeout = computeTransferReadTimeout();
|
||||
|
||||
// transfer timeout multiplier based on the transfer size
|
||||
// One per 200 packets = 12.8MB. Minimum is 2.
|
||||
int multi = 2 + (int)(bytesSent /dfsClient.getConf().getWritePacketSize())
|
||||
/ 200;
|
||||
final long readTimeout = dfsClient.getDatanodeReadTimeout(multi);
|
||||
|
||||
OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
|
||||
InputStream unbufIn = NetUtils.getInputStream(sock, readTimeout);
|
||||
IOStreamPair saslStreams = dfsClient.saslClient.socketSend(sock,
|
||||
unbufOut, unbufIn, dfsClient, blockToken, src);
|
||||
unbufOut = saslStreams.out;
|
||||
unbufIn = saslStreams.in;
|
||||
out = new DataOutputStream(new BufferedOutputStream(unbufOut,
|
||||
DFSUtilClient.getSmallBufferSize(dfsClient.getConfiguration())));
|
||||
in = new DataInputStream(unbufIn);
|
||||
|
||||
//send the TRANSFER_BLOCK request
|
||||
new Sender(out).transferBlock(block, blockToken, dfsClient.clientName,
|
||||
targets, targetStorageTypes);
|
||||
out.flush();
|
||||
|
||||
//ack
|
||||
BlockOpResponseProto response =
|
||||
BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(in));
|
||||
if (SUCCESS != response.getStatus()) {
|
||||
throw new IOException("Failed to add a datanode");
|
||||
streams = new StreamerStreams(src, writeTimeout, readTimeout,
|
||||
blockToken);
|
||||
streams.sendTransferBlock(targets, targetStorageTypes, blockToken);
|
||||
return;
|
||||
} catch (InvalidEncryptionKeyException e) {
|
||||
policy.recordFailure(e);
|
||||
} finally {
|
||||
IOUtils.closeStream(streams);
|
||||
}
|
||||
} finally {
|
||||
IOUtils.closeStream(in);
|
||||
IOUtils.closeStream(out);
|
||||
IOUtils.closeSocket(sock);
|
||||
}
|
||||
} while (policy.continueRetryingOrThrow());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -49,7 +49,8 @@ public synchronized void addBlockPool(String bpid,
|
||||
map.put(bpid, secretMgr);
|
||||
}
|
||||
|
||||
synchronized BlockTokenSecretManager get(String bpid) {
|
||||
@VisibleForTesting
|
||||
public synchronized BlockTokenSecretManager get(String bpid) {
|
||||
BlockTokenSecretManager secretMgr = map.get(bpid);
|
||||
if (secretMgr == null) {
|
||||
throw new IllegalArgumentException("Block pool " + bpid
|
||||
|
@ -435,6 +435,12 @@ public void clearAllKeysForTesting() {
|
||||
allKeys.clear();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public synchronized boolean hasKey(int keyId) {
|
||||
BlockKey key = allKeys.get(keyId);
|
||||
return key != null;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public synchronized int getSerialNoForTesting() {
|
||||
return serialNo;
|
||||
|
@ -2779,6 +2779,11 @@ DirectoryScanner getDirectoryScanner() {
|
||||
return directoryScanner;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public BlockPoolTokenSecretManager getBlockPoolTokenSecretManager() {
|
||||
return blockPoolTokenSecretManager;
|
||||
}
|
||||
|
||||
public static void secureMain(String args[], SecureResources resources) {
|
||||
int errorCode = 0;
|
||||
try {
|
||||
|
@ -18,24 +18,31 @@
|
||||
package org.apache.hadoop.hdfs;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Mockito.times;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.net.InetAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import com.google.common.base.Supplier;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileChecksum;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
|
||||
@ -45,9 +52,14 @@
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
|
||||
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
|
||||
import org.apache.log4j.Level;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.Timeout;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.junit.runners.Parameterized.Parameters;
|
||||
@ -59,6 +71,9 @@ public class TestEncryptedTransfer {
|
||||
LogManager.getLogger(SaslDataTransferServer.class).setLevel(Level.DEBUG);
|
||||
LogManager.getLogger(DataTransferSaslUtil.class).setLevel(Level.DEBUG);
|
||||
}
|
||||
|
||||
@Rule
|
||||
public Timeout timeout = new Timeout(300000);
|
||||
|
||||
@Parameters
|
||||
public static Collection<Object[]> data() {
|
||||
@ -72,8 +87,12 @@ public static Collection<Object[]> data() {
|
||||
|
||||
private static final String PLAIN_TEXT = "this is very secret plain text";
|
||||
private static final Path TEST_PATH = new Path("/non-encrypted-file");
|
||||
|
||||
private MiniDFSCluster cluster = null;
|
||||
private Configuration conf = null;
|
||||
private FileSystem fs = null;
|
||||
|
||||
private void setEncryptionConfigKeys(Configuration conf) {
|
||||
private void setEncryptionConfigKeys() {
|
||||
conf.setBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY, true);
|
||||
conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
|
||||
if (resolverClazz != null){
|
||||
@ -96,505 +115,359 @@ public TestEncryptedTransfer(String resolverClazz){
|
||||
this.resolverClazz = resolverClazz;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEncryptedRead() throws IOException {
|
||||
MiniDFSCluster cluster = null;
|
||||
try {
|
||||
Configuration conf = new Configuration();
|
||||
cluster = new MiniDFSCluster.Builder(conf).build();
|
||||
|
||||
FileSystem fs = getFileSystem(conf);
|
||||
writeTestDataToFile(fs);
|
||||
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
|
||||
FileChecksum checksum = fs.getFileChecksum(TEST_PATH);
|
||||
@Before
|
||||
public void setup() throws IOException {
|
||||
conf = new Configuration();
|
||||
}
|
||||
|
||||
@After
|
||||
public void teardown() throws IOException {
|
||||
if (fs != null) {
|
||||
fs.close();
|
||||
}
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
|
||||
setEncryptionConfigKeys(conf);
|
||||
|
||||
cluster = new MiniDFSCluster.Builder(conf)
|
||||
.manageDataDfsDirs(false)
|
||||
.manageNameDfsDirs(false)
|
||||
.format(false)
|
||||
.startupOption(StartupOption.REGULAR)
|
||||
.build();
|
||||
|
||||
fs = getFileSystem(conf);
|
||||
LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
|
||||
LogFactory.getLog(SaslDataTransferServer.class));
|
||||
LogCapturer logs1 = GenericTestUtils.LogCapturer.captureLogs(
|
||||
LogFactory.getLog(DataTransferSaslUtil.class));
|
||||
try {
|
||||
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
|
||||
assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
|
||||
} finally {
|
||||
logs.stopCapturing();
|
||||
logs1.stopCapturing();
|
||||
}
|
||||
|
||||
fs.close();
|
||||
|
||||
if (resolverClazz == null) {
|
||||
// Test client and server negotiate cipher option
|
||||
GenericTestUtils.assertDoesNotMatch(logs.getOutput(),
|
||||
"Server using cipher suite");
|
||||
// Check the IOStreamPair
|
||||
GenericTestUtils.assertDoesNotMatch(logs1.getOutput(),
|
||||
"Creating IOStreamPair of CryptoInputStream and CryptoOutputStream.");
|
||||
}
|
||||
} finally {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEncryptedReadWithRC4() throws IOException {
|
||||
MiniDFSCluster cluster = null;
|
||||
try {
|
||||
Configuration conf = new Configuration();
|
||||
cluster = new MiniDFSCluster.Builder(conf).build();
|
||||
|
||||
FileSystem fs = getFileSystem(conf);
|
||||
writeTestDataToFile(fs);
|
||||
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
|
||||
FileChecksum checksum = fs.getFileChecksum(TEST_PATH);
|
||||
fs.close();
|
||||
cluster.shutdown();
|
||||
|
||||
setEncryptionConfigKeys(conf);
|
||||
// It'll use 3DES by default, but we set it to rc4 here.
|
||||
conf.set(DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY, "rc4");
|
||||
|
||||
cluster = new MiniDFSCluster.Builder(conf)
|
||||
.manageDataDfsDirs(false)
|
||||
.manageNameDfsDirs(false)
|
||||
.format(false)
|
||||
.startupOption(StartupOption.REGULAR)
|
||||
.build();
|
||||
|
||||
fs = getFileSystem(conf);
|
||||
LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
|
||||
LogFactory.getLog(SaslDataTransferServer.class));
|
||||
LogCapturer logs1 = GenericTestUtils.LogCapturer.captureLogs(
|
||||
LogFactory.getLog(DataTransferSaslUtil.class));
|
||||
try {
|
||||
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
|
||||
assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
|
||||
} finally {
|
||||
logs.stopCapturing();
|
||||
logs1.stopCapturing();
|
||||
}
|
||||
|
||||
fs.close();
|
||||
private FileChecksum writeUnencryptedAndThenRestartEncryptedCluster()
|
||||
throws IOException {
|
||||
cluster = new MiniDFSCluster.Builder(conf).build();
|
||||
|
||||
if (resolverClazz == null) {
|
||||
// Test client and server negotiate cipher option
|
||||
GenericTestUtils.assertDoesNotMatch(logs.getOutput(),
|
||||
"Server using cipher suite");
|
||||
// Check the IOStreamPair
|
||||
GenericTestUtils.assertDoesNotMatch(logs1.getOutput(),
|
||||
"Creating IOStreamPair of CryptoInputStream and CryptoOutputStream.");
|
||||
}
|
||||
} finally {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
fs = getFileSystem(conf);
|
||||
writeTestDataToFile(fs);
|
||||
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
|
||||
FileChecksum checksum = fs.getFileChecksum(TEST_PATH);
|
||||
fs.close();
|
||||
cluster.shutdown();
|
||||
|
||||
setEncryptionConfigKeys();
|
||||
|
||||
cluster = new MiniDFSCluster.Builder(conf)
|
||||
.manageDataDfsDirs(false)
|
||||
.manageNameDfsDirs(false)
|
||||
.format(false)
|
||||
.startupOption(StartupOption.REGULAR)
|
||||
.build();
|
||||
|
||||
fs = getFileSystem(conf);
|
||||
return checksum;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEncryptedReadWithAES() throws IOException {
|
||||
MiniDFSCluster cluster = null;
|
||||
|
||||
private void testEncryptedRead(String algorithm, String cipherSuite,
|
||||
boolean matchLog, boolean readAfterRestart)
|
||||
throws IOException {
|
||||
// set encryption algorithm and cipher suites, but don't enable transfer
|
||||
// encryption yet.
|
||||
conf.set(DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY, algorithm);
|
||||
conf.set(HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY,
|
||||
cipherSuite);
|
||||
|
||||
FileChecksum checksum = writeUnencryptedAndThenRestartEncryptedCluster();
|
||||
|
||||
LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
|
||||
LogFactory.getLog(SaslDataTransferServer.class));
|
||||
LogCapturer logs1 = GenericTestUtils.LogCapturer.captureLogs(
|
||||
LogFactory.getLog(DataTransferSaslUtil.class));
|
||||
try {
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY,
|
||||
"AES/CTR/NoPadding");
|
||||
cluster = new MiniDFSCluster.Builder(conf).build();
|
||||
|
||||
FileSystem fs = getFileSystem(conf);
|
||||
writeTestDataToFile(fs);
|
||||
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
|
||||
FileChecksum checksum = fs.getFileChecksum(TEST_PATH);
|
||||
fs.close();
|
||||
cluster.shutdown();
|
||||
assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
|
||||
} finally {
|
||||
logs.stopCapturing();
|
||||
logs1.stopCapturing();
|
||||
}
|
||||
|
||||
setEncryptionConfigKeys(conf);
|
||||
|
||||
cluster = new MiniDFSCluster.Builder(conf)
|
||||
.manageDataDfsDirs(false)
|
||||
.manageNameDfsDirs(false)
|
||||
.format(false)
|
||||
.startupOption(StartupOption.REGULAR)
|
||||
.build();
|
||||
|
||||
fs = getFileSystem(conf);
|
||||
LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
|
||||
LogFactory.getLog(SaslDataTransferServer.class));
|
||||
LogCapturer logs1 = GenericTestUtils.LogCapturer.captureLogs(
|
||||
LogFactory.getLog(DataTransferSaslUtil.class));
|
||||
try {
|
||||
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
|
||||
assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
|
||||
} finally {
|
||||
logs.stopCapturing();
|
||||
logs1.stopCapturing();
|
||||
}
|
||||
|
||||
fs.close();
|
||||
|
||||
if (resolverClazz == null) {
|
||||
if (resolverClazz == null) {
|
||||
if (matchLog) {
|
||||
// Test client and server negotiate cipher option
|
||||
GenericTestUtils.assertMatches(logs.getOutput(),
|
||||
"Server using cipher suite");
|
||||
GenericTestUtils
|
||||
.assertMatches(logs.getOutput(), "Server using cipher suite");
|
||||
// Check the IOStreamPair
|
||||
GenericTestUtils.assertMatches(logs1.getOutput(),
|
||||
"Creating IOStreamPair of CryptoInputStream and CryptoOutputStream.");
|
||||
}
|
||||
} finally {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
} else {
|
||||
// Test client and server negotiate cipher option
|
||||
GenericTestUtils
|
||||
.assertDoesNotMatch(logs.getOutput(), "Server using cipher suite");
|
||||
// Check the IOStreamPair
|
||||
GenericTestUtils.assertDoesNotMatch(logs1.getOutput(),
|
||||
"Creating IOStreamPair of CryptoInputStream and CryptoOutputStream.");
|
||||
}
|
||||
}
|
||||
|
||||
if (readAfterRestart) {
|
||||
cluster.restartNameNode();
|
||||
fs = getFileSystem(conf);
|
||||
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
|
||||
assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEncryptedReadDefaultAlgorithmCipherSuite()
|
||||
throws IOException {
|
||||
testEncryptedRead("", "", false, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEncryptedReadWithRC4() throws IOException {
|
||||
testEncryptedRead("rc4", "", false, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEncryptedReadWithAES() throws IOException {
|
||||
testEncryptedRead("", "AES/CTR/NoPadding", true, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEncryptedReadAfterNameNodeRestart() throws IOException {
|
||||
MiniDFSCluster cluster = null;
|
||||
try {
|
||||
Configuration conf = new Configuration();
|
||||
cluster = new MiniDFSCluster.Builder(conf).build();
|
||||
|
||||
FileSystem fs = getFileSystem(conf);
|
||||
writeTestDataToFile(fs);
|
||||
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
|
||||
FileChecksum checksum = fs.getFileChecksum(TEST_PATH);
|
||||
fs.close();
|
||||
cluster.shutdown();
|
||||
|
||||
setEncryptionConfigKeys(conf);
|
||||
|
||||
cluster = new MiniDFSCluster.Builder(conf)
|
||||
.manageDataDfsDirs(false)
|
||||
.manageNameDfsDirs(false)
|
||||
.format(false)
|
||||
.startupOption(StartupOption.REGULAR)
|
||||
.build();
|
||||
|
||||
fs = getFileSystem(conf);
|
||||
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
|
||||
assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
|
||||
fs.close();
|
||||
|
||||
cluster.restartNameNode();
|
||||
fs = getFileSystem(conf);
|
||||
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
|
||||
assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
|
||||
fs.close();
|
||||
} finally {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
testEncryptedRead("", "", false, true);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testClientThatDoesNotSupportEncryption() throws IOException {
|
||||
MiniDFSCluster cluster = null;
|
||||
// Set short retry timeouts so this test runs faster
|
||||
conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10);
|
||||
|
||||
writeUnencryptedAndThenRestartEncryptedCluster();
|
||||
|
||||
DFSClient client = DFSClientAdapter.getDFSClient((DistributedFileSystem)fs);
|
||||
DFSClient spyClient = Mockito.spy(client);
|
||||
Mockito.doReturn(false).when(spyClient).shouldEncryptData();
|
||||
DFSClientAdapter.setDFSClient((DistributedFileSystem) fs, spyClient);
|
||||
|
||||
LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
|
||||
LogFactory.getLog(DataNode.class));
|
||||
try {
|
||||
Configuration conf = new Configuration();
|
||||
// Set short retry timeouts so this test runs faster
|
||||
conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10);
|
||||
cluster = new MiniDFSCluster.Builder(conf).build();
|
||||
|
||||
FileSystem fs = getFileSystem(conf);
|
||||
writeTestDataToFile(fs);
|
||||
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
|
||||
fs.close();
|
||||
cluster.shutdown();
|
||||
|
||||
setEncryptionConfigKeys(conf);
|
||||
|
||||
cluster = new MiniDFSCluster.Builder(conf)
|
||||
.manageDataDfsDirs(false)
|
||||
.manageNameDfsDirs(false)
|
||||
.format(false)
|
||||
.startupOption(StartupOption.REGULAR)
|
||||
.build();
|
||||
|
||||
|
||||
fs = getFileSystem(conf);
|
||||
DFSClient client = DFSClientAdapter.getDFSClient((DistributedFileSystem) fs);
|
||||
DFSClient spyClient = Mockito.spy(client);
|
||||
Mockito.doReturn(false).when(spyClient).shouldEncryptData();
|
||||
DFSClientAdapter.setDFSClient((DistributedFileSystem) fs, spyClient);
|
||||
|
||||
LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
|
||||
LogFactory.getLog(DataNode.class));
|
||||
try {
|
||||
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
|
||||
if (resolverClazz != null && !resolverClazz.endsWith("TestTrustedChannelResolver")){
|
||||
fail("Should not have been able to read without encryption enabled.");
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
GenericTestUtils.assertExceptionContains("Could not obtain block:",
|
||||
ioe);
|
||||
} finally {
|
||||
logs.stopCapturing();
|
||||
}
|
||||
fs.close();
|
||||
|
||||
if (resolverClazz == null) {
|
||||
GenericTestUtils.assertMatches(logs.getOutput(),
|
||||
"Failed to read expected encryption handshake from client at");
|
||||
if (resolverClazz != null &&
|
||||
!resolverClazz.endsWith("TestTrustedChannelResolver")){
|
||||
fail("Should not have been able to read without encryption enabled.");
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
GenericTestUtils.assertExceptionContains("Could not obtain block:",
|
||||
ioe);
|
||||
} finally {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
logs.stopCapturing();
|
||||
}
|
||||
|
||||
if (resolverClazz == null) {
|
||||
GenericTestUtils.assertMatches(logs.getOutput(),
|
||||
"Failed to read expected encryption handshake from client at");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testLongLivedReadClientAfterRestart() throws IOException {
|
||||
MiniDFSCluster cluster = null;
|
||||
try {
|
||||
Configuration conf = new Configuration();
|
||||
cluster = new MiniDFSCluster.Builder(conf).build();
|
||||
|
||||
FileSystem fs = getFileSystem(conf);
|
||||
writeTestDataToFile(fs);
|
||||
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
|
||||
FileChecksum checksum = fs.getFileChecksum(TEST_PATH);
|
||||
fs.close();
|
||||
cluster.shutdown();
|
||||
|
||||
setEncryptionConfigKeys(conf);
|
||||
|
||||
cluster = new MiniDFSCluster.Builder(conf)
|
||||
.manageDataDfsDirs(false)
|
||||
.manageNameDfsDirs(false)
|
||||
.format(false)
|
||||
.startupOption(StartupOption.REGULAR)
|
||||
.build();
|
||||
|
||||
fs = getFileSystem(conf);
|
||||
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
|
||||
assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
|
||||
|
||||
// Restart the NN and DN, after which the client's encryption key will no
|
||||
// longer be valid.
|
||||
cluster.restartNameNode();
|
||||
assertTrue(cluster.restartDataNode(0));
|
||||
|
||||
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
|
||||
assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
|
||||
|
||||
fs.close();
|
||||
} finally {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
FileChecksum checksum = writeUnencryptedAndThenRestartEncryptedCluster();
|
||||
|
||||
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
|
||||
assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
|
||||
|
||||
// Restart the NN and DN, after which the client's encryption key will no
|
||||
// longer be valid.
|
||||
cluster.restartNameNode();
|
||||
assertTrue(cluster.restartDataNode(0));
|
||||
|
||||
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
|
||||
assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testLongLivedWriteClientAfterRestart() throws IOException {
|
||||
MiniDFSCluster cluster = null;
|
||||
try {
|
||||
Configuration conf = new Configuration();
|
||||
setEncryptionConfigKeys(conf);
|
||||
cluster = new MiniDFSCluster.Builder(conf).build();
|
||||
|
||||
FileSystem fs = getFileSystem(conf);
|
||||
|
||||
writeTestDataToFile(fs);
|
||||
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
|
||||
|
||||
// Restart the NN and DN, after which the client's encryption key will no
|
||||
// longer be valid.
|
||||
cluster.restartNameNode();
|
||||
assertTrue(cluster.restartDataNodes());
|
||||
cluster.waitActive();
|
||||
|
||||
writeTestDataToFile(fs);
|
||||
assertEquals(PLAIN_TEXT + PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
|
||||
|
||||
fs.close();
|
||||
} finally {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
setEncryptionConfigKeys();
|
||||
cluster = new MiniDFSCluster.Builder(conf).build();
|
||||
|
||||
fs = getFileSystem(conf);
|
||||
|
||||
writeTestDataToFile(fs);
|
||||
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
|
||||
|
||||
// Restart the NN and DN, after which the client's encryption key will no
|
||||
// longer be valid.
|
||||
cluster.restartNameNode();
|
||||
assertTrue(cluster.restartDataNodes());
|
||||
cluster.waitActive();
|
||||
|
||||
writeTestDataToFile(fs);
|
||||
assertEquals(PLAIN_TEXT + PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLongLivedClient() throws IOException, InterruptedException {
|
||||
MiniDFSCluster cluster = null;
|
||||
try {
|
||||
Configuration conf = new Configuration();
|
||||
cluster = new MiniDFSCluster.Builder(conf).build();
|
||||
|
||||
FileSystem fs = getFileSystem(conf);
|
||||
writeTestDataToFile(fs);
|
||||
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
|
||||
FileChecksum checksum = fs.getFileChecksum(TEST_PATH);
|
||||
fs.close();
|
||||
cluster.shutdown();
|
||||
|
||||
setEncryptionConfigKeys(conf);
|
||||
|
||||
cluster = new MiniDFSCluster.Builder(conf)
|
||||
.manageDataDfsDirs(false)
|
||||
.manageNameDfsDirs(false)
|
||||
.format(false)
|
||||
.startupOption(StartupOption.REGULAR)
|
||||
.build();
|
||||
|
||||
BlockTokenSecretManager btsm = cluster.getNamesystem().getBlockManager()
|
||||
.getBlockTokenSecretManager();
|
||||
btsm.setKeyUpdateIntervalForTesting(2 * 1000);
|
||||
btsm.setTokenLifetime(2 * 1000);
|
||||
btsm.clearAllKeysForTesting();
|
||||
|
||||
fs = getFileSystem(conf);
|
||||
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
|
||||
assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
|
||||
|
||||
// Sleep for 15 seconds, after which the encryption key will no longer be
|
||||
// valid. It needs to be a few multiples of the block token lifetime,
|
||||
// since several block tokens are valid at any given time (the current
|
||||
// and the last two, by default.)
|
||||
LOG.info("Sleeping so that encryption keys expire...");
|
||||
Thread.sleep(15 * 1000);
|
||||
LOG.info("Done sleeping.");
|
||||
|
||||
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
|
||||
assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
|
||||
|
||||
fs.close();
|
||||
} finally {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
FileChecksum checksum = writeUnencryptedAndThenRestartEncryptedCluster();
|
||||
|
||||
BlockTokenSecretManager btsm = cluster.getNamesystem().getBlockManager()
|
||||
.getBlockTokenSecretManager();
|
||||
btsm.setKeyUpdateIntervalForTesting(2 * 1000);
|
||||
btsm.setTokenLifetime(2 * 1000);
|
||||
btsm.clearAllKeysForTesting();
|
||||
|
||||
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
|
||||
assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
|
||||
|
||||
// Sleep for 15 seconds, after which the encryption key will no longer be
|
||||
// valid. It needs to be a few multiples of the block token lifetime,
|
||||
// since several block tokens are valid at any given time (the current
|
||||
// and the last two, by default.)
|
||||
LOG.info("Sleeping so that encryption keys expire...");
|
||||
Thread.sleep(15 * 1000);
|
||||
LOG.info("Done sleeping.");
|
||||
|
||||
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
|
||||
assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
|
||||
}
|
||||
|
||||
|
||||
public void testLongLivedClientPipelineRecovery()
|
||||
throws IOException, InterruptedException, TimeoutException {
|
||||
if (resolverClazz != null) {
|
||||
// TestTrustedChannelResolver does not use encryption keys.
|
||||
return;
|
||||
}
|
||||
// use 4 datanodes to make sure that after 1 data node is stopped,
|
||||
// client only retries establishing pipeline with the 4th node.
|
||||
int numDataNodes = 4;
|
||||
// do not consider load factor when selecting a data node
|
||||
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY,
|
||||
false);
|
||||
setEncryptionConfigKeys();
|
||||
|
||||
cluster = new MiniDFSCluster.Builder(conf)
|
||||
.numDataNodes(numDataNodes)
|
||||
.build();
|
||||
|
||||
fs = getFileSystem(conf);
|
||||
DFSClient client = DFSClientAdapter.getDFSClient((DistributedFileSystem)fs);
|
||||
DFSClient spyClient = Mockito.spy(client);
|
||||
DFSClientAdapter.setDFSClient((DistributedFileSystem) fs, spyClient);
|
||||
writeTestDataToFile(fs);
|
||||
|
||||
BlockTokenSecretManager btsm = cluster.getNamesystem().getBlockManager()
|
||||
.getBlockTokenSecretManager();
|
||||
// Reduce key update interval and token life for testing.
|
||||
btsm.setKeyUpdateIntervalForTesting(2 * 1000);
|
||||
btsm.setTokenLifetime(2 * 1000);
|
||||
btsm.clearAllKeysForTesting();
|
||||
|
||||
// Wait until the encryption key becomes invalid.
|
||||
LOG.info("Wait until encryption keys become invalid...");
|
||||
|
||||
DataEncryptionKey encryptionKey = spyClient.getEncryptionKey();
|
||||
List<DataNode> dataNodes = cluster.getDataNodes();
|
||||
for (DataNode dn: dataNodes) {
|
||||
GenericTestUtils.waitFor(
|
||||
new Supplier<Boolean>() {
|
||||
@Override
|
||||
public Boolean get() {
|
||||
return !dn.getBlockPoolTokenSecretManager().
|
||||
get(encryptionKey.blockPoolId)
|
||||
.hasKey(encryptionKey.keyId);
|
||||
}
|
||||
}, 100, 30*1000
|
||||
);
|
||||
}
|
||||
LOG.info("The encryption key is invalid on all nodes now.");
|
||||
try(FSDataOutputStream out = fs.append(TEST_PATH)) {
|
||||
DFSOutputStream dfstream = (DFSOutputStream) out.getWrappedStream();
|
||||
// shut down the first datanode in the pipeline.
|
||||
DatanodeInfo[] targets = dfstream.getPipeline();
|
||||
cluster.stopDataNode(targets[0].getXferAddr());
|
||||
// write data to induce pipeline recovery
|
||||
out.write(PLAIN_TEXT.getBytes());
|
||||
out.hflush();
|
||||
assertFalse("The first datanode in the pipeline was not replaced.",
|
||||
Arrays.asList(dfstream.getPipeline()).contains(targets[0]));
|
||||
}
|
||||
// verify that InvalidEncryptionKeyException is handled properly
|
||||
Mockito.verify(spyClient, times(1)).clearDataEncryptionKey();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEncryptedWriteWithOneDn() throws IOException {
|
||||
testEncryptedWrite(1);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testEncryptedWriteWithTwoDns() throws IOException {
|
||||
testEncryptedWrite(2);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testEncryptedWriteWithMultipleDns() throws IOException {
|
||||
testEncryptedWrite(10);
|
||||
}
|
||||
|
||||
|
||||
private void testEncryptedWrite(int numDns) throws IOException {
|
||||
MiniDFSCluster cluster = null;
|
||||
setEncryptionConfigKeys();
|
||||
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDns).build();
|
||||
|
||||
fs = getFileSystem(conf);
|
||||
|
||||
LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
|
||||
LogFactory.getLog(SaslDataTransferServer.class));
|
||||
LogCapturer logs1 = GenericTestUtils.LogCapturer.captureLogs(
|
||||
LogFactory.getLog(DataTransferSaslUtil.class));
|
||||
try {
|
||||
Configuration conf = new Configuration();
|
||||
setEncryptionConfigKeys(conf);
|
||||
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDns).build();
|
||||
|
||||
FileSystem fs = getFileSystem(conf);
|
||||
|
||||
LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
|
||||
LogFactory.getLog(SaslDataTransferServer.class));
|
||||
LogCapturer logs1 = GenericTestUtils.LogCapturer.captureLogs(
|
||||
LogFactory.getLog(DataTransferSaslUtil.class));
|
||||
try {
|
||||
writeTestDataToFile(fs);
|
||||
} finally {
|
||||
logs.stopCapturing();
|
||||
logs1.stopCapturing();
|
||||
}
|
||||
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
|
||||
fs.close();
|
||||
|
||||
if (resolverClazz == null) {
|
||||
// Test client and server negotiate cipher option
|
||||
GenericTestUtils.assertDoesNotMatch(logs.getOutput(),
|
||||
"Server using cipher suite");
|
||||
// Check the IOStreamPair
|
||||
GenericTestUtils.assertDoesNotMatch(logs1.getOutput(),
|
||||
"Creating IOStreamPair of CryptoInputStream and CryptoOutputStream.");
|
||||
}
|
||||
writeTestDataToFile(fs);
|
||||
} finally {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
logs.stopCapturing();
|
||||
logs1.stopCapturing();
|
||||
}
|
||||
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
|
||||
|
||||
if (resolverClazz == null) {
|
||||
// Test client and server negotiate cipher option
|
||||
GenericTestUtils.assertDoesNotMatch(logs.getOutput(),
|
||||
"Server using cipher suite");
|
||||
// Check the IOStreamPair
|
||||
GenericTestUtils.assertDoesNotMatch(logs1.getOutput(),
|
||||
"Creating IOStreamPair of CryptoInputStream and CryptoOutputStream.");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testEncryptedAppend() throws IOException {
|
||||
MiniDFSCluster cluster = null;
|
||||
try {
|
||||
Configuration conf = new Configuration();
|
||||
setEncryptionConfigKeys(conf);
|
||||
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
|
||||
|
||||
FileSystem fs = getFileSystem(conf);
|
||||
|
||||
writeTestDataToFile(fs);
|
||||
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
|
||||
|
||||
writeTestDataToFile(fs);
|
||||
assertEquals(PLAIN_TEXT + PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
|
||||
|
||||
fs.close();
|
||||
} finally {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
setEncryptionConfigKeys();
|
||||
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
|
||||
|
||||
fs = getFileSystem(conf);
|
||||
|
||||
writeTestDataToFile(fs);
|
||||
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
|
||||
|
||||
writeTestDataToFile(fs);
|
||||
assertEquals(PLAIN_TEXT + PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testEncryptedAppendRequiringBlockTransfer() throws IOException {
|
||||
MiniDFSCluster cluster = null;
|
||||
try {
|
||||
Configuration conf = new Configuration();
|
||||
setEncryptionConfigKeys(conf);
|
||||
|
||||
// start up 4 DNs
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(4).build();
|
||||
|
||||
FileSystem fs = getFileSystem(conf);
|
||||
|
||||
// Create a file with replication 3, so its block is on 3 / 4 DNs.
|
||||
writeTestDataToFile(fs);
|
||||
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
|
||||
|
||||
// Shut down one of the DNs holding a block replica.
|
||||
FSDataInputStream in = fs.open(TEST_PATH);
|
||||
List<LocatedBlock> locatedBlocks = DFSTestUtil.getAllBlocks(in);
|
||||
in.close();
|
||||
assertEquals(1, locatedBlocks.size());
|
||||
assertEquals(3, locatedBlocks.get(0).getLocations().length);
|
||||
DataNode dn = cluster.getDataNode(locatedBlocks.get(0).getLocations()[0].getIpcPort());
|
||||
dn.shutdown();
|
||||
|
||||
// Reopen the file for append, which will need to add another DN to the
|
||||
// pipeline and in doing so trigger a block transfer.
|
||||
writeTestDataToFile(fs);
|
||||
assertEquals(PLAIN_TEXT + PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
|
||||
|
||||
fs.close();
|
||||
} finally {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
setEncryptionConfigKeys();
|
||||
|
||||
// start up 4 DNs
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(4).build();
|
||||
|
||||
fs = getFileSystem(conf);
|
||||
|
||||
// Create a file with replication 3, so its block is on 3 / 4 DNs.
|
||||
writeTestDataToFile(fs);
|
||||
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
|
||||
|
||||
// Shut down one of the DNs holding a block replica.
|
||||
FSDataInputStream in = fs.open(TEST_PATH);
|
||||
List<LocatedBlock> locatedBlocks = DFSTestUtil.getAllBlocks(in);
|
||||
in.close();
|
||||
assertEquals(1, locatedBlocks.size());
|
||||
assertEquals(3, locatedBlocks.get(0).getLocations().length);
|
||||
DataNode dn = cluster.getDataNode(
|
||||
locatedBlocks.get(0).getLocations()[0].getIpcPort());
|
||||
dn.shutdown();
|
||||
|
||||
// Reopen the file for append, which will need to add another DN to the
|
||||
// pipeline and in doing so trigger a block transfer.
|
||||
writeTestDataToFile(fs);
|
||||
assertEquals(PLAIN_TEXT + PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
|
||||
}
|
||||
|
||||
private static void writeTestDataToFile(FileSystem fs) throws IOException {
|
||||
|
Loading…
Reference in New Issue
Block a user