HDFS-6950. Add Additional unit tests for HDFS-6581. (Contributed by Xiaoyu Yao)
This commit is contained in:
parent
a6b32a3e78
commit
762b04e994
@ -30,3 +30,6 @@
|
|||||||
HDFS-6931. Move lazily persisted replicas to finalized directory on DN
|
HDFS-6931. Move lazily persisted replicas to finalized directory on DN
|
||||||
startup. (Arpit Agarwal)
|
startup. (Arpit Agarwal)
|
||||||
|
|
||||||
|
HDFS-6950. Add Additional unit tests for HDFS-6581. (Xiaoyu Yao via
|
||||||
|
Arpit Agarwal)
|
||||||
|
|
||||||
|
@ -84,6 +84,9 @@
|
|||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.fs.CreateFlag.CREATE;
|
||||||
|
import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST;
|
||||||
|
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
@ -277,16 +280,29 @@ public static void createFile(FileSystem fs, Path fileName, long fileLen,
|
|||||||
public static void createFile(FileSystem fs, Path fileName, int bufferLen,
|
public static void createFile(FileSystem fs, Path fileName, int bufferLen,
|
||||||
long fileLen, long blockSize, short replFactor, long seed)
|
long fileLen, long blockSize, short replFactor, long seed)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
assert bufferLen > 0;
|
createFile(fs, fileName, false, bufferLen, fileLen, blockSize,
|
||||||
if (!fs.mkdirs(fileName.getParent())) {
|
replFactor, seed, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void createFile(FileSystem fs, Path fileName,
|
||||||
|
boolean isLazyPersist, int bufferLen, long fileLen, long blockSize,
|
||||||
|
short replFactor, long seed, boolean flush) throws IOException {
|
||||||
|
assert bufferLen > 0;
|
||||||
|
if (!fs.mkdirs(fileName.getParent())) {
|
||||||
throw new IOException("Mkdirs failed to create " +
|
throw new IOException("Mkdirs failed to create " +
|
||||||
fileName.getParent().toString());
|
fileName.getParent().toString());
|
||||||
}
|
}
|
||||||
FSDataOutputStream out = null;
|
FSDataOutputStream out = null;
|
||||||
try {
|
EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE);
|
||||||
out = fs.create(fileName, true, fs.getConf()
|
createFlags.add(OVERWRITE);
|
||||||
.getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
|
if (isLazyPersist) {
|
||||||
replFactor, blockSize);
|
createFlags.add(LAZY_PERSIST);
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
out = fs.create(fileName, FsPermission.getFileDefault(), createFlags,
|
||||||
|
fs.getConf().getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
|
||||||
|
replFactor, blockSize, null);
|
||||||
|
|
||||||
if (fileLen > 0) {
|
if (fileLen > 0) {
|
||||||
byte[] toWrite = new byte[bufferLen];
|
byte[] toWrite = new byte[bufferLen];
|
||||||
Random rb = new Random(seed);
|
Random rb = new Random(seed);
|
||||||
@ -294,10 +310,13 @@ public static void createFile(FileSystem fs, Path fileName, int bufferLen,
|
|||||||
while (bytesToWrite>0) {
|
while (bytesToWrite>0) {
|
||||||
rb.nextBytes(toWrite);
|
rb.nextBytes(toWrite);
|
||||||
int bytesToWriteNext = (bufferLen < bytesToWrite) ? bufferLen
|
int bytesToWriteNext = (bufferLen < bytesToWrite) ? bufferLen
|
||||||
: (int) bytesToWrite;
|
: (int) bytesToWrite;
|
||||||
|
|
||||||
out.write(toWrite, 0, bytesToWriteNext);
|
out.write(toWrite, 0, bytesToWriteNext);
|
||||||
bytesToWrite -= bytesToWriteNext;
|
bytesToWrite -= bytesToWriteNext;
|
||||||
|
}
|
||||||
|
if (flush) {
|
||||||
|
out.hsync();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -16,46 +16,49 @@
|
|||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
||||||
import static org.hamcrest.core.Is.is;
|
import com.google.common.util.concurrent.Uninterruptibles;
|
||||||
import static org.junit.Assert.*;
|
|
||||||
|
|
||||||
import java.io.File;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.EnumSet;
|
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Set;
|
|
||||||
|
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.commons.logging.impl.Log4JLogger;
|
import org.apache.commons.logging.impl.Log4JLogger;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CreateFlag;
|
import org.apache.hadoop.fs.CreateFlag;
|
||||||
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.hdfs.DFSClient;
|
import org.apache.hadoop.hdfs.*;
|
||||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||||
import org.apache.hadoop.hdfs.StorageType;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.*;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
|
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
|
import org.apache.log4j.Level;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.*;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import static org.apache.hadoop.hdfs.StorageType.DEFAULT;
|
|
||||||
import static org.apache.hadoop.hdfs.StorageType.RAM_DISK;
|
|
||||||
import static org.apache.hadoop.fs.CreateFlag.CREATE;
|
import static org.apache.hadoop.fs.CreateFlag.CREATE;
|
||||||
import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST;
|
import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
|
||||||
|
import static org.apache.hadoop.hdfs.StorageType.DEFAULT;
|
||||||
import org.apache.log4j.Level;
|
import static org.apache.hadoop.hdfs.StorageType.RAM_DISK;
|
||||||
import org.junit.After;
|
import static org.hamcrest.core.Is.is;
|
||||||
import org.junit.Test;
|
import static org.junit.Assert.assertThat;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
public class TestLazyPersistFiles {
|
public class TestLazyPersistFiles {
|
||||||
public static final Log LOG = LogFactory.getLog(TestLazyPersistFiles.class);
|
public static final Log LOG = LogFactory.getLog(TestLazyPersistFiles.class);
|
||||||
@ -66,8 +69,10 @@ public class TestLazyPersistFiles {
|
|||||||
((Log4JLogger) FsDatasetImpl.LOG).getLogger().setLevel(Level.ALL);
|
((Log4JLogger) FsDatasetImpl.LOG).getLogger().setLevel(Level.ALL);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static final int THREADPOOL_SIZE = 10;
|
||||||
|
|
||||||
private static short REPL_FACTOR = 1;
|
private static short REPL_FACTOR = 1;
|
||||||
private static final long BLOCK_SIZE = 10485760; // 10 MB
|
private static final int BLOCK_SIZE = 10485760; // 10 MB
|
||||||
private static final int LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC = 3;
|
private static final int LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC = 3;
|
||||||
private static final long HEARTBEAT_INTERVAL_SEC = 1;
|
private static final long HEARTBEAT_INTERVAL_SEC = 1;
|
||||||
private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500;
|
private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500;
|
||||||
@ -161,6 +166,26 @@ public void testPlacementOnRamDisk() throws IOException {
|
|||||||
ensureFileReplicasOnStorageType(path, RAM_DISK);
|
ensureFileReplicasOnStorageType(path, RAM_DISK);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test (timeout=300000)
|
||||||
|
public void testPlacementOnSizeLimitedRamDisk() throws IOException {
|
||||||
|
startUpCluster(REPL_FACTOR, new StorageType[] { DEFAULT, RAM_DISK },
|
||||||
|
3 * BLOCK_SIZE -1); // 2 replicas + delta
|
||||||
|
final String METHOD_NAME = GenericTestUtils.getMethodName();
|
||||||
|
Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
|
||||||
|
Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
|
||||||
|
|
||||||
|
makeTestFile(path1, BLOCK_SIZE, true);
|
||||||
|
makeTestFile(path2, BLOCK_SIZE, true);
|
||||||
|
|
||||||
|
ensureFileReplicasOnStorageType(path1, RAM_DISK);
|
||||||
|
ensureFileReplicasOnStorageType(path2, RAM_DISK);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Client tries to write LAZY_PERSIST to same DN with no RamDisk configured
|
||||||
|
* Write should default to disk. No error.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
@Test (timeout=300000)
|
@Test (timeout=300000)
|
||||||
public void testFallbackToDisk() throws IOException {
|
public void testFallbackToDisk() throws IOException {
|
||||||
startUpCluster(REPL_FACTOR, null, -1);
|
startUpCluster(REPL_FACTOR, null, -1);
|
||||||
@ -171,6 +196,59 @@ public void testFallbackToDisk() throws IOException {
|
|||||||
ensureFileReplicasOnStorageType(path, DEFAULT);
|
ensureFileReplicasOnStorageType(path, DEFAULT);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* File can not fit in RamDisk even with eviction
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Test (timeout=300000)
|
||||||
|
public void testFallbackToDiskFull() throws IOException {
|
||||||
|
startUpCluster(REPL_FACTOR, null, BLOCK_SIZE - 1);
|
||||||
|
final String METHOD_NAME = GenericTestUtils.getMethodName();
|
||||||
|
Path path = new Path("/" + METHOD_NAME + ".dat");
|
||||||
|
|
||||||
|
makeTestFile(path, BLOCK_SIZE, true);
|
||||||
|
ensureFileReplicasOnStorageType(path, DEFAULT);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* File partially fit in RamDisk after eviction.
|
||||||
|
* RamDisk can fit 2 blocks. Write a file with 5 blocks.
|
||||||
|
* Expect 2 blocks are on RamDisk whereas other 3 on disk.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Test (timeout=300000)
|
||||||
|
public void testFallbackToDiskPartial()
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
startUpCluster(REPL_FACTOR,
|
||||||
|
new StorageType[] { RAM_DISK, DEFAULT },
|
||||||
|
BLOCK_SIZE * 3 - 1);
|
||||||
|
final String METHOD_NAME = GenericTestUtils.getMethodName();
|
||||||
|
Path path = new Path("/" + METHOD_NAME + ".dat");
|
||||||
|
|
||||||
|
makeTestFile(path, BLOCK_SIZE * 5, true);
|
||||||
|
|
||||||
|
// Sleep for a short time to allow the lazy writer thread to do its job
|
||||||
|
Thread.sleep(6 * LAZY_WRITER_INTERVAL_SEC * 1000);
|
||||||
|
|
||||||
|
triggerBlockReport();
|
||||||
|
|
||||||
|
int numBlocksOnRamDisk = 0;
|
||||||
|
int numBlocksOnDisk = 0;
|
||||||
|
|
||||||
|
long fileLength = client.getFileInfo(path.toString()).getLen();
|
||||||
|
LocatedBlocks locatedBlocks =
|
||||||
|
client.getLocatedBlocks(path.toString(), 0, fileLength);
|
||||||
|
for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
|
||||||
|
if (locatedBlock.getStorageTypes()[0] == RAM_DISK) {
|
||||||
|
numBlocksOnRamDisk++;
|
||||||
|
}else if (locatedBlock.getStorageTypes()[0] == DEFAULT) {
|
||||||
|
numBlocksOnDisk++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assertThat(numBlocksOnRamDisk, is(2));
|
||||||
|
assertThat(numBlocksOnDisk, is(3));
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* If the only available storage is RAM_DISK and the LAZY_PERSIST flag is not
|
* If the only available storage is RAM_DISK and the LAZY_PERSIST flag is not
|
||||||
* specified, then block placement should fail.
|
* specified, then block placement should fail.
|
||||||
@ -191,6 +269,10 @@ public void testRamDiskNotChosenByDefault() throws IOException {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Append to lazy persist file is denied.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
@Test (timeout=300000)
|
@Test (timeout=300000)
|
||||||
public void testAppendIsDenied() throws IOException {
|
public void testAppendIsDenied() throws IOException {
|
||||||
startUpCluster(REPL_FACTOR, new StorageType[] {RAM_DISK, DEFAULT }, -1);
|
startUpCluster(REPL_FACTOR, new StorageType[] {RAM_DISK, DEFAULT }, -1);
|
||||||
@ -216,7 +298,7 @@ public void testAppendIsDenied() throws IOException {
|
|||||||
public void testLazyPersistFilesAreDiscarded()
|
public void testLazyPersistFilesAreDiscarded()
|
||||||
throws IOException, InterruptedException {
|
throws IOException, InterruptedException {
|
||||||
startUpCluster(REPL_FACTOR,
|
startUpCluster(REPL_FACTOR,
|
||||||
new StorageType[] {RAM_DISK, DEFAULT },
|
new StorageType[] { RAM_DISK, DEFAULT },
|
||||||
(2 * BLOCK_SIZE - 1)); // 1 replica + delta.
|
(2 * BLOCK_SIZE - 1)); // 1 replica + delta.
|
||||||
final String METHOD_NAME = GenericTestUtils.getMethodName();
|
final String METHOD_NAME = GenericTestUtils.getMethodName();
|
||||||
Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
|
Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
|
||||||
@ -256,7 +338,7 @@ public void testLazyPersistFilesAreDiscarded()
|
|||||||
@Test (timeout=300000)
|
@Test (timeout=300000)
|
||||||
public void testLazyPersistBlocksAreSaved()
|
public void testLazyPersistBlocksAreSaved()
|
||||||
throws IOException, InterruptedException {
|
throws IOException, InterruptedException {
|
||||||
startUpCluster(REPL_FACTOR, new StorageType[] {RAM_DISK, DEFAULT }, -1);
|
startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT }, -1);
|
||||||
final String METHOD_NAME = GenericTestUtils.getMethodName();
|
final String METHOD_NAME = GenericTestUtils.getMethodName();
|
||||||
Path path = new Path("/" + METHOD_NAME + ".dat");
|
Path path = new Path("/" + METHOD_NAME + ".dat");
|
||||||
|
|
||||||
@ -302,8 +384,13 @@ public void testLazyPersistBlocksAreSaved()
|
|||||||
assertThat(persistedBlockIds.size(), is(locatedBlocks.getLocatedBlocks().size()));
|
assertThat(persistedBlockIds.size(), is(locatedBlocks.getLocatedBlocks().size()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
@Test (timeout=300000)
|
* RamDisk eviction after lazy persist to disk.
|
||||||
|
* Evicted blocks are still readable with on-disk replicas.
|
||||||
|
* @throws IOException
|
||||||
|
* @throws InterruptedException
|
||||||
|
*/
|
||||||
|
@Test (timeout=300000)
|
||||||
public void testRamDiskEviction()
|
public void testRamDiskEviction()
|
||||||
throws IOException, InterruptedException {
|
throws IOException, InterruptedException {
|
||||||
startUpCluster(REPL_FACTOR,
|
startUpCluster(REPL_FACTOR,
|
||||||
@ -313,7 +400,8 @@ public void testRamDiskEviction()
|
|||||||
Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
|
Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
|
||||||
Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
|
Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
|
||||||
|
|
||||||
makeTestFile(path1, BLOCK_SIZE, true);
|
final int SEED = 0xFADED;
|
||||||
|
makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
|
||||||
ensureFileReplicasOnStorageType(path1, RAM_DISK);
|
ensureFileReplicasOnStorageType(path1, RAM_DISK);
|
||||||
|
|
||||||
// Sleep for a short time to allow the lazy writer thread to do its job.
|
// Sleep for a short time to allow the lazy writer thread to do its job.
|
||||||
@ -323,15 +411,268 @@ public void testRamDiskEviction()
|
|||||||
|
|
||||||
// Create another file with a replica on RAM_DISK.
|
// Create another file with a replica on RAM_DISK.
|
||||||
makeTestFile(path2, BLOCK_SIZE, true);
|
makeTestFile(path2, BLOCK_SIZE, true);
|
||||||
DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0));
|
triggerBlockReport();
|
||||||
Thread.sleep(10 * 1000);
|
|
||||||
|
|
||||||
// Make sure that the second file's block replica is on RAM_DISK, whereas
|
// Make sure that the second file's block replica is on RAM_DISK, whereas
|
||||||
// the original file's block replica is now on disk.
|
// the original file's block replica is now on disk.
|
||||||
ensureFileReplicasOnStorageType(path2, RAM_DISK);
|
// ensureFileReplicasOnStorageType(path2, RAM_DISK);
|
||||||
ensureFileReplicasOnStorageType(path1, DEFAULT);
|
ensureFileReplicasOnStorageType(path1, DEFAULT);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* RamDisk eviction should not happen on blocks that are not yet
|
||||||
|
* persisted on disk.
|
||||||
|
* @throws IOException
|
||||||
|
* @throws InterruptedException
|
||||||
|
*/
|
||||||
|
@Test (timeout=300000)
|
||||||
|
public void testRamDiskEvictionBeforePersist()
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
// 1 replica + delta, lazy persist interval every 50 minutes
|
||||||
|
startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT },
|
||||||
|
(2 * BLOCK_SIZE - 1));
|
||||||
|
final String METHOD_NAME = GenericTestUtils.getMethodName();
|
||||||
|
Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
|
||||||
|
Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
|
||||||
|
final int SEED = 0XFADED;
|
||||||
|
|
||||||
|
// Stop lazy writer to ensure block for path1 is not persisted to disk.
|
||||||
|
stopLazyWriter(cluster.getDataNodes().get(0));
|
||||||
|
|
||||||
|
makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
|
||||||
|
ensureFileReplicasOnStorageType(path1, RAM_DISK);
|
||||||
|
|
||||||
|
// Create second file with a replica on RAM_DISK.
|
||||||
|
makeTestFile(path2, BLOCK_SIZE, true);
|
||||||
|
|
||||||
|
// Eviction should not happen for block of the first file that is not
|
||||||
|
// persisted yet.
|
||||||
|
ensureFileReplicasOnStorageType(path1, RAM_DISK);
|
||||||
|
ensureFileReplicasOnStorageType(path2, DEFAULT);
|
||||||
|
|
||||||
|
assert(fs.exists(path1));
|
||||||
|
assert(fs.exists(path2));
|
||||||
|
verifyReadRandomFile(path1, BLOCK_SIZE, SEED);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Validates lazy persisted blocks are evicted from RAM_DISK based on LRU.
|
||||||
|
* @throws IOException
|
||||||
|
* @throws InterruptedException
|
||||||
|
*/
|
||||||
|
@Test (timeout=300000)
|
||||||
|
public void testRamDiskEvictionLRU()
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT },
|
||||||
|
(4 * BLOCK_SIZE -1)); // 3 replica + delta.
|
||||||
|
final String METHOD_NAME = GenericTestUtils.getMethodName();
|
||||||
|
final int NUM_PATHS = 6;
|
||||||
|
Path paths[] = new Path[NUM_PATHS];
|
||||||
|
|
||||||
|
for (int i = 0; i < NUM_PATHS; i++) {
|
||||||
|
paths[i] = new Path("/" + METHOD_NAME + "." + i +".dat");
|
||||||
|
}
|
||||||
|
|
||||||
|
// No eviction for the first half of files
|
||||||
|
for (int i = 0; i < NUM_PATHS/2; i++) {
|
||||||
|
makeTestFile(paths[i], BLOCK_SIZE, true);
|
||||||
|
ensureFileReplicasOnStorageType(paths[i], RAM_DISK);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Lazy persist writer persists the first half of files
|
||||||
|
Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
|
||||||
|
|
||||||
|
// Create the second half of files with eviction upon each create.
|
||||||
|
for (int i = NUM_PATHS/2; i < NUM_PATHS; i++) {
|
||||||
|
makeTestFile(paths[i], BLOCK_SIZE, true);
|
||||||
|
ensureFileReplicasOnStorageType(paths[i], RAM_DISK);
|
||||||
|
|
||||||
|
// path[i-NUM_PATHS/2] is expected to be evicted by LRU
|
||||||
|
triggerBlockReport();
|
||||||
|
ensureFileReplicasOnStorageType(paths[i - NUM_PATHS / 2], DEFAULT);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Delete lazy-persist file that has not been persisted to disk.
|
||||||
|
* Memory is freed up and file is gone.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Test (timeout=300000)
|
||||||
|
public void testDeleteBeforePersist()
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT },
|
||||||
|
-1);
|
||||||
|
final String METHOD_NAME = GenericTestUtils.getMethodName();
|
||||||
|
stopLazyWriter(cluster.getDataNodes().get(0));
|
||||||
|
|
||||||
|
Path path = new Path("/" + METHOD_NAME + ".dat");
|
||||||
|
makeTestFile(path, BLOCK_SIZE, true);
|
||||||
|
LocatedBlocks locatedBlocks =
|
||||||
|
ensureFileReplicasOnStorageType(path, RAM_DISK);
|
||||||
|
|
||||||
|
// Delete before persist
|
||||||
|
client.delete(path.toString(), false);
|
||||||
|
Assert.assertFalse(fs.exists(path));
|
||||||
|
|
||||||
|
assertThat(verifyDeletedBlocks(locatedBlocks), is(true));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Delete lazy-persist file that has been persisted to disk
|
||||||
|
* Both memory blocks and disk blocks are deleted.
|
||||||
|
* @throws IOException
|
||||||
|
* @throws InterruptedException
|
||||||
|
*/
|
||||||
|
@Test (timeout=300000)
|
||||||
|
public void testDeleteAfterPersist()
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT }, -1);
|
||||||
|
final String METHOD_NAME = GenericTestUtils.getMethodName();
|
||||||
|
Path path = new Path("/" + METHOD_NAME + ".dat");
|
||||||
|
|
||||||
|
makeTestFile(path, BLOCK_SIZE, true);
|
||||||
|
LocatedBlocks locatedBlocks = ensureFileReplicasOnStorageType(path, RAM_DISK);
|
||||||
|
|
||||||
|
// Sleep for a short time to allow the lazy writer thread to do its job
|
||||||
|
Thread.sleep(6 * LAZY_WRITER_INTERVAL_SEC * 1000);
|
||||||
|
|
||||||
|
// Delete after persist
|
||||||
|
client.delete(path.toString(), false);
|
||||||
|
Assert.assertFalse(fs.exists(path));
|
||||||
|
|
||||||
|
triggerBlockReport();
|
||||||
|
|
||||||
|
assertThat(verifyDeletedBlocks(locatedBlocks), is(true));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* RAM_DISK used/free space
|
||||||
|
* @throws IOException
|
||||||
|
* @throws InterruptedException
|
||||||
|
*/
|
||||||
|
@Test (timeout=300000)
|
||||||
|
public void testDfsUsageCreateDelete()
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT },
|
||||||
|
5 * BLOCK_SIZE - 1); // 4 replica + delta
|
||||||
|
final String METHOD_NAME = GenericTestUtils.getMethodName();
|
||||||
|
Path path = new Path("/" + METHOD_NAME + ".dat");
|
||||||
|
|
||||||
|
// Get the usage before write BLOCK_SIZE
|
||||||
|
long usedBeforeCreate = fs.getUsed();
|
||||||
|
|
||||||
|
makeTestFile(path, BLOCK_SIZE, true);
|
||||||
|
long usedAfterCreate = fs.getUsed();
|
||||||
|
|
||||||
|
assertThat(usedAfterCreate, is((long) BLOCK_SIZE));
|
||||||
|
|
||||||
|
// Sleep for a short time to allow the lazy writer thread to do its job
|
||||||
|
Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
|
||||||
|
|
||||||
|
long usedAfterPersist = fs.getUsed();
|
||||||
|
assertThat(usedAfterPersist, is((long) BLOCK_SIZE));
|
||||||
|
|
||||||
|
// Delete after persist
|
||||||
|
client.delete(path.toString(), false);
|
||||||
|
long usedAfterDelete = fs.getUsed();
|
||||||
|
|
||||||
|
assertThat(usedBeforeCreate, is(usedAfterDelete));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Concurrent read from the same node and verify the contents.
|
||||||
|
*/
|
||||||
|
@Test (timeout=300000)
|
||||||
|
public void testConcurrentRead()
|
||||||
|
throws Exception {
|
||||||
|
startUpCluster(REPL_FACTOR, new StorageType[] { DEFAULT, RAM_DISK },
|
||||||
|
3 * BLOCK_SIZE -1); // 2 replicas + delta
|
||||||
|
final String METHOD_NAME = GenericTestUtils.getMethodName();
|
||||||
|
final Path path1 = new Path("/" + METHOD_NAME + ".dat");
|
||||||
|
|
||||||
|
final int SEED = 0xFADED;
|
||||||
|
final int NUM_TASKS = 5;
|
||||||
|
makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
|
||||||
|
ensureFileReplicasOnStorageType(path1, RAM_DISK);
|
||||||
|
|
||||||
|
//Read from multiple clients
|
||||||
|
final CountDownLatch latch = new CountDownLatch(NUM_TASKS);
|
||||||
|
final AtomicBoolean testFailed = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
Runnable readerRunnable = new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
Assert.assertTrue(verifyReadRandomFile(path1, BLOCK_SIZE, SEED));
|
||||||
|
} catch (Throwable e) {
|
||||||
|
LOG.error("readerRunnable error", e);
|
||||||
|
testFailed.set(true);
|
||||||
|
} finally {
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
Thread threads[] = new Thread[NUM_TASKS];
|
||||||
|
for (int i = 0; i < NUM_TASKS; i++) {
|
||||||
|
threads[i] = new Thread(readerRunnable);
|
||||||
|
threads[i].start();
|
||||||
|
}
|
||||||
|
|
||||||
|
Thread.sleep(500);
|
||||||
|
|
||||||
|
for (int i = 0; i < NUM_TASKS; i++) {
|
||||||
|
Uninterruptibles.joinUninterruptibly(threads[i]);
|
||||||
|
}
|
||||||
|
Assert.assertFalse(testFailed.get());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Concurrent write with eviction
|
||||||
|
* RAM_DISK can hold 9 replicas
|
||||||
|
* 4 threads each write 5 replicas
|
||||||
|
* @throws IOException
|
||||||
|
* @throws InterruptedException
|
||||||
|
*/
|
||||||
|
@Test (timeout=300000)
|
||||||
|
public void testConcurrentWrites()
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT },
|
||||||
|
(10 * BLOCK_SIZE -1)); // 9 replica + delta.
|
||||||
|
final String METHOD_NAME = GenericTestUtils.getMethodName();
|
||||||
|
final int SEED = 0xFADED;
|
||||||
|
final int NUM_WRITERS = 4;
|
||||||
|
final int NUM_WRITER_PATHS = 5;
|
||||||
|
|
||||||
|
Path paths[][] = new Path[NUM_WRITERS][NUM_WRITER_PATHS];
|
||||||
|
for (int i = 0; i < NUM_WRITERS; i++) {
|
||||||
|
paths[i] = new Path[NUM_WRITER_PATHS];
|
||||||
|
for (int j = 0; j < NUM_WRITER_PATHS; j++) {
|
||||||
|
paths[i][j] =
|
||||||
|
new Path("/" + METHOD_NAME + ".Writer" + i + ".File." + j + ".dat");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
final CountDownLatch latch = new CountDownLatch(NUM_WRITERS);
|
||||||
|
final AtomicBoolean testFailed = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
ExecutorService executor = Executors.newFixedThreadPool(THREADPOOL_SIZE);
|
||||||
|
for (int i = 0; i < NUM_WRITERS; i++) {
|
||||||
|
Runnable writer = new WriterRunnable(cluster, i, paths[i], SEED, latch,
|
||||||
|
testFailed);
|
||||||
|
executor.execute(writer);
|
||||||
|
}
|
||||||
|
|
||||||
|
Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
|
||||||
|
triggerBlockReport();
|
||||||
|
|
||||||
|
// Stop executor from adding new tasks to finish existing threads in queue
|
||||||
|
latch.await();
|
||||||
|
|
||||||
|
assertThat(testFailed.get(), is(false));
|
||||||
|
}
|
||||||
|
|
||||||
@Test (timeout=300000)
|
@Test (timeout=300000)
|
||||||
public void testDnRestartWithSavedReplicas()
|
public void testDnRestartWithSavedReplicas()
|
||||||
throws IOException, InterruptedException {
|
throws IOException, InterruptedException {
|
||||||
@ -384,11 +725,12 @@ public void testDnRestartWithUnsavedReplicas()
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* If ramDiskStorageLimit is >=0, then RAM_DISK capacity is artificially
|
* If ramDiskStorageLimit is >=0, then RAM_DISK capacity is artificially
|
||||||
* capped. If tmpfsStorageLimit < 0 then it is ignored.
|
* capped. If ramDiskStorageLimit < 0 then it is ignored.
|
||||||
*/
|
*/
|
||||||
private void startUpCluster(final int numDataNodes,
|
private void startUpCluster(final int numDataNodes,
|
||||||
final StorageType[] storageTypes,
|
final StorageType[] storageTypes,
|
||||||
final long ramDiskStorageLimit)
|
final long ramDiskStorageLimit,
|
||||||
|
final boolean useSCR)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
|
||||||
conf = new Configuration();
|
conf = new Configuration();
|
||||||
@ -397,11 +739,13 @@ private void startUpCluster(final int numDataNodes,
|
|||||||
LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC);
|
LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC);
|
||||||
conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL_SEC);
|
conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL_SEC);
|
||||||
conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
|
conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
|
||||||
HEARTBEAT_RECHECK_INTERVAL_MSEC);
|
HEARTBEAT_RECHECK_INTERVAL_MSEC);
|
||||||
conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
|
conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
|
||||||
LAZY_WRITER_INTERVAL_SEC);
|
LAZY_WRITER_INTERVAL_SEC);
|
||||||
|
|
||||||
REPL_FACTOR = 1; //Reset if case a test has modified the value
|
conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY,useSCR);
|
||||||
|
|
||||||
|
REPL_FACTOR = 1; //Reset in case a test has modified the value
|
||||||
|
|
||||||
cluster = new MiniDFSCluster
|
cluster = new MiniDFSCluster
|
||||||
.Builder(conf)
|
.Builder(conf)
|
||||||
@ -411,7 +755,7 @@ private void startUpCluster(final int numDataNodes,
|
|||||||
fs = cluster.getFileSystem();
|
fs = cluster.getFileSystem();
|
||||||
client = fs.getClient();
|
client = fs.getClient();
|
||||||
|
|
||||||
// Artifically cap the storage capacity of the tmpfs volume.
|
// Artificially cap the storage capacity of the RAM_DISK volume.
|
||||||
if (ramDiskStorageLimit >= 0) {
|
if (ramDiskStorageLimit >= 0) {
|
||||||
List<? extends FsVolumeSpi> volumes =
|
List<? extends FsVolumeSpi> volumes =
|
||||||
cluster.getDataNodes().get(0).getFSDataset().getVolumes();
|
cluster.getDataNodes().get(0).getFSDataset().getVolumes();
|
||||||
@ -426,6 +770,13 @@ private void startUpCluster(final int numDataNodes,
|
|||||||
LOG.info("Cluster startup complete");
|
LOG.info("Cluster startup complete");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void startUpCluster(final int numDataNodes,
|
||||||
|
final StorageType[] storageTypes,
|
||||||
|
final long ramDiskStorageLimit)
|
||||||
|
throws IOException {
|
||||||
|
startUpCluster(numDataNodes, storageTypes, ramDiskStorageLimit, false);
|
||||||
|
}
|
||||||
|
|
||||||
private void makeTestFile(Path path, long length, final boolean isLazyPersist)
|
private void makeTestFile(Path path, long length, final boolean isLazyPersist)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
|
||||||
@ -435,9 +786,7 @@ private void makeTestFile(Path path, long length, final boolean isLazyPersist)
|
|||||||
createFlags.add(LAZY_PERSIST);
|
createFlags.add(LAZY_PERSIST);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
FSDataOutputStream fos = null;
|
FSDataOutputStream fos = null;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
fos =
|
fos =
|
||||||
fs.create(path,
|
fs.create(path,
|
||||||
@ -465,13 +814,14 @@ private void makeTestFile(Path path, long length, final boolean isLazyPersist)
|
|||||||
private LocatedBlocks ensureFileReplicasOnStorageType(
|
private LocatedBlocks ensureFileReplicasOnStorageType(
|
||||||
Path path, StorageType storageType) throws IOException {
|
Path path, StorageType storageType) throws IOException {
|
||||||
// Ensure that returned block locations returned are correct!
|
// Ensure that returned block locations returned are correct!
|
||||||
|
LOG.info("Ensure path: " + path + " is on StorageType: " + storageType);
|
||||||
|
assertThat(fs.exists(path), is(true));
|
||||||
long fileLength = client.getFileInfo(path.toString()).getLen();
|
long fileLength = client.getFileInfo(path.toString()).getLen();
|
||||||
LocatedBlocks locatedBlocks =
|
LocatedBlocks locatedBlocks =
|
||||||
client.getLocatedBlocks(path.toString(), 0, fileLength);
|
client.getLocatedBlocks(path.toString(), 0, fileLength);
|
||||||
for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
|
for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
|
||||||
assertThat(locatedBlock.getStorageTypes()[0], is(storageType));
|
assertThat(locatedBlock.getStorageTypes()[0], is(storageType));
|
||||||
}
|
}
|
||||||
|
|
||||||
return locatedBlocks;
|
return locatedBlocks;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -480,4 +830,119 @@ private void stopLazyWriter(DataNode dn) {
|
|||||||
FsDatasetImpl fsDataset = ((FsDatasetImpl) dn.getFSDataset());
|
FsDatasetImpl fsDataset = ((FsDatasetImpl) dn.getFSDataset());
|
||||||
((FsDatasetImpl.LazyWriter) fsDataset.lazyWriter.getRunnable()).stop();
|
((FsDatasetImpl.LazyWriter) fsDataset.lazyWriter.getRunnable()).stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void makeRandomTestFile(Path path, long length, final boolean isLazyPersist,
|
||||||
|
long seed) throws IOException {
|
||||||
|
DFSTestUtil.createFile(fs, path, isLazyPersist, BUFFER_LENGTH, length,
|
||||||
|
BLOCK_SIZE, REPL_FACTOR, seed, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean verifyReadRandomFile(
|
||||||
|
Path path, int fileLength, int seed) throws IOException {
|
||||||
|
byte contents[] = DFSTestUtil.readFileBuffer(fs, path);
|
||||||
|
byte expected[] = DFSTestUtil.
|
||||||
|
calculateFileContentsFromSeed(seed, fileLength);
|
||||||
|
return Arrays.equals(contents, expected);
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean verifyDeletedBlocks(LocatedBlocks locatedBlocks)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
|
||||||
|
LOG.info("Verifying replica has no saved copy after deletion.");
|
||||||
|
triggerBlockReport();
|
||||||
|
|
||||||
|
while(
|
||||||
|
DataNodeTestUtils.getPendingAsyncDeletions(cluster.getDataNodes().get(0))
|
||||||
|
> 0L){
|
||||||
|
Thread.sleep(1000);
|
||||||
|
}
|
||||||
|
|
||||||
|
final String bpid = cluster.getNamesystem().getBlockPoolId();
|
||||||
|
List<? extends FsVolumeSpi> volumes =
|
||||||
|
cluster.getDataNodes().get(0).getFSDataset().getVolumes();
|
||||||
|
|
||||||
|
// Make sure deleted replica does not have a copy on either finalized dir of
|
||||||
|
// transient volume or finalized dir of non-transient volume
|
||||||
|
for (FsVolumeSpi v : volumes) {
|
||||||
|
FsVolumeImpl volume = (FsVolumeImpl) v;
|
||||||
|
File targetDir = (v.isTransientStorage()) ?
|
||||||
|
volume.getBlockPoolSlice(bpid).getFinalizedDir() :
|
||||||
|
volume.getBlockPoolSlice(bpid).getLazypersistDir();
|
||||||
|
if (verifyBlockDeletedFromDir(targetDir, locatedBlocks) == false) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean verifyBlockDeletedFromDir(File dir, LocatedBlocks locatedBlocks) {
|
||||||
|
|
||||||
|
for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
|
||||||
|
File targetDir =
|
||||||
|
DatanodeUtil.idToBlockDir(dir, lb.getBlock().getBlockId());
|
||||||
|
|
||||||
|
File blockFile = new File(targetDir, lb.getBlock().getBlockName());
|
||||||
|
if (blockFile.exists()) {
|
||||||
|
LOG.warn("blockFile: " + blockFile.getAbsolutePath() +
|
||||||
|
" exists after deletion.");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
File metaFile = new File(targetDir,
|
||||||
|
DatanodeUtil.getMetaName(lb.getBlock().getBlockName(),
|
||||||
|
lb.getBlock().getGenerationStamp()));
|
||||||
|
if (metaFile.exists()) {
|
||||||
|
LOG.warn("metaFile: " + metaFile.getAbsolutePath() +
|
||||||
|
" exists after deletion.");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void triggerBlockReport()
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
// Trigger block report to NN
|
||||||
|
DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0));
|
||||||
|
Thread.sleep(10 * 1000);
|
||||||
|
}
|
||||||
|
|
||||||
|
class WriterRunnable implements Runnable {
|
||||||
|
private final int id;
|
||||||
|
private final MiniDFSCluster cluster;
|
||||||
|
private final Path paths[];
|
||||||
|
private final int seed;
|
||||||
|
private CountDownLatch latch;
|
||||||
|
private AtomicBoolean bFail;
|
||||||
|
|
||||||
|
public WriterRunnable(MiniDFSCluster cluster, int threadIndex, Path[] paths,
|
||||||
|
int seed, CountDownLatch latch,
|
||||||
|
AtomicBoolean bFail) {
|
||||||
|
id = threadIndex;
|
||||||
|
this.cluster = cluster;
|
||||||
|
this.paths = paths;
|
||||||
|
this.seed = seed;
|
||||||
|
this.latch = latch;
|
||||||
|
this.bFail = bFail;
|
||||||
|
System.out.println("Creating Writer: " + id);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void run() {
|
||||||
|
System.out.println("Writer " + id + " starting... ");
|
||||||
|
int i = 0;
|
||||||
|
try {
|
||||||
|
for (i = 0; i < paths.length; i++) {
|
||||||
|
makeRandomTestFile(paths[i], BLOCK_SIZE, true, seed);
|
||||||
|
// eviction may faiL when all blocks are not persisted yet.
|
||||||
|
// ensureFileReplicasOnStorageType(paths[i], RAM_DISK);
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
bFail.set(true);
|
||||||
|
LOG.error("Writer exception: writer id:" + id +
|
||||||
|
" testfile: " + paths[i].toString() +
|
||||||
|
" " + e);
|
||||||
|
} finally {
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user