HDFS-3440. More effectively limit stream memory consumption when reading corrupt edit logs. Contributed by Colin Patrick McCabe.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1339978 13f79535-47bb-0310-9956-ffa450edef68
Todd Lipcon 2012-05-18 05:28:18 +00:00
parent eae281c130
commit 7d1f97b821
9 changed files with 117 additions and 7 deletions

View File

@@ -187,6 +187,9 @@ Release 2.0.1-alpha - UNRELEASED
     HDFS-3419. Cleanup LocatedBlock. (eli)
 
+    HDFS-3440. More effectively limit stream memory consumption when reading
+    corrupt edit logs (Colin Patrick McCabe via todd)
+
   OPTIMIZATIONS
 
   BUG FIXES

View File

@@ -75,7 +75,7 @@ class BookKeeperEditLogInputStream extends EditLogInputStream {
     tracker = new FSEditLogLoader.PositionTrackingInputStream(bin);
     DataInputStream in = new DataInputStream(tracker);
-    reader = new FSEditLogOp.Reader(in, logVersion);
+    reader = new FSEditLogOp.Reader(in, tracker, logVersion);
   }
 
   @Override

View File

@@ -119,7 +119,7 @@ void setBytes(byte[] newBytes, int version) throws IOException {
     this.version = version;
-    reader = new FSEditLogOp.Reader(in, version);
+    reader = new FSEditLogOp.Reader(in, tracker, version);
   }
 
   void clear() throws IOException {

View File

@@ -83,7 +83,7 @@ public EditLogFileInputStream(File name, long firstTxId, long lastTxId,
       throw new LogHeaderCorruptException("No header found in log");
     }
-    reader = new FSEditLogOp.Reader(in, logVersion);
+    reader = new FSEditLogOp.Reader(in, tracker, logVersion);
     this.firstTxId = firstTxId;
    this.lastTxId = lastTxId;
    this.isInProgress = isInProgress;
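
The same one-line change is applied in each edit log input stream: the PositionTrackingInputStream that already wraps the underlying stream is now also handed to the op reader as its StreamLimiter. Below is a condensed, hypothetical sketch of that wiring (the file path, log version, and class name ReaderWiringSketch are placeholders; the real streams read from backup buffers, local files, or BookKeeper ledgers):

package org.apache.hadoop.hdfs.server.namenode;

import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;

class ReaderWiringSketch {
  static FSEditLogOp.Reader open(String path, int logVersion) throws IOException {
    BufferedInputStream bin = new BufferedInputStream(new FileInputStream(path));
    // The tracker serves two roles: the DataInputStream reads through it, and the
    // Reader keeps a reference to it as the StreamLimiter so that readOp() can
    // call setLimit() before decoding each op.
    FSEditLogLoader.PositionTrackingInputStream tracker =
        new FSEditLogLoader.PositionTrackingInputStream(bin);
    DataInputStream in = new DataInputStream(tracker);
    return new FSEditLogOp.Reader(in, tracker, logVersion);
  }
}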

View File

@@ -721,17 +721,31 @@ long getNumTransactions() {
   /**
    * Stream wrapper that keeps track of the current stream position.
+   *
+   * This stream also allows us to set a limit on how many bytes we can read
+   * without getting an exception.
    */
-  public static class PositionTrackingInputStream extends FilterInputStream {
+  public static class PositionTrackingInputStream extends FilterInputStream
+      implements StreamLimiter {
     private long curPos = 0;
     private long markPos = -1;
+    private long limitPos = Long.MAX_VALUE;
 
     public PositionTrackingInputStream(InputStream is) {
       super(is);
     }
 
+    private void checkLimit(long amt) throws IOException {
+      long extra = (curPos + amt) - limitPos;
+      if (extra > 0) {
+        throw new IOException("Tried to read " + amt + " byte(s) past " +
+            "the limit at offset " + limitPos);
+      }
+    }
+
     @Override
     public int read() throws IOException {
+      checkLimit(1);
      int ret = super.read();
      if (ret != -1) curPos++;
      return ret;
@@ -739,6 +753,7 @@ public int read() throws IOException {
 
     @Override
     public int read(byte[] data) throws IOException {
+      checkLimit(data.length);
      int ret = super.read(data);
      if (ret > 0) curPos += ret;
      return ret;
@@ -746,11 +761,17 @@ public int read(byte[] data) throws IOException {
 
     @Override
     public int read(byte[] data, int offset, int length) throws IOException {
+      checkLimit(length);
      int ret = super.read(data, offset, length);
      if (ret > 0) curPos += ret;
      return ret;
     }
 
+    @Override
+    public void setLimit(long limit) {
+      limitPos = curPos + limit;
+    }
+
     @Override
     public void mark(int limit) {
      super.mark(limit);
@@ -773,6 +794,11 @@ public long getPos() {
 
     @Override
     public long skip(long amt) throws IOException {
+      long extra = (curPos + amt) - limitPos;
+      if (extra > 0) {
+        throw new IOException("Tried to skip " + extra + " bytes past " +
+            "the limit at offset " + limitPos);
+      }
      long ret = super.skip(amt);
      curPos += ret;
      return ret;
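
With checkLimit() on every read path and setLimit() exposed through the StreamLimiter interface, a caller can cap how many more bytes the stream may produce before an IOException. A minimal sketch of that behavior against an in-memory stream (the three-byte payload and the class name LimiterSketch are illustrative only):

package org.apache.hadoop.hdfs.server.namenode;

import java.io.ByteArrayInputStream;
import java.io.IOException;

class LimiterSketch {
  public static void main(String[] args) throws IOException {
    FSEditLogLoader.PositionTrackingInputStream tracker =
        new FSEditLogLoader.PositionTrackingInputStream(
            new ByteArrayInputStream(new byte[] { 0x12, 0x12, 0x12 }));
    try {
      tracker.setLimit(2);   // at most two more bytes may be read
      tracker.read();        // ok: curPos becomes 1
      tracker.read();        // ok: curPos becomes 2, exactly at the limit
      try {
        tracker.read();      // a third byte would cross limitPos
      } catch (IOException e) {
        System.out.println("read past limit: " + e.getMessage());
      }
    } finally {
      tracker.close();
    }
  }
}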

View File

@@ -75,7 +75,10 @@
 public abstract class FSEditLogOp {
   public final FSEditLogOpCodes opCode;
   long txid;
-  private static final int MAX_OP_SIZE = 100 * 1024 * 1024;
+  /**
+   * Opcode size is limited to 1.5 megabytes
+   */
+  public static final int MAX_OP_SIZE = (3 * 1024 * 1024) / 2;
 
   @SuppressWarnings("deprecation")
@@ -2229,6 +2232,7 @@ public void writeOp(FSEditLogOp op) throws IOException {
    */
   public static class Reader {
     private final DataInputStream in;
+    private final StreamLimiter limiter;
     private final int logVersion;
     private final Checksum checksum;
     private final OpInstanceCache cache;
@@ -2239,7 +2243,7 @@ public static class Reader {
      * @param logVersion The version of the data coming from the stream.
      */
     @SuppressWarnings("deprecation")
-    public Reader(DataInputStream in, int logVersion) {
+    public Reader(DataInputStream in, StreamLimiter limiter, int logVersion) {
       this.logVersion = logVersion;
       if (LayoutVersion.supports(Feature.EDITS_CHESKUM, logVersion)) {
         this.checksum = new PureJavaCrc32();
@@ -2253,6 +2257,7 @@ public Reader(DataInputStream in, int logVersion) {
       } else {
         this.in = in;
       }
+      this.limiter = limiter;
       this.cache = new OpInstanceCache();
     }
@@ -2272,6 +2277,7 @@ public Reader(DataInputStream in, int logVersion) {
     public FSEditLogOp readOp(boolean skipBrokenEdits) throws IOException {
       while (true) {
         try {
+          limiter.setLimit(MAX_OP_SIZE);
           in.mark(MAX_OP_SIZE);
           return decodeOp();
         } catch (GarbageAfterTerminatorException e) {
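
readOp() now re-arms the limiter before every op, so a single corrupt op can force at most MAX_OP_SIZE (1.5 MB) of reading before it is rejected, where the previous constant allowed up to 100 MB. A hypothetical reader loop illustrating the same setLimit-then-mark pattern (BoundedRecordReader, readRecord(), and decodeOneRecord() are stand-ins; the real decodeOp() is private to FSEditLogOp.Reader):

package org.apache.hadoop.hdfs.server.namenode;

import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;

class BoundedRecordReader {
  static final int MAX_OP_SIZE = (3 * 1024 * 1024) / 2;  // 1.5 MB, mirroring FSEditLogOp

  private final DataInputStream in;      // wraps the same PositionTrackingInputStream...
  private final StreamLimiter limiter;   // ...that is passed here as the limiter

  BoundedRecordReader(DataInputStream in, StreamLimiter limiter) {
    this.in = in;
    this.limiter = limiter;
  }

  byte[] readRecord() throws IOException {
    limiter.setLimit(MAX_OP_SIZE);  // re-arm the byte budget for this record only
    in.mark(MAX_OP_SIZE);           // so a broken record can be rewound and skipped
    return decodeOneRecord();
  }

  private byte[] decodeOneRecord() throws IOException {
    // Every read below passes through the limiter, so even a wildly corrupt record
    // can pull at most MAX_OP_SIZE bytes into memory before an IOException.
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    int b;
    while ((b = in.read()) > 0) {   // hypothetical record terminator: a zero byte or EOF
      buf.write(b);
    }
    return buf.toByteArray();
  }
}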

View File

@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode;
+
+/**
+ * An object that allows you to set a limit on a stream. This limit
+ * represents the number of bytes that can be read without getting an
+ * exception.
+ */
+interface StreamLimiter {
+  /**
+   * Set a limit. Calling this function clears any existing limit.
+   */
+  public void setLimit(long limit);
+}
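
The interface is deliberately tiny: a single method, with the contract that each call replaces any earlier limit. A hypothetical second implementation, shown only to make that contract concrete (CountingLimiter and charge() are not part of the patch; FSEditLogLoader.PositionTrackingInputStream is the real implementation):

package org.apache.hadoop.hdfs.server.namenode;

import java.io.IOException;

class CountingLimiter implements StreamLimiter {
  private long remaining = Long.MAX_VALUE;   // effectively no limit until setLimit() is called

  @Override
  public void setLimit(long limit) {
    // Each call replaces, rather than extends, whatever limit was in force before.
    remaining = limit;
  }

  /** Charge a read of len bytes against the current budget. */
  void charge(long len) throws IOException {
    if (len > remaining) {
      throw new IOException("Tried to read " + len + " byte(s) past the limit");
    }
    remaining -= len;
  }
}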

View File

@@ -765,7 +765,7 @@ public EditLogByteInputStream(byte[] data) throws IOException {
       tracker = new FSEditLogLoader.PositionTrackingInputStream(in);
       in = new DataInputStream(tracker);
-      reader = new FSEditLogOp.Reader(in, version);
+      reader = new FSEditLogOp.Reader(in, tracker, version);
     }
 
     @Override

View File

@@ -22,8 +22,10 @@
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.io.BufferedInputStream;
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.channels.FileChannel;
@@ -316,4 +318,47 @@ private static long getNonTrailerLength(File f) throws IOException {
       fis.close();
     }
   }
+
+  @Test
+  public void testStreamLimiter() throws IOException {
+    final File LIMITER_TEST_FILE = new File(TEST_DIR, "limiter.test");
+
+    FileOutputStream fos = new FileOutputStream(LIMITER_TEST_FILE);
+    try {
+      fos.write(0x12);
+      fos.write(0x12);
+      fos.write(0x12);
+    } finally {
+      fos.close();
+    }
+
+    FileInputStream fin = new FileInputStream(LIMITER_TEST_FILE);
+    BufferedInputStream bin = new BufferedInputStream(fin);
+    FSEditLogLoader.PositionTrackingInputStream tracker =
+        new FSEditLogLoader.PositionTrackingInputStream(bin);
+    try {
+      tracker.setLimit(2);
+      tracker.mark(100);
+      tracker.read();
+      tracker.read();
+      try {
+        tracker.read();
+        fail("expected to get IOException after reading past the limit");
+      } catch (IOException e) {
+      }
+      tracker.reset();
+      tracker.mark(100);
+      byte arr[] = new byte[3];
+      try {
+        tracker.read(arr);
+        fail("expected to get IOException after reading past the limit");
+      } catch (IOException e) {
+      }
+      tracker.reset();
+      arr = new byte[2];
+      tracker.read(arr);
+    } finally {
+      tracker.close();
+    }
+  }
 }