diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 2b2caea754..9ec7fad283 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -99,6 +99,9 @@ Trunk (unreleased changes) HDFS-3571. Allow EditLogFileInputStream to read from a remote URL (todd) + HDFS-3510. Editlog pre-allocation is performed prior to writing edits + to avoid partial edits case disk out of space.(Collin McCabe via suresh) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java index 08a560ce12..bb181d743b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java @@ -41,13 +41,13 @@ @InterfaceAudience.Private public class EditLogFileOutputStream extends EditLogOutputStream { private static Log LOG = LogFactory.getLog(EditLogFileOutputStream.class); - public static final int PREALLOCATION_LENGTH = 1024 * 1024; + public static final int MIN_PREALLOCATION_LENGTH = 1024 * 1024; private File file; private FileOutputStream fp; // file stream for storing edit logs private FileChannel fc; // channel of the file stream for sync private EditsDoubleBuffer doubleBuf; - static ByteBuffer fill = ByteBuffer.allocateDirect(PREALLOCATION_LENGTH); + static ByteBuffer fill = ByteBuffer.allocateDirect(MIN_PREALLOCATION_LENGTH); static { fill.position(0); @@ -132,7 +132,7 @@ public void close() throws IOException { doubleBuf = null; } - // remove the last INVALID marker from transaction log. + // remove any preallocated padding bytes from the transaction log. if (fc != null && fc.isOpen()) { fc.truncate(fc.position()); fc.close(); @@ -166,7 +166,6 @@ public void abort() throws IOException { */ @Override public void setReadyToFlush() throws IOException { - doubleBuf.getCurrentBuf().write(FSEditLogOpCodes.OP_INVALID.getOpCode()); // insert eof marker doubleBuf.setReadyToFlush(); } @@ -183,10 +182,9 @@ public void flushAndSync() throws IOException { LOG.info("Nothing to flush"); return; } + preallocate(); // preallocate file if necessay doubleBuf.flushTo(fp); fc.force(false); // metadata updates not needed - fc.position(fc.position() - 1); // skip back the end-of-file marker - preallocate(); // preallocate file if necessary } /** @@ -197,20 +195,27 @@ public boolean shouldForceSync() { return doubleBuf.shouldForceSync(); } - // allocate a big chunk of data private void preallocate() throws IOException { long position = fc.position(); - if (position + 4096 >= fc.size()) { - if(FSNamesystem.LOG.isDebugEnabled()) { - FSNamesystem.LOG.debug("Preallocating Edit log, current size " - + fc.size()); - } + long size = fc.size(); + int bufSize = doubleBuf.getReadyBuf().getLength(); + long need = bufSize - (size - position); + if (need <= 0) { + return; + } + long oldSize = size; + long total = 0; + long fillCapacity = fill.capacity(); + while (need > 0) { fill.position(0); - IOUtils.writeFully(fc, fill, position); - if(FSNamesystem.LOG.isDebugEnabled()) { - FSNamesystem.LOG.debug("Edit log size is now " + fc.size() + - " written " + fill.capacity() + " bytes " + " at offset " + position); - } + IOUtils.writeFully(fc, fill, size); + need -= fillCapacity; + size += fillCapacity; + total += fillCapacity; + } + if(FSNamesystem.LOG.isDebugEnabled()) { + FSNamesystem.LOG.debug("Preallocated " + total + " bytes at the end of " + + "the edit log (offset " + oldSize + ")"); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java index 2aa736d638..e9d22e30bd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java @@ -89,6 +89,10 @@ boolean shouldForceSync() { return bufCurrent.size() >= initBufferSize; } + DataOutputBuffer getReadyBuf() { + return bufReady; + } + DataOutputBuffer getCurrentBuf() { return bufCurrent; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java index 80f637c499..c20c608355 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java @@ -2278,25 +2278,7 @@ public Reader(DataInputStream in, StreamLimiter limiter, int logVersion) { public FSEditLogOp readOp(boolean skipBrokenEdits) throws IOException { while (true) { try { - limiter.setLimit(MAX_OP_SIZE); - in.mark(MAX_OP_SIZE); return decodeOp(); - } catch (GarbageAfterTerminatorException e) { - in.reset(); - if (!skipBrokenEdits) { - throw e; - } - // If we saw a terminator opcode followed by a long region of 0x00 or - // 0xff, we want to skip over that region, because there's nothing - // interesting there. - long numSkip = e.getNumAfterTerminator(); - try { - IOUtils.skipFully(in, numSkip); - } catch (Throwable t) { - FSImage.LOG.error("Failed to skip " + numSkip + " bytes of " + - "garbage after an OP_INVALID.", t); - return null; - } } catch (IOException e) { in.reset(); if (!skipBrokenEdits) { @@ -2325,7 +2307,6 @@ public FSEditLogOp readOp(boolean skipBrokenEdits) throws IOException { } private void verifyTerminator() throws IOException { - long off = 0; /** The end of the edit log should contain only 0x00 or 0xff bytes. * If it contains other bytes, the log itself may be corrupt. * It is important to check this; if we don't, a stray OP_INVALID byte @@ -2333,21 +2314,49 @@ private void verifyTerminator() throws IOException { * know that we had lost data. */ byte[] buf = new byte[4096]; + limiter.clearLimit(); + int numRead = -1, idx = 0; while (true) { - int numRead = in.read(buf); - if (numRead == -1) { - return; - } - for (int i = 0; i < numRead; i++, off++) { - if ((buf[i] != (byte)0) && (buf[i] != (byte)-1)) { - throw new GarbageAfterTerminatorException("Read garbage after " + - "the terminator!", off); + try { + numRead = -1; + idx = 0; + numRead = in.read(buf); + if (numRead == -1) { + return; + } + while (idx < numRead) { + if ((buf[idx] != (byte)0) && (buf[idx] != (byte)-1)) { + throw new IOException("Read extra bytes after " + + "the terminator!"); + } + idx++; + } + } finally { + // After reading each group of bytes, we reposition the mark one + // byte before the next group. Similarly, if there is an error, we + // want to reposition the mark one byte before the error + if (numRead != -1) { + in.reset(); + IOUtils.skipFully(in, idx); + in.mark(buf.length + 1); + IOUtils.skipFully(in, 1); } } } } + /** + * Read an opcode from the input stream. + * + * @return the opcode, or null on EOF. + * + * If an exception is thrown, the stream's mark will be set to the first + * problematic byte. This usually means the beginning of the opcode. + */ private FSEditLogOp decodeOp() throws IOException { + limiter.setLimit(MAX_OP_SIZE); + in.mark(MAX_OP_SIZE); + if (checksum != null) { checksum.reset(); } @@ -2534,35 +2543,4 @@ public static PermissionStatus permissionStatusFromXml(Stanza st) short mode = Short.valueOf(st.getValue("MODE")); return new PermissionStatus(username, groupname, new FsPermission(mode)); } - - /** - * Exception indicating that we found an OP_INVALID followed by some - * garbage. An OP_INVALID should signify the end of the file... if there - * is additional content after that, then the edit log is corrupt. - */ - static class GarbageAfterTerminatorException extends IOException { - private static final long serialVersionUID = 1L; - private final long numAfterTerminator; - - public GarbageAfterTerminatorException(String str, - long numAfterTerminator) { - super(str); - this.numAfterTerminator = numAfterTerminator; - } - - /** - * Get the number of bytes after the terminator at which the garbage - * appeared. - * - * So if you had an OP_INVALID followed immediately by another valid opcode, - * this would be 0. - * If you had an OP_INVALID followed by some padding bytes, followed by a - * stray byte at the end, this would be the number of padding bytes. - * - * @return numAfterTerminator - */ - public long getNumAfterTerminator() { - return numAfterTerminator; - } - } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileOutputStream.java index c1ca4ac09c..ead94968ee 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileOutputStream.java @@ -20,105 +20,74 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; import java.io.File; import java.io.IOException; -import java.nio.channels.FileChannel; import org.apache.hadoop.util.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.DU; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; -import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.EditLogValidation; -import org.apache.hadoop.hdfs.HdfsConfiguration; -import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.junit.After; import org.junit.Before; import org.junit.Test; -import org.mockito.Mockito; +/** + * Test the EditLogFileOutputStream + */ public class TestEditLogFileOutputStream { - private final static int HEADER_LEN = 17; + private final static File TEST_DIR = + new File(System.getProperty("test.build.data", "/tmp")); private static final File TEST_EDITS = - new File(System.getProperty("test.build.data","/tmp"), - "editLogStream.dat"); + new File(TEST_DIR, "testEditLogFileOutput.log"); + final static int MIN_PREALLOCATION_LENGTH = + EditLogFileOutputStream.MIN_PREALLOCATION_LENGTH; @Before + @After public void deleteEditsFile() { - TEST_EDITS.delete(); + if (TEST_EDITS.exists()) TEST_EDITS.delete(); } - @Test - public void testConstants() { - // Each call to FSEditLogOp#Reader#readOp can read at most MAX_OP_SIZE bytes - // before getting an exception. So we don't want to preallocate a longer - // region than MAX_OP_SIZE, because then we'd get an IOException when reading - // through the padding at the end of the file. - assertTrue(EditLogFileOutputStream.PREALLOCATION_LENGTH < - FSEditLogOp.MAX_OP_SIZE); - } - - @Test - public void testPreallocation() throws IOException { - Configuration conf = new HdfsConfiguration(); - MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0) - .build(); - - final long START_TXID = 1; - StorageDirectory sd = cluster.getNameNode().getFSImage() - .getStorage().getStorageDir(0); - File editLog = NNStorage.getInProgressEditsFile(sd, START_TXID); - - EditLogValidation validation = EditLogFileInputStream.validateEditLog(editLog); - assertEquals("Edit log should contain a header as valid length", - HEADER_LEN, validation.getValidLength()); - assertEquals(validation.getEndTxId(), START_TXID); - assertEquals("Edit log should have 1MB pre-allocated, plus 4 bytes " + - "for the version number", - EditLogFileOutputStream.PREALLOCATION_LENGTH + 4, editLog.length()); - - - cluster.getFileSystem().mkdirs(new Path("/tmp"), - new FsPermission((short)777)); - - long oldLength = validation.getValidLength(); - validation = EditLogFileInputStream.validateEditLog(editLog); - assertTrue("Edit log should have more valid data after writing a txn " + - "(was: " + oldLength + " now: " + validation.getValidLength() + ")", - validation.getValidLength() > oldLength); - assertEquals(1, validation.getEndTxId() - START_TXID); - - assertEquals("Edit log should be 1MB long, plus 4 bytes for the version number", - EditLogFileOutputStream.PREALLOCATION_LENGTH + 4, editLog.length()); - // 256 blocks for the 1MB of preallocation space - assertTrue("Edit log disk space used should be at least 257 blocks", - 256 * 4096 <= new DU(editLog, conf).getUsed()); + static void flushAndCheckLength(EditLogFileOutputStream elos, + long expectedLength) throws IOException { + elos.setReadyToFlush(); + elos.flushAndSync(); + assertEquals(expectedLength, elos.getFile().length()); } + /** + * Tests writing to the EditLogFileOutputStream. Due to preallocation, the + * length of the edit log will usually be longer than its valid contents. + */ @Test - public void testClose() throws IOException { - String errorMessage = "TESTING: fc.truncate() threw IOE"; - - File testDir = new File(System.getProperty("test.build.data", "/tmp")); - assertTrue("could not create test directory", testDir.exists() || testDir.mkdirs()); - File f = new File(testDir, "edits"); - assertTrue("could not create test file", f.createNewFile()); - EditLogFileOutputStream elos = new EditLogFileOutputStream(f, 0); - - FileChannel mockFc = Mockito.spy(elos.getFileChannelForTesting()); - Mockito.doThrow(new IOException(errorMessage)).when(mockFc).truncate(Mockito.anyLong()); - elos.setFileChannelForTesting(mockFc); - + public void testRawWrites() throws IOException { + EditLogFileOutputStream elos = new EditLogFileOutputStream(TEST_EDITS, 0); try { - elos.close(); - fail("elos.close() succeeded, but should have thrown"); - } catch (IOException e) { - assertEquals("wrong IOE thrown from elos.close()", e.getMessage(), errorMessage); + byte[] small = new byte[] {1,2,3,4,5,8,7}; + elos.create(); + // The first (small) write we make extends the file by 1 MB due to + // preallocation. + elos.writeRaw(small, 0, small.length); + flushAndCheckLength(elos, MIN_PREALLOCATION_LENGTH); + // The next small write we make goes into the area that was already + // preallocated. + elos.writeRaw(small, 0, small.length); + flushAndCheckLength(elos, MIN_PREALLOCATION_LENGTH); + // Now we write enough bytes so that we exceed the minimum preallocated + // length. + final int BIG_WRITE_LENGTH = 3 * MIN_PREALLOCATION_LENGTH; + byte[] buf = new byte[4096]; + for (int i = 0; i < buf.length; i++) { + buf[i] = 0; + } + int total = BIG_WRITE_LENGTH; + while (total > 0) { + int toWrite = (total > buf.length) ? buf.length : total; + elos.writeRaw(buf, 0, toWrite); + total -= toWrite; + } + flushAndCheckLength(elos, 4 * MIN_PREALLOCATION_LENGTH); + } finally { + if (elos != null) elos.close(); } - - assertEquals("fc was not nulled when elos.close() failed", elos.getFileChannelForTesting(), null); } /** @@ -164,5 +133,4 @@ public void testEditLogFileOutputStreamAbortAbort() throws IOException { editLogStream.abort(); editLogStream.abort(); } - } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java index c1287e7d5d..6a1a20a34c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java @@ -135,6 +135,9 @@ static void runEditLogTest(EditLogTestSetup elts) throws IOException { } } + /** + * A test scenario for the edit log + */ private interface EditLogTestSetup { /** * Set up the edit log. @@ -156,10 +159,50 @@ abstract public void addTransactionsToLog(EditLogOutputStream elos, abstract public Set getValidTxIds(); } - private class EltsTestEmptyLog implements EditLogTestSetup { + static void padEditLog(EditLogOutputStream elos, int paddingLength) + throws IOException { + if (paddingLength <= 0) { + return; + } + byte buf[] = new byte[4096]; + for (int i = 0; i < buf.length; i++) { + buf[i] = (byte)-1; + } + int pad = paddingLength; + while (pad > 0) { + int toWrite = pad > buf.length ? buf.length : pad; + elos.writeRaw(buf, 0, toWrite); + pad -= toWrite; + } + } + + static void addDeleteOpcode(EditLogOutputStream elos, + OpInstanceCache cache) throws IOException { + DeleteOp op = DeleteOp.getInstance(cache); + op.setTransactionId(0x0); + op.setPath("/foo"); + op.setTimestamp(0); + elos.write(op); + } + + /** + * Test the scenario where we have an empty edit log. + * + * This class is also useful in testing whether we can correctly handle + * various amounts of padding bytes at the end of the log. We should be + * able to handle any amount of padding (including no padding) without + * throwing an exception. + */ + private static class EltsTestEmptyLog implements EditLogTestSetup { + private int paddingLength; + + public EltsTestEmptyLog(int paddingLength) { + this.paddingLength = paddingLength; + } + public void addTransactionsToLog(EditLogOutputStream elos, OpInstanceCache cache) throws IOException { - // do nothing + padEditLog(elos, paddingLength); } public long getLastValidTxId() { @@ -174,10 +217,65 @@ public Set getValidTxIds() { /** Test an empty edit log */ @Test(timeout=180000) public void testEmptyLog() throws IOException { - runEditLogTest(new EltsTestEmptyLog()); + runEditLogTest(new EltsTestEmptyLog(0)); + } + + /** Test an empty edit log with padding */ + @Test(timeout=180000) + public void testEmptyPaddedLog() throws IOException { + runEditLogTest(new EltsTestEmptyLog( + EditLogFileOutputStream.MIN_PREALLOCATION_LENGTH)); } - private class EltsTestGarbageInEditLog implements EditLogTestSetup { + /** Test an empty edit log with extra-long padding */ + @Test(timeout=180000) + public void testEmptyExtraPaddedLog() throws IOException { + runEditLogTest(new EltsTestEmptyLog( + 3 * EditLogFileOutputStream.MIN_PREALLOCATION_LENGTH)); + } + + /** + * Test the scenario where an edit log contains some padding (0xff) bytes + * followed by valid opcode data. + * + * These edit logs are corrupt, but all the opcodes should be recoverable + * with recovery mode. + */ + private static class EltsTestOpcodesAfterPadding implements EditLogTestSetup { + private int paddingLength; + + public EltsTestOpcodesAfterPadding(int paddingLength) { + this.paddingLength = paddingLength; + } + + public void addTransactionsToLog(EditLogOutputStream elos, + OpInstanceCache cache) throws IOException { + padEditLog(elos, paddingLength); + addDeleteOpcode(elos, cache); + } + + public long getLastValidTxId() { + return 0; + } + + public Set getValidTxIds() { + return Sets.newHashSet(0L); + } + } + + @Test(timeout=180000) + public void testOpcodesAfterPadding() throws IOException { + runEditLogTest(new EltsTestOpcodesAfterPadding( + EditLogFileOutputStream.MIN_PREALLOCATION_LENGTH)); + } + + @Test(timeout=180000) + public void testOpcodesAfterExtraPadding() throws IOException { + runEditLogTest(new EltsTestOpcodesAfterPadding( + 3 * EditLogFileOutputStream.MIN_PREALLOCATION_LENGTH)); + } + + private static class EltsTestGarbageInEditLog implements EditLogTestSetup { final private long BAD_TXID = 4; final private long MAX_TXID = 10;