diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index d9008d907d..f9541e042b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -709,6 +709,9 @@ Release 2.7.0 - UNRELEASED HDFS-1522. Combine two BLOCK_FILE_PREFIX constants into one. (Dongming Liang via shv) + HDFS-7746. Add a test randomly mixing append, truncate and snapshot + operations. (szetszwo) + OPTIMIZATIONS HDFS-7454. Reduce memory footprint for AclEntries in NameNode. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAppendSnapshotTruncate.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAppendSnapshotTruncate.java new file mode 100644 index 0000000000..5c4c7b4a27 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAppendSnapshotTruncate.java @@ -0,0 +1,478 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs; + +import java.io.File; +import java.io.FileFilter; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.namenode.TestFileTruncate; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.log4j.Level; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.google.common.base.Preconditions; + +/** + * Test randomly mixing append, snapshot and truncate operations. + * Use local file system to simulate the each operation and verify + * the correctness. + */ +public class TestAppendSnapshotTruncate { + static { + GenericTestUtils.setLogLevel(NameNode.stateChangeLog, Level.ALL); + } + private static final Log LOG = LogFactory.getLog(TestAppendSnapshotTruncate.class); + private static final int BLOCK_SIZE = 1024; + private static final int DATANODE_NUM = 3; + private static final short REPLICATION = 3; + + static final int SHORT_HEARTBEAT = 1; + static final String[] EMPTY_STRINGS = {}; + + static Configuration conf; + static MiniDFSCluster cluster; + static DistributedFileSystem dfs; + + @BeforeClass + public static void startUp() throws IOException { + conf = new HdfsConfiguration(); + conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, BLOCK_SIZE); + conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, BLOCK_SIZE); + conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, SHORT_HEARTBEAT); + conf.setLong( + DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 1); + cluster = new MiniDFSCluster.Builder(conf) + .format(true) + .numDataNodes(DATANODE_NUM) + .nameNodePort(NameNode.DEFAULT_PORT) + .waitSafeMode(true) + .build(); + dfs = cluster.getFileSystem(); + } + + @AfterClass + public static void tearDown() throws IOException { + if(dfs != null) { + dfs.close(); + } + if(cluster != null) { + cluster.shutdown(); + } + } + + + /** Test randomly mixing append, snapshot and truncate operations. */ + @Test + public void testAST() throws Exception { + final String dirPathString = "/dir"; + final Path dir = new Path(dirPathString); + dfs.mkdirs(dir); + dfs.allowSnapshot(dir); + + final File localDir = new File( + System.getProperty("test.build.data", "target/test/data") + + dirPathString); + if (localDir.exists()) { + FileUtil.fullyDelete(localDir); + } + localDir.mkdirs(); + + final DirWorker w = new DirWorker(dir, localDir, 3); + w.startAllFiles(); + w.start(); + Worker.sleep(10L*1000); + w.stop(); + w.stoptAllFiles(); + w.checkEverything(); + } + + static final FileFilter FILE_ONLY = new FileFilter() { + @Override + public boolean accept(File f) { + return f.isFile(); + } + }; + + static class DirWorker extends Worker { + final Path dir; + final File localDir; + + final FileWorker[] files; + + private Map snapshotPaths = new HashMap(); + private AtomicInteger snapshotCount = new AtomicInteger(); + + DirWorker(Path dir, File localDir, int nFiles) throws IOException { + super(dir.getName()); + this.dir = dir; + this.localDir = localDir; + + this.files = new FileWorker[nFiles]; + for(int i = 0; i < files.length; i++) { + files[i] = new FileWorker(dir, localDir, String.format("file%02d", i)); + } + } + + static String getSnapshotName(int n) { + return String.format("s%02d", n); + } + + String createSnapshot(String snapshot) throws IOException { + final StringBuilder b = new StringBuilder("createSnapshot: ") + .append(snapshot).append(" for ").append(dir); + + { + //copy all local files to a sub dir to simulate snapshot. + final File subDir = new File(localDir, snapshot); + Assert.assertFalse(subDir.exists()); + subDir.mkdir(); + + for(File f : localDir.listFiles(FILE_ONLY)) { + FileUtils.copyFile(f, new File(subDir, f.getName())); + } + } + + final Path p = dfs.createSnapshot(dir, snapshot); + snapshotPaths.put(snapshot, p); + return b.toString(); + } + + String checkSnapshot(String snapshot) throws IOException { + final StringBuilder b = new StringBuilder("checkSnapshot: ") + .append(snapshot); + + final File subDir = new File(localDir, snapshot); + Assert.assertTrue(subDir.exists()); + + final File[] localFiles = subDir.listFiles(FILE_ONLY); + final Path p = snapshotPaths.get(snapshot); + final FileStatus[] statuses = dfs.listStatus(p); + Assert.assertEquals(localFiles.length, statuses.length); + b.append(p).append(" vs ").append(subDir).append(", ") + .append(statuses.length).append(" entries"); + + Arrays.sort(localFiles); + Arrays.sort(statuses); + for(int i = 0; i < statuses.length; i++) { + FileWorker.checkFullFile(statuses[i].getPath(), localFiles[i]); + } + return b.toString(); + } + + String deleteSnapshot(String snapshot) throws IOException { + final StringBuilder b = new StringBuilder("deleteSnapshot: ") + .append(snapshot).append(" from ").append(dir); + FileUtil.fullyDelete(new File(localDir, snapshot)); + dfs.deleteSnapshot(dir, snapshot); + snapshotPaths.remove(snapshot); + return b.toString(); + } + + + @Override + public String call() throws Exception { + final Random r = DFSUtil.getRandom(); + final int op = r.nextInt(6); + if (op <= 1) { + pauseAllFiles(); + try { + final String snapshot = getSnapshotName(snapshotCount.getAndIncrement()); + return createSnapshot(snapshot); + } finally { + startAllFiles(); + } + } else if (op <= 3) { + final String[] keys = snapshotPaths.keySet().toArray(EMPTY_STRINGS); + if (keys.length == 0) { + return "NO-OP"; + } + final String snapshot = keys[r.nextInt(keys.length)]; + final String s = checkSnapshot(snapshot); + + if (op == 2) { + return deleteSnapshot(snapshot); + } + return s; + } else { + return "NO-OP"; + } + } + + void pauseAllFiles() { + for(FileWorker f : files) { + f.pause(); + } + + for(int i = 0; i < files.length; ) { + sleep(100); + for(; i < files.length && files[i].isPaused(); i++); + } + } + + void startAllFiles() { + for(FileWorker f : files) { + f.start(); + } + } + + void stoptAllFiles() throws InterruptedException { + for(FileWorker f : files) { + f.stop(); + } + } + + void checkEverything() throws IOException { + LOG.info("checkEverything"); + for(FileWorker f : files) { + f.checkFullFile(); + Preconditions.checkState(f.state.get() != State.ERROR); + } + for(String snapshot : snapshotPaths.keySet()) { + checkSnapshot(snapshot); + } + Preconditions.checkState(state.get() != State.ERROR); + } + } + + static class FileWorker extends Worker { + final Path file; + final File localFile; + + FileWorker(Path dir, File localDir, String filename) throws IOException { + super(filename); + this.file = new Path(dir, filename); + this.localFile = new File(localDir, filename); + + localFile.createNewFile(); + dfs.create(file, false, 4096, REPLICATION, BLOCK_SIZE).close(); + } + + @Override + public String call() throws IOException { + final Random r = DFSUtil.getRandom(); + final int op = r.nextInt(9); + if (op == 0) { + return checkFullFile(); + } else { + final int nBlocks = r.nextInt(4) + 1; + final int lastBlockSize = r.nextInt(BLOCK_SIZE) + 1; + final int nBytes = nBlocks*BLOCK_SIZE + lastBlockSize; + + if (op <= 4) { + return append(nBytes); + } else if (op <= 6) { + return truncateArbitrarily(nBytes); + } else { + return truncateToBlockBoundary(nBlocks); + } + } + } + + String append(int n) throws IOException { + final StringBuilder b = new StringBuilder("append ") + .append(n).append(" bytes to ").append(file.getName()); + + final byte[] bytes = new byte[n]; + DFSUtil.getRandom().nextBytes(bytes); + + { // write to local file + final FileOutputStream out = new FileOutputStream(localFile, true); + out.write(bytes, 0, bytes.length); + out.close(); + } + + { + final FSDataOutputStream out = dfs.append(file); + out.write(bytes, 0, bytes.length); + out.close(); + } + return b.toString(); + } + + String truncateArbitrarily(int nBytes) throws IOException { + Preconditions.checkArgument(nBytes > 0); + final int length = checkLength(); + final StringBuilder b = new StringBuilder("truncateArbitrarily: ") + .append(nBytes).append(" bytes from ").append(file.getName()) + .append(", length=" + length); + + truncate(length > nBytes? length - nBytes: 0, b); + return b.toString(); + } + + String truncateToBlockBoundary(int nBlocks) throws IOException { + Preconditions.checkArgument(nBlocks > 0); + final int length = checkLength(); + final StringBuilder b = new StringBuilder("truncateToBlockBoundary: ") + .append(nBlocks).append(" blocks from ").append(file.getName()) + .append(", length=" + length); + final int n = (nBlocks - 1)*BLOCK_SIZE + (length%BLOCK_SIZE); + Preconditions.checkState(truncate(length > n? length - n: 0, b), b); + return b.toString(); + } + + private boolean truncate(long newLength, StringBuilder b) throws IOException { + final RandomAccessFile raf = new RandomAccessFile(localFile, "rw"); + raf.setLength(newLength); + raf.close(); + + final boolean isReady = dfs.truncate(file, newLength); + b.append(", newLength=").append(newLength) + .append(", isReady=").append(isReady); + if (!isReady) { + TestFileTruncate.checkBlockRecovery(file, dfs); + } + return isReady; + } + + int checkLength() throws IOException { + return checkLength(file, localFile); + } + + static int checkLength(Path file, File localFile) throws IOException { + final long length = dfs.getFileStatus(file).getLen(); + Assert.assertEquals(localFile.length(), length); + Assert.assertTrue(length <= Integer.MAX_VALUE); + return (int)length; + } + + String checkFullFile() throws IOException { + return checkFullFile(file, localFile); + } + + static String checkFullFile(Path file, File localFile) throws IOException { + final StringBuilder b = new StringBuilder("checkFullFile: ") + .append(file.getName()).append(" vs ").append(localFile); + final byte[] bytes = new byte[checkLength(file, localFile)]; + b.append(", length=").append(bytes.length); + + final FileInputStream in = new FileInputStream(localFile); + for(int n = 0; n < bytes.length; ) { + n += in.read(bytes, n, bytes.length - n); + } + in.close(); + + AppendTestUtil.checkFullFile(dfs, file, bytes.length, bytes, + "File content mismatch: " + b, false); + return b.toString(); + } + } + + static abstract class Worker implements Callable { + enum State { + IDLE(false), RUNNING(false), STOPPED(true), ERROR(true); + + final boolean isTerminated; + State(boolean isTerminated) { + this.isTerminated = isTerminated; + } + }; + + final String name; + final AtomicReference state = new AtomicReference(State.IDLE); + final AtomicBoolean isCalling = new AtomicBoolean(); + final AtomicReference thread = new AtomicReference(); + + Worker(String name) { + this.name = name; + } + + void start() { + Preconditions.checkState(state.compareAndSet(State.IDLE, State.RUNNING)); + + if (thread.get() == null) { + final Thread t = new Thread(null, new Runnable() { + @Override + public void run() { + final Random r = DFSUtil.getRandom(); + for(State s; (s = state.get()) == State.RUNNING || s == State.IDLE;) { + if (s == State.RUNNING) { + isCalling.set(true); + try { + LOG.info(call()); + } catch (Exception e) { + LOG.error("Worker " + name + " failed.", e); + state.set(State.ERROR); + return; + } + isCalling.set(false); + } + sleep(r.nextInt(100) + 50); + } + } + }, name); + Preconditions.checkState(thread.compareAndSet(null, t)); + t.start(); + } + } + + boolean isPaused() { + return state.get() == State.IDLE && !isCalling.get(); + } + + void pause() { + Preconditions.checkState(state.compareAndSet(State.RUNNING, State.IDLE)); + } + + void stop() throws InterruptedException { + if (state.get() == State.ERROR) { + return; + } + + state.set(State.STOPPED); + thread.get().join(); + } + + static void sleep(final long sleepTimeMs) { + try { + Thread.sleep(sleepTimeMs); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } +}