diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 1a82c6086a..0623bf3596 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -187,6 +187,9 @@ Release 2.0.1-alpha - UNRELEASED
 
     HDFS-3419. Cleanup LocatedBlock. (eli)
 
+    HDFS-3440. More effectively limit stream memory consumption when reading
+    corrupt edit logs (Colin Patrick McCabe via todd)
+
   OPTIMIZATIONS
 
   BUG FIXES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java
index 9d070d9637..0baab0b141 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java
@@ -75,7 +75,7 @@ class BookKeeperEditLogInputStream extends EditLogInputStream {
     tracker = new FSEditLogLoader.PositionTrackingInputStream(bin);
     DataInputStream in = new DataInputStream(tracker);
 
-    reader = new FSEditLogOp.Reader(in, logVersion);
+    reader = new FSEditLogOp.Reader(in, tracker, logVersion);
   }
 
   @Override
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java
index 1f514cdfc8..f91b713822 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java
@@ -119,7 +119,7 @@ void setBytes(byte[] newBytes, int version) throws IOException {
 
     this.version = version;
 
-    reader = new FSEditLogOp.Reader(in, version);
+    reader = new FSEditLogOp.Reader(in, tracker, version);
   }
 
   void clear() throws IOException {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
index 29c90e9efb..a76cc6c84b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
@@ -83,7 +83,7 @@ public EditLogFileInputStream(File name, long firstTxId, long lastTxId,
       throw new LogHeaderCorruptException("No header found in log");
     }
 
-    reader = new FSEditLogOp.Reader(in, logVersion);
+    reader = new FSEditLogOp.Reader(in, tracker, logVersion);
     this.firstTxId = firstTxId;
     this.lastTxId = lastTxId;
     this.isInProgress = isInProgress;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index a48e5a6bd5..1f39a6a0bf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -721,17 +721,31 @@ long getNumTransactions() {
 
   /**
    * Stream wrapper that keeps track of the current stream position.
+   *
+   * This stream also allows us to set a limit on how many bytes we can read
+   * without getting an exception.
    */
-  public static class PositionTrackingInputStream extends FilterInputStream {
+  public static class PositionTrackingInputStream extends FilterInputStream
+      implements StreamLimiter {
     private long curPos = 0;
     private long markPos = -1;
+    private long limitPos = Long.MAX_VALUE;
 
     public PositionTrackingInputStream(InputStream is) {
       super(is);
     }
 
+    private void checkLimit(long amt) throws IOException {
+      long extra = (curPos + amt) - limitPos;
+      if (extra > 0) {
+        throw new IOException("Tried to read " + amt + " byte(s) past " +
+            "the limit at offset " + limitPos);
+      }
+    }
+
     @Override
     public int read() throws IOException {
+      checkLimit(1);
       int ret = super.read();
       if (ret != -1) curPos++;
       return ret;
@@ -739,6 +753,7 @@ public int read() throws IOException {
 
     @Override
     public int read(byte[] data) throws IOException {
+      checkLimit(data.length);
       int ret = super.read(data);
       if (ret > 0) curPos += ret;
       return ret;
@@ -746,11 +761,17 @@ public int read(byte[] data) throws IOException {
 
     @Override
     public int read(byte[] data, int offset, int length) throws IOException {
+      checkLimit(length);
       int ret = super.read(data, offset, length);
       if (ret > 0) curPos += ret;
       return ret;
     }
 
+    @Override
+    public void setLimit(long limit) {
+      limitPos = curPos + limit;
+    }
+
     @Override
     public void mark(int limit) {
       super.mark(limit);
@@ -773,6 +794,11 @@ public long getPos() {
 
     @Override
     public long skip(long amt) throws IOException {
+      long extra = (curPos + amt) - limitPos;
+      if (extra > 0) {
+        throw new IOException("Tried to skip " + extra + " bytes past " +
+            "the limit at offset " + limitPos);
+      }
       long ret = super.skip(amt);
       curPos += ret;
       return ret;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
index 2eadd4b8f2..489f030e13 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
@@ -75,7 +75,10 @@ public abstract class FSEditLogOp {
 
   public final FSEditLogOpCodes opCode;
   long txid;
 
-  private static final int MAX_OP_SIZE = 100 * 1024 * 1024;
+  /**
+   * Opcode size is limited to 1.5 megabytes
+   */
+  public static final int MAX_OP_SIZE = (3 * 1024 * 1024) / 2;
 
   @SuppressWarnings("deprecation")
@@ -2229,6 +2232,7 @@ public void writeOp(FSEditLogOp op) throws IOException {
    */
   public static class Reader {
     private final DataInputStream in;
+    private final StreamLimiter limiter;
     private final int logVersion;
     private final Checksum checksum;
     private final OpInstanceCache cache;
@@ -2239,7 +2243,7 @@ public static class Reader {
      * @param logVersion The version of the data coming from the stream.
      */
     @SuppressWarnings("deprecation")
-    public Reader(DataInputStream in, int logVersion) {
+    public Reader(DataInputStream in, StreamLimiter limiter, int logVersion) {
       this.logVersion = logVersion;
       if (LayoutVersion.supports(Feature.EDITS_CHESKUM, logVersion)) {
         this.checksum = new PureJavaCrc32();
@@ -2253,6 +2257,7 @@ public Reader(DataInputStream in, int logVersion) {
       } else {
         this.in = in;
       }
+      this.limiter = limiter;
       this.cache = new OpInstanceCache();
     }
 
@@ -2272,6 +2277,7 @@ public Reader(DataInputStream in, int logVersion) {
     public FSEditLogOp readOp(boolean skipBrokenEdits) throws IOException {
       while (true) {
         try {
+          limiter.setLimit(MAX_OP_SIZE);
           in.mark(MAX_OP_SIZE);
           return decodeOp();
         } catch (GarbageAfterTerminatorException e) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StreamLimiter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StreamLimiter.java
new file mode 100644
index 0000000000..97420828d4
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StreamLimiter.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+/**
+ * An object that allows you to set a limit on a stream.  This limit
+ * represents the number of bytes that can be read without getting an
+ * exception.
+ */
+interface StreamLimiter {
+  /**
+   * Set a limit.  Calling this function clears any existing limit.
+   */
+  public void setLimit(long limit);
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
index cae3b0dea8..5e869ecb35 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
@@ -765,7 +765,7 @@ public EditLogByteInputStream(byte[] data) throws IOException {
       tracker = new FSEditLogLoader.PositionTrackingInputStream(in);
       in = new DataInputStream(tracker);
 
-      reader = new FSEditLogOp.Reader(in, version);
+      reader = new FSEditLogOp.Reader(in, tracker, version);
     }
 
     @Override
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
index 1917ddeb9a..4302534d15 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
@@ -22,8 +22,10 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.io.BufferedInputStream;
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 
 import java.nio.channels.FileChannel;
@@ -316,4 +318,47 @@ private static long getNonTrailerLength(File f) throws IOException {
       fis.close();
     }
   }
+
+  @Test
+  public void testStreamLimiter() throws IOException {
+    final File LIMITER_TEST_FILE = new File(TEST_DIR, "limiter.test");
+
+    FileOutputStream fos = new FileOutputStream(LIMITER_TEST_FILE);
+    try {
+      fos.write(0x12);
+      fos.write(0x12);
+      fos.write(0x12);
+    } finally {
+      fos.close();
+    }
+
+    FileInputStream fin = new FileInputStream(LIMITER_TEST_FILE);
+    BufferedInputStream bin = new BufferedInputStream(fin);
+    FSEditLogLoader.PositionTrackingInputStream tracker =
+        new FSEditLogLoader.PositionTrackingInputStream(bin);
+    try {
+      tracker.setLimit(2);
+      tracker.mark(100);
+      tracker.read();
+      tracker.read();
+      try {
+        tracker.read();
+        fail("expected to get IOException after reading past the limit");
+      } catch (IOException e) {
+      }
+      tracker.reset();
+      tracker.mark(100);
+      byte arr[] = new byte[3];
+      try {
+        tracker.read(arr);
+        fail("expected to get IOException after reading past the limit");
+      } catch (IOException e) {
+      }
+      tracker.reset();
+      arr = new byte[2];
+      tracker.read(arr);
+    } finally {
+      tracker.close();
+    }
+  }
 }
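---
Usage note (not part of the patch): the sketch below shows how the pieces above fit together. PositionTrackingInputStream implements StreamLimiter, and FSEditLogOp.Reader.readOp() calls setLimit(MAX_OP_SIZE) before decoding each opcode, so a corrupt length field in an edit log can no longer force an unbounded read into memory. This is a minimal sketch mirroring the testStreamLimiter() test case; the class name StreamLimiterSketch is hypothetical, and it assumes the patched FSEditLogLoader is on the classpath.

import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;

import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader;

public class StreamLimiterSketch {
  public static void main(String[] args) throws IOException {
    // Three bytes of input, but only two may be consumed once the limit is set.
    FSEditLogLoader.PositionTrackingInputStream tracker =
        new FSEditLogLoader.PositionTrackingInputStream(
            new BufferedInputStream(
                new ByteArrayInputStream(new byte[] { 0x12, 0x12, 0x12 })));
    try {
      tracker.setLimit(2);   // limit is relative to the current position
      tracker.read();        // ok, curPos = 1
      tracker.read();        // ok, curPos = 2
      try {
        tracker.read();      // curPos + 1 exceeds limitPos -> IOException
      } catch (IOException e) {
        System.out.println("read past limit rejected: " + e.getMessage());
      }
      tracker.setLimit(1);   // setting a new limit clears the old one
      tracker.read();        // ok again, curPos = 3
    } finally {
      tracker.close();
    }
  }
}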