From 44320eed1732ea59bd9ec83009eb10e0e6f13023 Mon Sep 17 00:00:00 2001
From: Eli Collins
Date: Thu, 28 Jul 2011 05:46:28 +0000
Subject: [PATCH] HDFS-2212. Refactor double-buffering code out of
 EditLogOutputStreams. Contributed by Todd Lipcon

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1151736 13f79535-47bb-0310-9956-ffa450edef68
---
 hdfs/CHANGES.txt                                   |   3 +
 .../namenode/EditLogBackupOutputStream.java        |  70 +++---
 .../namenode/EditLogFileOutputStream.java          |  71 +++---
 .../server/namenode/EditsDoubleBuffer.java         | 105 ++++++++++++++++++
 .../hdfs/server/namenode/FSEditLogOp.java          |  19 +++-
 .../namenode/TestEditsDoubleBuffer.java            |  81 ++++++++++++++
 6 files changed, 234 insertions(+), 115 deletions(-)
 create mode 100644 hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java
 create mode 100644 hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditsDoubleBuffer.java

diff --git a/hdfs/CHANGES.txt b/hdfs/CHANGES.txt
index b34da73bda..2f1d6e6b03 100644
--- a/hdfs/CHANGES.txt
+++ b/hdfs/CHANGES.txt
@@ -616,6 +616,9 @@ Trunk (unreleased changes)
     HDFS-2195. Refactor StorageDirectory to not be an non-static inner class.
     (todd via eli)
 
+    HDFS-2212. Refactor double-buffering code out of EditLogOutputStreams.
+    (todd via eli)
+
   OPTIMIZATIONS
 
     HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java b/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
index 2685e693ee..5e9d5398ed 100644
--- a/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
+++ b/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
@@ -17,18 +17,14 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import java.io.DataOutputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.ArrayList;
 
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
 
@@ -46,21 +42,9 @@ class EditLogBackupOutputStream extends EditLogOutputStream {
   private NamenodeProtocol backupNode;          // RPC proxy to backup node
   private NamenodeRegistration bnRegistration;  // backup node registration
   private NamenodeRegistration nnRegistration; // active node registration
-  private ArrayList<BufferedOp> bufCurrent;  // current buffer for writing
-  private ArrayList<BufferedOp> bufReady;    // buffer ready for flushing
+  private EditsDoubleBuffer doubleBuf;
   private DataOutputBuffer out;     // serialized output sent to backup node
-
-  private static class BufferedOp {
-    public final FSEditLogOpCodes opCode;
-    public final byte[] bytes;
-
-    public BufferedOp(FSEditLogOpCodes opCode, byte[] bytes) {
-      this.opCode = opCode;
-      this.bytes = bytes;
-    }
-  }
-
 
   EditLogBackupOutputStream(NamenodeRegistration bnReg, // backup node
                             NamenodeRegistration nnReg) // active name-node
     throws IOException {
@@ -78,8 +62,7 @@ public BufferedOp(FSEditLogOpCodes opCode, byte[] bytes) {
       Storage.LOG.error("Error connecting to: " + bnAddress, e);
       throw e;
     }
-    this.bufCurrent = new ArrayList<BufferedOp>();
-    this.bufReady = new ArrayList<BufferedOp>();
+    this.doubleBuf = new EditsDoubleBuffer(DEFAULT_BUFFER_SIZE);
     this.out = new DataOutputBuffer(DEFAULT_BUFFER_SIZE);
   }
 
@@ -95,13 +78,8 @@ public JournalType getType() {
 
   @Override // EditLogOutputStream
   void write(FSEditLogOp op) throws IOException {
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    DataOutputStream s = new DataOutputStream(baos);
-    FSEditLogOp.Writer w = new FSEditLogOp.Writer(s);
-    w.writeOp(op);
-
-    bufCurrent.add(new BufferedOp(op.opCode, baos.toByteArray()));
-  }
+    doubleBuf.writeOp(op);
+  }
 
   @Override
   void writeRaw(byte[] bytes, int offset, int length) throws IOException {
@@ -113,55 +91,37 @@ void writeRaw(byte[] bytes, int offset, int length) throws IOException {
    */
   @Override // EditLogOutputStream
   void create() throws IOException {
-    bufCurrent.clear();
-    assert bufReady.size() == 0 : "previous data is not flushed yet";
+    assert doubleBuf.isFlushed() : "previous data is not flushed yet";
+    this.doubleBuf = new EditsDoubleBuffer(DEFAULT_BUFFER_SIZE);
   }
 
   @Override // EditLogOutputStream
   public void close() throws IOException {
     // close should have been called after all pending transactions
     // have been flushed & synced.
-    int size = bufCurrent.size();
+    int size = doubleBuf.countBufferedBytes();
     if (size != 0) {
       throw new IOException("BackupEditStream has " + size +
                           " records still to be flushed and cannot be closed.");
     }
     RPC.stopProxy(backupNode); // stop the RPC threads
-    bufCurrent = bufReady = null;
+    doubleBuf.close();
+    doubleBuf = null;
   }
 
   @Override // EditLogOutputStream
   void setReadyToFlush() throws IOException {
-    assert bufReady.size() == 0 : "previous data is not flushed yet";
-    ArrayList<BufferedOp> tmp = bufReady;
-    bufReady = bufCurrent;
-    bufCurrent = tmp;
+    doubleBuf.setReadyToFlush();
   }
 
   @Override // EditLogOutputStream
   protected void flushAndSync() throws IOException {
-    assert out.size() == 0 : "Output buffer is not empty";
-    int bufReadySize = bufReady.size();
-    for(int idx = 0; idx < bufReadySize; idx++) {
-      BufferedOp jRec = null;
-      for(; idx < bufReadySize; idx++) {
-        jRec = bufReady.get(idx);
-        if(jRec.opCode.getOpCode()
-          >= FSEditLogOpCodes.OP_JSPOOL_START.getOpCode())
-          break;  // special operation should be sent in a separate call to BN
-        out.write(jRec.bytes, 0, jRec.bytes.length);
-      }
-      if(out.size() > 0)
-        send(NamenodeProtocol.JA_JOURNAL);
-      if(idx == bufReadySize)
-        break;
-      // operation like start journal spool or increment checkpoint time
-      // is a separate call to BN
-      out.write(jRec.bytes, 0, jRec.bytes.length);
-      send(jRec.opCode.getOpCode());
+    // XXX: this code won't work in trunk, but it's redone
+    // in HDFS-1073 where it's simpler.
+    doubleBuf.flushTo(out);
+    if (out.size() > 0) {
+      send(NamenodeProtocol.JA_JOURNAL);
     }
-    bufReady.clear();         // erase all data in the buffer
-    out.reset();              // reset buffer to the start position
   }
 
   /**
diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java b/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
index 74f5883625..44130015ad 100644
--- a/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
+++ b/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
@@ -24,12 +24,9 @@
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
-import java.util.zip.Checksum;
 
 import org.apache.hadoop.hdfs.protocol.FSConstants;
-import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.Writable;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -43,10 +40,7 @@ class EditLogFileOutputStream extends EditLogOutputStream {
   private File file;
   private FileOutputStream fp; // file stream for storing edit logs
   private FileChannel fc; // channel of the file stream for sync
-  private DataOutputBuffer bufCurrent; // current buffer for writing
-  private DataOutputBuffer bufReady; // buffer ready for flushing
-  private FSEditLogOp.Writer writer;
-  final private int initBufferSize; // inital buffer size
+  private EditsDoubleBuffer doubleBuf;
   static ByteBuffer fill = ByteBuffer.allocateDirect(1024 * 1024); // preallocation, 1MB
 
   static {
@@ -68,10 +62,7 @@ class EditLogFileOutputStream extends EditLogOutputStream {
   EditLogFileOutputStream(File name, int size) throws IOException {
     super();
     file = name;
-    initBufferSize = size;
-    bufCurrent = new DataOutputBuffer(size);
-    bufReady = new DataOutputBuffer(size);
-    writer = new FSEditLogOp.Writer(bufCurrent);
+    doubleBuf = new EditsDoubleBuffer(size);
     RandomAccessFile rp = new RandomAccessFile(name, "rw");
     fp = new FileOutputStream(rp.getFD()); // open for append
     fc = rp.getChannel();
@@ -91,23 +82,13 @@ public JournalType getType() {
 
   /** {@inheritDoc} */
   @Override
   void write(FSEditLogOp op) throws IOException {
-    int start = bufCurrent.getLength();
-
-    writer.writeOp(op);
-
-    // write transaction checksum
-    int end = bufCurrent.getLength();
-    Checksum checksum = FSEditLog.getChecksum();
-    checksum.reset();
-    checksum.update(bufCurrent.getData(), start, end-start);
-    int sum = (int)checksum.getValue();
-    bufCurrent.writeInt(sum);
+    doubleBuf.writeOp(op);
   }
 
   /** {@inheritDoc} */
   @Override
   void writeRaw(byte[] bytes, int offset, int length) throws IOException {
-    bufCurrent.write(bytes, offset, length);
+    doubleBuf.writeRaw(bytes, offset, length);
   }
 
   /**
@@ -117,7 +98,7 @@ void writeRaw(byte[] bytes, int offset, int length) throws IOException {
   void create() throws IOException {
     fc.truncate(0);
     fc.position(0);
-    bufCurrent.writeInt(FSConstants.LAYOUT_VERSION);
+    doubleBuf.getCurrentBuf().writeInt(FSConstants.LAYOUT_VERSION);
     setReadyToFlush();
     flush();
   }
@@ -128,23 +109,11 @@ public void close() throws IOException {
       // close should have been called after all pending transactions
       // have been flushed & synced.
       // if already closed, just skip
-      if(bufCurrent != null)
-      {
-        int bufSize = bufCurrent.size();
-        if (bufSize != 0) {
-          throw new IOException("FSEditStream has " + bufSize
-              + " bytes still to be flushed and cannot " + "be closed.");
-        }
-        bufCurrent.close();
-        bufCurrent = null;
-        writer = null;
+      if (doubleBuf != null) {
+        doubleBuf.close();
+        doubleBuf = null;
       }
-
-      if(bufReady != null) {
-        bufReady.close();
-        bufReady = null;
-      }
-
+      
       // remove the last INVALID marker from transaction log.
       if (fc != null && fc.isOpen()) {
         fc.truncate(fc.position());
@@ -156,9 +125,8 @@ public void close() throws IOException {
         fp = null;
       }
     } finally {
-      IOUtils.cleanup(FSNamesystem.LOG, bufCurrent, bufReady, fc, fp);
-      bufCurrent = bufReady = null;
-      writer = null;
+      IOUtils.cleanup(FSNamesystem.LOG, fc, fp);
+      doubleBuf = null;
       fc = null;
       fp = null;
     }
@@ -170,12 +138,8 @@ public void close() throws IOException {
    */
   @Override
   void setReadyToFlush() throws IOException {
-    assert bufReady.size() == 0 : "previous data is not flushed yet";
-    bufCurrent.write(FSEditLogOpCodes.OP_INVALID.getOpCode()); // insert eof marker
-    DataOutputBuffer tmp = bufReady;
-    bufReady = bufCurrent;
-    bufCurrent = tmp;
-    writer = new FSEditLogOp.Writer(bufCurrent);
+    doubleBuf.getCurrentBuf().write(FSEditLogOpCodes.OP_INVALID.getOpCode()); // insert eof marker
+    doubleBuf.setReadyToFlush();
   }
 
   /**
@@ -185,8 +149,7 @@ void setReadyToFlush() throws IOException {
   @Override
   protected void flushAndSync() throws IOException {
     preallocate(); // preallocate file if necessary
-    bufReady.writeTo(fp); // write data to file
-    bufReady.reset(); // erase all data in the buffer
+    doubleBuf.flushTo(fp);
     fc.force(false); // metadata updates not needed because of preallocation
     fc.position(fc.position() - 1); // skip back the end-of-file marker
   }
 
@@ -196,7 +159,7 @@ protected void flushAndSync() throws IOException {
    */
   @Override
   public boolean shouldForceSync() {
-    return bufReady.size() >= initBufferSize;
+    return doubleBuf.shouldForceSync();
   }
 
   /**
@@ -205,8 +168,8 @@ public boolean shouldForceSync() {
   @Override
   long length() throws IOException {
     // file size - header size + size of both buffers
-    return fc.size() - EDITS_FILE_HEADER_SIZE_BYTES + bufReady.size()
-        + bufCurrent.size();
+    return fc.size() - EDITS_FILE_HEADER_SIZE_BYTES +
+      doubleBuf.countBufferedBytes();
   }
 
   // allocate a big chunk of data
diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java b/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java
new file mode 100644
index 0000000000..60cb9e6dba
--- /dev/null
+++ b/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.Writer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.IOUtils;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A double-buffer for edits. New edits are written into the first buffer
+ * while the second is available to be flushed. Each time the double-buffer
+ * is flushed, the two internal buffers are swapped. This allows edits
+ * to progress concurrently to flushes without allocating new buffers each
+ * time.
+ */
+class EditsDoubleBuffer {
+
+  private DataOutputBuffer bufCurrent; // current buffer for writing
+  private DataOutputBuffer bufReady; // buffer ready for flushing
+  private final int initBufferSize;
+  private Writer writer;
+
+  public EditsDoubleBuffer(int defaultBufferSize) {
+    initBufferSize = defaultBufferSize;
+    bufCurrent = new DataOutputBuffer(initBufferSize);
+    bufReady = new DataOutputBuffer(initBufferSize);
+    writer = new FSEditLogOp.Writer(bufCurrent);
+  }
+
+  public void writeOp(FSEditLogOp op) throws IOException {
+    writer.writeOp(op);
+  }
+
+  void writeRaw(byte[] bytes, int offset, int length) throws IOException {
+    bufCurrent.write(bytes, offset, length);
+  }
+
+  void close() throws IOException {
+    Preconditions.checkNotNull(bufCurrent);
+    Preconditions.checkNotNull(bufReady);
+
+    int bufSize = bufCurrent.size();
+    if (bufSize != 0) {
+      throw new IOException("FSEditStream has " + bufSize
+          + " bytes still to be flushed and cannot be closed.");
+    }
+
+    IOUtils.cleanup(null, bufCurrent, bufReady);
+    bufCurrent = bufReady = null;
+  }
+
+  void setReadyToFlush() {
+    assert isFlushed() : "previous data not flushed yet";
+    DataOutputBuffer tmp = bufReady;
+    bufReady = bufCurrent;
+    bufCurrent = tmp;
+    writer = new FSEditLogOp.Writer(bufCurrent);
+  }
+
+  /**
+   * Writes the content of the "ready" buffer to the given output stream,
+   * and resets it. Does not swap any buffers.
+   */
+  void flushTo(OutputStream out) throws IOException {
+    bufReady.writeTo(out); // write data to file
+    bufReady.reset(); // erase all data in the buffer
+  }
+
+  boolean shouldForceSync() {
+    return bufReady.size() >= initBufferSize;
+  }
+
+  DataOutputBuffer getCurrentBuf() {
+    return bufCurrent;
+  }
+
+  public boolean isFlushed() {
+    return bufReady.size() == 0;
+  }
+
+  public int countBufferedBytes() {
+    return bufReady.size() + bufCurrent.size();
+  }
+
+}
diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java b/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
index b50f19d9ec..fb6e65f23c 100644
--- a/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
+++ b/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
@@ -37,6 +37,7 @@
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.*;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.Writable;
@@ -1341,10 +1342,10 @@ private static long readLongWritable(DataInputStream in) throws IOException {
    * Class for writing editlog ops
    */
   public static class Writer {
-    private final DataOutputStream out;
+    private final DataOutputBuffer buf;
 
-    public Writer(DataOutputStream out) {
-      this.out = out;
+    public Writer(DataOutputBuffer out) {
+      this.buf = out;
     }
 
     /**
@@ -1354,9 +1355,15 @@ public Writer(DataOutputStream out) {
      * @throws IOException if an error occurs during writing.
      */
     public void writeOp(FSEditLogOp op) throws IOException {
-      out.writeByte(op.opCode.getOpCode());
-
-      op.writeFields(out);
+      int start = buf.getLength();
+      buf.writeByte(op.opCode.getOpCode());
+      op.writeFields(buf);
+      int end = buf.getLength();
+      Checksum checksum = FSEditLog.getChecksum();
+      checksum.reset();
+      checksum.update(buf.getData(), start, end-start);
+      int sum = (int)checksum.getValue();
+      buf.writeInt(sum);
     }
   }
 
diff --git a/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditsDoubleBuffer.java b/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditsDoubleBuffer.java
new file mode 100644
index 0000000000..5e828b65e1
--- /dev/null
+++ b/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditsDoubleBuffer.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.junit.Test;
+
+public class TestEditsDoubleBuffer {
+  @Test
+  public void testDoubleBuffer() throws IOException {
+    EditsDoubleBuffer buf = new EditsDoubleBuffer(1024);
+
+    assertTrue(buf.isFlushed());
+    byte[] data = new byte[100];
+    buf.writeRaw(data, 0, data.length);
+    assertEquals("Should count new data correctly",
+        data.length, buf.countBufferedBytes());
+
+    assertTrue("Writing to current buffer should not affect flush state",
+        buf.isFlushed());
+
+    // Swap the buffers
+    buf.setReadyToFlush();
+    assertEquals("Swapping buffers should still count buffered bytes",
+        data.length, buf.countBufferedBytes());
+    assertFalse(buf.isFlushed());
+
+    // Flush to a stream
+    DataOutputBuffer outBuf = new DataOutputBuffer();
+    buf.flushTo(outBuf);
+    assertEquals(data.length, outBuf.getLength());
+    assertTrue(buf.isFlushed());
+    assertEquals(0, buf.countBufferedBytes());
+
+    // Write some more
+    buf.writeRaw(data, 0, data.length);
+    assertEquals("Should count new data correctly",
+        data.length, buf.countBufferedBytes());
+    buf.setReadyToFlush();
+    buf.flushTo(outBuf);
+
+    assertEquals(data.length * 2, outBuf.getLength());
+
+    assertEquals(0, buf.countBufferedBytes());
+
+    outBuf.close();
+  }
+
+  @Test
+  public void shouldFailToCloseWhenUnflushed() throws IOException {
+    EditsDoubleBuffer buf = new EditsDoubleBuffer(1024);
+    buf.writeRaw(new byte[1], 0, 1);
+    try {
+      buf.close();
+      fail("Did not fail to close with unflushed data");
+    } catch (IOException ioe) {
+      if (!ioe.toString().contains("still to be flushed")) {
+        throw ioe;
+      }
+    }
+  }
+}
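-- 
The refactored lifecycle is easiest to see in isolation. Below is a minimal
sketch of how both output streams now drive EditsDoubleBuffer (illustrative
only, not part of the patch: the class name EditsDoubleBufferExample is
hypothetical, and it has to sit in the same package because EditsDoubleBuffer
is package-private):

// Illustrative sketch, not part of the patch.
package org.apache.hadoop.hdfs.server.namenode;

import org.apache.hadoop.io.DataOutputBuffer;

public class EditsDoubleBufferExample {
  public static void main(String[] args) throws Exception {
    EditsDoubleBuffer buf = new EditsDoubleBuffer(1024); // both internal buffers start at 1KB

    buf.writeRaw(new byte[100], 0, 100); // edits accumulate in bufCurrent
    buf.setReadyToFlush();               // swap: bufCurrent <-> bufReady

    // New edits could be written here while the ready side is draining.
    DataOutputBuffer out = new DataOutputBuffer();
    buf.flushTo(out);                    // drains bufReady only; no swap

    System.out.println(buf.isFlushed());          // true
    System.out.println(buf.countBufferedBytes()); // 0
    buf.close();                                  // throws if unflushed bytes remain
  }
}

The swap-instead-of-copy design is the point of the class: callers keep
appending to bufCurrent while flushTo() drains bufReady, so a slow flush or
sync never blocks serialization of new edits into the buffer.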
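The FSEditLogOp.Writer change also centralizes per-transaction checksumming:
writeOp() now frames every op as a 1-byte opcode, the serialized fields, and
a 4-byte checksum computed over both, regardless of which stream writes it.
A standalone sketch of that framing (illustrative; java.util.zip.CRC32 is a
stand-in for whatever Checksum implementation FSEditLog.getChecksum()
supplies):

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.zip.CRC32;
import java.util.zip.Checksum;

public class OpFramingSketch {
  // Mirrors FSEditLogOp.Writer#writeOp above:
  // [1-byte opcode][op fields][4-byte checksum of opcode + fields]
  static byte[] frame(byte opCode, byte[] fields) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(baos);
    out.writeByte(opCode);            // opcode byte
    out.write(fields);                // op-specific payload
    byte[] body = baos.toByteArray(); // the bytes the checksum covers
    Checksum checksum = new CRC32();  // stand-in for FSEditLog.getChecksum()
    checksum.update(body, 0, body.length);
    out.writeInt((int) checksum.getValue()); // 4-byte trailer
    return baos.toByteArray();
  }

  public static void main(String[] args) throws IOException {
    byte[] record = frame((byte) 0, new byte[] { 1, 2, 3 });
    System.out.println("framed record length: " + record.length); // 1 + 3 + 4 = 8
  }
}

A reader can recompute the checksum over the opcode and fields and compare it
against the trailer, which is how a torn or corrupted transaction at the tail
of an edit log can be detected.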