HDFS-7746. Add a test randomly mixing append, truncate and snapshot operations.

This commit is contained in:
Tsz-Wo Nicholas Sze 2015-03-05 10:21:29 +08:00
parent 22426a1c9f
commit ded0200e9c
2 changed files with 481 additions and 0 deletions

View File

@ -709,6 +709,9 @@ Release 2.7.0 - UNRELEASED
HDFS-1522. Combine two BLOCK_FILE_PREFIX constants into one. HDFS-1522. Combine two BLOCK_FILE_PREFIX constants into one.
(Dongming Liang via shv) (Dongming Liang via shv)
HDFS-7746. Add a test randomly mixing append, truncate and snapshot
operations. (szetszwo)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-7454. Reduce memory footprint for AclEntries in NameNode. HDFS-7454. Reduce memory footprint for AclEntries in NameNode.

View File

@ -0,0 +1,478 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import java.io.File;
import java.io.FileFilter;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.TestFileTruncate;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import com.google.common.base.Preconditions;
/**
* Test randomly mixing append, snapshot and truncate operations.
* Use local file system to simulate the each operation and verify
* the correctness.
*/
public class TestAppendSnapshotTruncate {
static {
GenericTestUtils.setLogLevel(NameNode.stateChangeLog, Level.ALL);
}
private static final Log LOG = LogFactory.getLog(TestAppendSnapshotTruncate.class);
private static final int BLOCK_SIZE = 1024;
private static final int DATANODE_NUM = 3;
private static final short REPLICATION = 3;
static final int SHORT_HEARTBEAT = 1;
static final String[] EMPTY_STRINGS = {};
static Configuration conf;
static MiniDFSCluster cluster;
static DistributedFileSystem dfs;
@BeforeClass
public static void startUp() throws IOException {
conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, BLOCK_SIZE);
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, BLOCK_SIZE);
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, SHORT_HEARTBEAT);
conf.setLong(
DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 1);
cluster = new MiniDFSCluster.Builder(conf)
.format(true)
.numDataNodes(DATANODE_NUM)
.nameNodePort(NameNode.DEFAULT_PORT)
.waitSafeMode(true)
.build();
dfs = cluster.getFileSystem();
}
@AfterClass
public static void tearDown() throws IOException {
if(dfs != null) {
dfs.close();
}
if(cluster != null) {
cluster.shutdown();
}
}
/** Test randomly mixing append, snapshot and truncate operations. */
@Test
public void testAST() throws Exception {
final String dirPathString = "/dir";
final Path dir = new Path(dirPathString);
dfs.mkdirs(dir);
dfs.allowSnapshot(dir);
final File localDir = new File(
System.getProperty("test.build.data", "target/test/data")
+ dirPathString);
if (localDir.exists()) {
FileUtil.fullyDelete(localDir);
}
localDir.mkdirs();
final DirWorker w = new DirWorker(dir, localDir, 3);
w.startAllFiles();
w.start();
Worker.sleep(10L*1000);
w.stop();
w.stoptAllFiles();
w.checkEverything();
}
static final FileFilter FILE_ONLY = new FileFilter() {
@Override
public boolean accept(File f) {
return f.isFile();
}
};
static class DirWorker extends Worker {
final Path dir;
final File localDir;
final FileWorker[] files;
private Map<String, Path> snapshotPaths = new HashMap<String, Path>();
private AtomicInteger snapshotCount = new AtomicInteger();
DirWorker(Path dir, File localDir, int nFiles) throws IOException {
super(dir.getName());
this.dir = dir;
this.localDir = localDir;
this.files = new FileWorker[nFiles];
for(int i = 0; i < files.length; i++) {
files[i] = new FileWorker(dir, localDir, String.format("file%02d", i));
}
}
static String getSnapshotName(int n) {
return String.format("s%02d", n);
}
String createSnapshot(String snapshot) throws IOException {
final StringBuilder b = new StringBuilder("createSnapshot: ")
.append(snapshot).append(" for ").append(dir);
{
//copy all local files to a sub dir to simulate snapshot.
final File subDir = new File(localDir, snapshot);
Assert.assertFalse(subDir.exists());
subDir.mkdir();
for(File f : localDir.listFiles(FILE_ONLY)) {
FileUtils.copyFile(f, new File(subDir, f.getName()));
}
}
final Path p = dfs.createSnapshot(dir, snapshot);
snapshotPaths.put(snapshot, p);
return b.toString();
}
String checkSnapshot(String snapshot) throws IOException {
final StringBuilder b = new StringBuilder("checkSnapshot: ")
.append(snapshot);
final File subDir = new File(localDir, snapshot);
Assert.assertTrue(subDir.exists());
final File[] localFiles = subDir.listFiles(FILE_ONLY);
final Path p = snapshotPaths.get(snapshot);
final FileStatus[] statuses = dfs.listStatus(p);
Assert.assertEquals(localFiles.length, statuses.length);
b.append(p).append(" vs ").append(subDir).append(", ")
.append(statuses.length).append(" entries");
Arrays.sort(localFiles);
Arrays.sort(statuses);
for(int i = 0; i < statuses.length; i++) {
FileWorker.checkFullFile(statuses[i].getPath(), localFiles[i]);
}
return b.toString();
}
String deleteSnapshot(String snapshot) throws IOException {
final StringBuilder b = new StringBuilder("deleteSnapshot: ")
.append(snapshot).append(" from ").append(dir);
FileUtil.fullyDelete(new File(localDir, snapshot));
dfs.deleteSnapshot(dir, snapshot);
snapshotPaths.remove(snapshot);
return b.toString();
}
@Override
public String call() throws Exception {
final Random r = DFSUtil.getRandom();
final int op = r.nextInt(6);
if (op <= 1) {
pauseAllFiles();
try {
final String snapshot = getSnapshotName(snapshotCount.getAndIncrement());
return createSnapshot(snapshot);
} finally {
startAllFiles();
}
} else if (op <= 3) {
final String[] keys = snapshotPaths.keySet().toArray(EMPTY_STRINGS);
if (keys.length == 0) {
return "NO-OP";
}
final String snapshot = keys[r.nextInt(keys.length)];
final String s = checkSnapshot(snapshot);
if (op == 2) {
return deleteSnapshot(snapshot);
}
return s;
} else {
return "NO-OP";
}
}
void pauseAllFiles() {
for(FileWorker f : files) {
f.pause();
}
for(int i = 0; i < files.length; ) {
sleep(100);
for(; i < files.length && files[i].isPaused(); i++);
}
}
void startAllFiles() {
for(FileWorker f : files) {
f.start();
}
}
void stoptAllFiles() throws InterruptedException {
for(FileWorker f : files) {
f.stop();
}
}
void checkEverything() throws IOException {
LOG.info("checkEverything");
for(FileWorker f : files) {
f.checkFullFile();
Preconditions.checkState(f.state.get() != State.ERROR);
}
for(String snapshot : snapshotPaths.keySet()) {
checkSnapshot(snapshot);
}
Preconditions.checkState(state.get() != State.ERROR);
}
}
static class FileWorker extends Worker {
final Path file;
final File localFile;
FileWorker(Path dir, File localDir, String filename) throws IOException {
super(filename);
this.file = new Path(dir, filename);
this.localFile = new File(localDir, filename);
localFile.createNewFile();
dfs.create(file, false, 4096, REPLICATION, BLOCK_SIZE).close();
}
@Override
public String call() throws IOException {
final Random r = DFSUtil.getRandom();
final int op = r.nextInt(9);
if (op == 0) {
return checkFullFile();
} else {
final int nBlocks = r.nextInt(4) + 1;
final int lastBlockSize = r.nextInt(BLOCK_SIZE) + 1;
final int nBytes = nBlocks*BLOCK_SIZE + lastBlockSize;
if (op <= 4) {
return append(nBytes);
} else if (op <= 6) {
return truncateArbitrarily(nBytes);
} else {
return truncateToBlockBoundary(nBlocks);
}
}
}
String append(int n) throws IOException {
final StringBuilder b = new StringBuilder("append ")
.append(n).append(" bytes to ").append(file.getName());
final byte[] bytes = new byte[n];
DFSUtil.getRandom().nextBytes(bytes);
{ // write to local file
final FileOutputStream out = new FileOutputStream(localFile, true);
out.write(bytes, 0, bytes.length);
out.close();
}
{
final FSDataOutputStream out = dfs.append(file);
out.write(bytes, 0, bytes.length);
out.close();
}
return b.toString();
}
String truncateArbitrarily(int nBytes) throws IOException {
Preconditions.checkArgument(nBytes > 0);
final int length = checkLength();
final StringBuilder b = new StringBuilder("truncateArbitrarily: ")
.append(nBytes).append(" bytes from ").append(file.getName())
.append(", length=" + length);
truncate(length > nBytes? length - nBytes: 0, b);
return b.toString();
}
String truncateToBlockBoundary(int nBlocks) throws IOException {
Preconditions.checkArgument(nBlocks > 0);
final int length = checkLength();
final StringBuilder b = new StringBuilder("truncateToBlockBoundary: ")
.append(nBlocks).append(" blocks from ").append(file.getName())
.append(", length=" + length);
final int n = (nBlocks - 1)*BLOCK_SIZE + (length%BLOCK_SIZE);
Preconditions.checkState(truncate(length > n? length - n: 0, b), b);
return b.toString();
}
private boolean truncate(long newLength, StringBuilder b) throws IOException {
final RandomAccessFile raf = new RandomAccessFile(localFile, "rw");
raf.setLength(newLength);
raf.close();
final boolean isReady = dfs.truncate(file, newLength);
b.append(", newLength=").append(newLength)
.append(", isReady=").append(isReady);
if (!isReady) {
TestFileTruncate.checkBlockRecovery(file, dfs);
}
return isReady;
}
int checkLength() throws IOException {
return checkLength(file, localFile);
}
static int checkLength(Path file, File localFile) throws IOException {
final long length = dfs.getFileStatus(file).getLen();
Assert.assertEquals(localFile.length(), length);
Assert.assertTrue(length <= Integer.MAX_VALUE);
return (int)length;
}
String checkFullFile() throws IOException {
return checkFullFile(file, localFile);
}
static String checkFullFile(Path file, File localFile) throws IOException {
final StringBuilder b = new StringBuilder("checkFullFile: ")
.append(file.getName()).append(" vs ").append(localFile);
final byte[] bytes = new byte[checkLength(file, localFile)];
b.append(", length=").append(bytes.length);
final FileInputStream in = new FileInputStream(localFile);
for(int n = 0; n < bytes.length; ) {
n += in.read(bytes, n, bytes.length - n);
}
in.close();
AppendTestUtil.checkFullFile(dfs, file, bytes.length, bytes,
"File content mismatch: " + b, false);
return b.toString();
}
}
static abstract class Worker implements Callable<String> {
enum State {
IDLE(false), RUNNING(false), STOPPED(true), ERROR(true);
final boolean isTerminated;
State(boolean isTerminated) {
this.isTerminated = isTerminated;
}
};
final String name;
final AtomicReference<State> state = new AtomicReference<State>(State.IDLE);
final AtomicBoolean isCalling = new AtomicBoolean();
final AtomicReference<Thread> thread = new AtomicReference<Thread>();
Worker(String name) {
this.name = name;
}
void start() {
Preconditions.checkState(state.compareAndSet(State.IDLE, State.RUNNING));
if (thread.get() == null) {
final Thread t = new Thread(null, new Runnable() {
@Override
public void run() {
final Random r = DFSUtil.getRandom();
for(State s; (s = state.get()) == State.RUNNING || s == State.IDLE;) {
if (s == State.RUNNING) {
isCalling.set(true);
try {
LOG.info(call());
} catch (Exception e) {
LOG.error("Worker " + name + " failed.", e);
state.set(State.ERROR);
return;
}
isCalling.set(false);
}
sleep(r.nextInt(100) + 50);
}
}
}, name);
Preconditions.checkState(thread.compareAndSet(null, t));
t.start();
}
}
boolean isPaused() {
return state.get() == State.IDLE && !isCalling.get();
}
void pause() {
Preconditions.checkState(state.compareAndSet(State.RUNNING, State.IDLE));
}
void stop() throws InterruptedException {
if (state.get() == State.ERROR) {
return;
}
state.set(State.STOPPED);
thread.get().join();
}
static void sleep(final long sleepTimeMs) {
try {
Thread.sleep(sleepTimeMs);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}