HADOOP-8361. Avoid out-of-memory problems when deserializing strings. Contributed by Colin Patrick McCabe

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1336945 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Eli Collins 2012-05-10 23:15:53 +00:00
parent 98b00d7cc0
commit 032216a3a7
5 changed files with 81 additions and 14 deletions

View File

@ -305,6 +305,9 @@ Release 2.0.0 - UNRELEASED
HADOOP-8340. SNAPSHOT build versions should compare as less than their eventual HADOOP-8340. SNAPSHOT build versions should compare as less than their eventual
final release. (todd) final release. (todd)
HADOOP-8361. Avoid out-of-memory problems when deserializing strings.
(Colin Patrick McCabe via eli)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -254,7 +254,7 @@ public void setSymlink(final Path p) {
// Writable // Writable
////////////////////////////////////////////////// //////////////////////////////////////////////////
public void write(DataOutput out) throws IOException { public void write(DataOutput out) throws IOException {
Text.writeString(out, getPath().toString()); Text.writeString(out, getPath().toString(), Text.ONE_MEGABYTE);
out.writeLong(getLen()); out.writeLong(getLen());
out.writeBoolean(isDirectory()); out.writeBoolean(isDirectory());
out.writeShort(getReplication()); out.writeShort(getReplication());
@ -262,16 +262,16 @@ public void write(DataOutput out) throws IOException {
out.writeLong(getModificationTime()); out.writeLong(getModificationTime());
out.writeLong(getAccessTime()); out.writeLong(getAccessTime());
getPermission().write(out); getPermission().write(out);
Text.writeString(out, getOwner()); Text.writeString(out, getOwner(), Text.ONE_MEGABYTE);
Text.writeString(out, getGroup()); Text.writeString(out, getGroup(), Text.ONE_MEGABYTE);
out.writeBoolean(isSymlink()); out.writeBoolean(isSymlink());
if (isSymlink()) { if (isSymlink()) {
Text.writeString(out, getSymlink().toString()); Text.writeString(out, getSymlink().toString(), Text.ONE_MEGABYTE);
} }
} }
public void readFields(DataInput in) throws IOException { public void readFields(DataInput in) throws IOException {
String strPath = Text.readString(in); String strPath = Text.readString(in, Text.ONE_MEGABYTE);
this.path = new Path(strPath); this.path = new Path(strPath);
this.length = in.readLong(); this.length = in.readLong();
this.isdir = in.readBoolean(); this.isdir = in.readBoolean();
@ -280,10 +280,10 @@ public void readFields(DataInput in) throws IOException {
modification_time = in.readLong(); modification_time = in.readLong();
access_time = in.readLong(); access_time = in.readLong();
permission.readFields(in); permission.readFields(in);
owner = Text.readString(in); owner = Text.readString(in, Text.ONE_MEGABYTE);
group = Text.readString(in); group = Text.readString(in, Text.ONE_MEGABYTE);
if (in.readBoolean()) { if (in.readBoolean()) {
this.symlink = new Path(Text.readString(in)); this.symlink = new Path(Text.readString(in, Text.ONE_MEGABYTE));
} else { } else {
this.symlink = null; this.symlink = null;
} }

View File

@ -84,8 +84,8 @@ public PermissionStatus applyUMask(FsPermission umask) {
/** {@inheritDoc} */ /** {@inheritDoc} */
public void readFields(DataInput in) throws IOException { public void readFields(DataInput in) throws IOException {
username = Text.readString(in); username = Text.readString(in, Text.ONE_MEGABYTE);
groupname = Text.readString(in); groupname = Text.readString(in, Text.ONE_MEGABYTE);
permission = FsPermission.read(in); permission = FsPermission.read(in);
} }
@ -110,8 +110,8 @@ public static void write(DataOutput out,
String username, String username,
String groupname, String groupname,
FsPermission permission) throws IOException { FsPermission permission) throws IOException {
Text.writeString(out, username); Text.writeString(out, username, Text.ONE_MEGABYTE);
Text.writeString(out, groupname); Text.writeString(out, groupname, Text.ONE_MEGABYTE);
permission.write(out); permission.write(out);
} }

View File

@ -53,6 +53,8 @@
public class Text extends BinaryComparable public class Text extends BinaryComparable
implements WritableComparable<BinaryComparable> { implements WritableComparable<BinaryComparable> {
// NOTE(review): same value as ONE_MEGABYTE, and no use of this constant is
// visible in this change -- confirm it is still needed.
static final int SHORT_STRING_MAX = 1024 * 1024;
private static ThreadLocal<CharsetEncoder> ENCODER_FACTORY = private static ThreadLocal<CharsetEncoder> ENCODER_FACTORY =
new ThreadLocal<CharsetEncoder>() { new ThreadLocal<CharsetEncoder>() {
protected CharsetEncoder initialValue() { protected CharsetEncoder initialValue() {
@ -412,6 +414,8 @@ public static ByteBuffer encode(String string, boolean replace)
return bytes; return bytes;
} }
/**
 * 1024 * 1024. Callers in this change pass it as the maxLength argument of
 * the size-limited readString/writeString overloads.
 */
static final public int ONE_MEGABYTE = 1024 * 1024;
/** Read a UTF8 encoded string from in /** Read a UTF8 encoded string from in
*/ */
public static String readString(DataInput in) throws IOException { public static String readString(DataInput in) throws IOException {
@ -420,7 +424,17 @@ public static String readString(DataInput in) throws IOException {
in.readFully(bytes, 0, length); in.readFully(bytes, 0, length);
return decode(bytes); return decode(bytes);
} }
/**
 * Read a UTF8 encoded string from in, bounding the size of the buffer that
 * is allocated for it.
 *
 * The length prefix is read through WritableUtils.readVIntInRange with the
 * bounds 0 and maxLength - 1, so the encoded string has to be strictly
 * shorter than maxLength bytes (presumably both bounds are inclusive --
 * confirm against WritableUtils).
 *
 * @param in the input to read the length prefix and the string bytes from
 * @param maxLength exclusive upper limit on the encoded length, in bytes
 * @return the decoded string
 * @throws IOException if the length prefix is rejected or the read fails
 */
public static String readString(DataInput in, int maxLength)
    throws IOException {
  final int largestAllowed = maxLength - 1;
  final int numBytes = WritableUtils.readVIntInRange(in, 0, largestAllowed);
  // The buffer is sized only after the length prefix has been range-checked.
  final byte[] encoded = new byte[numBytes];
  in.readFully(encoded, 0, numBytes);
  return decode(encoded);
}
/** Write a UTF8 encoded string to out /** Write a UTF8 encoded string to out
*/ */
public static int writeString(DataOutput out, String s) throws IOException { public static int writeString(DataOutput out, String s) throws IOException {
@ -431,6 +445,22 @@ public static int writeString(DataOutput out, String s) throws IOException {
return length; return length;
} }
/**
 * Write a UTF8 encoded string to out, refusing strings whose encoded form
 * is maxLength bytes or longer.
 *
 * Nothing is written to out when the string is rejected: the size check
 * happens before the length prefix is emitted.
 *
 * @param out the output to write the length prefix and string bytes to
 * @param s the string to encode and write
 * @param maxLength exclusive upper limit on the encoded length, in bytes
 * @return the number of encoded bytes written (excluding the length prefix)
 * @throws IOException if the encoded string is not shorter than maxLength,
 *         or if writing fails
 */
public static int writeString(DataOutput out, String s, int maxLength)
    throws IOException {
  final ByteBuffer encoded = encode(s);
  final int numBytes = encoded.limit();
  if (numBytes < maxLength) {
    WritableUtils.writeVInt(out, numBytes);
    out.write(encoded.array(), 0, numBytes);
    return numBytes;
  }
  throw new IOException("string was too long to write! Expected " +
      "less than " + maxLength + " bytes, but got " +
      numBytes + " bytes.");
}
////// states for validateUTF8 ////// states for validateUTF8
private static final int LEAD_BYTE = 0; private static final int LEAD_BYTE = 0;

View File

@ -20,6 +20,7 @@
import junit.framework.TestCase; import junit.framework.TestCase;
import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException; import java.nio.charset.CharacterCodingException;
import java.util.Random; import java.util.Random;
@ -107,7 +108,6 @@ public void testCoding() throws Exception {
} }
} }
public void testIO() throws Exception { public void testIO() throws Exception {
DataOutputBuffer out = new DataOutputBuffer(); DataOutputBuffer out = new DataOutputBuffer();
DataInputBuffer in = new DataInputBuffer(); DataInputBuffer in = new DataInputBuffer();
@ -136,6 +136,40 @@ public void testIO() throws Exception {
assertTrue(before.equals(after2)); assertTrue(before.equals(after2));
} }
} }
/**
 * Checks the size limit of Text.writeString / Text.readString for one string.
 *
 * With a limit of strLen both the write and the read are expected to throw
 * an IOException; with a limit of strLen + 1 both are expected to succeed
 * and the string must round-trip unchanged.
 *
 * @param str the string to write and read back
 * @param strLen the limit at which str is expected to be rejected
 * @throws IOException if one of the calls that should succeed fails
 */
public void doTestLimitedIO(String str, int strLen) throws IOException {
  DataOutputBuffer sink = new DataOutputBuffer();
  DataInputBuffer source = new DataInputBuffer();

  sink.reset();
  boolean writeRejected = false;
  try {
    Text.writeString(sink, str, strLen);
  } catch (IOException expected) {
    // the limit is too small for str, so the write must be refused
    writeRejected = true;
  }
  if (!writeRejected) {
    fail("expected writeString to fail when told to write a string " +
        "that was too long! The string was '" + str + "'");
  }
  Text.writeString(sink, str, strLen + 1);

  // test that it reads correctly
  source.reset(sink.getData(), sink.getLength());
  // remember the start so the stream can be rewound after the failed read
  source.mark(strLen);
  boolean readRejected = false;
  try {
    Text.readString(source, strLen);
  } catch (IOException expected) {
    // the limit is too small for the stored string, so the read must fail
    readRejected = true;
  }
  if (!readRejected) {
    fail("expected readString to fail when told to read a string " +
        "that was too long! The string was '" + str + "'");
  }
  source.reset();
  String roundTripped = Text.readString(source, strLen + 1);
  assertTrue(str.equals(roundTripped));
}
/**
 * Runs doTestLimitedIO over a few short strings, each paired with the limit
 * at which it is expected to be rejected.
 */
public void testLimitedIO() throws Exception {
  final String[] inputs = { "abcd", "", "1" };
  final int[] rejectingLimits = { 4, 0, 1 };
  for (int i = 0; i < inputs.length; i++) {
    doTestLimitedIO(inputs[i], rejectingLimits[i]);
  }
}
public void testCompare() throws Exception { public void testCompare() throws Exception {
DataOutputBuffer out1 = new DataOutputBuffer(); DataOutputBuffer out1 = new DataOutputBuffer();