MAPREDUCE-5028. Maps fail when io.sort.mb is set to high value. (kkambatl via tucu)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1457918 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Alejandro Abdelnur 2013-03-18 18:56:40 +00:00
parent 5ac6abe107
commit 53e7aaa6dd
4 changed files with 14 additions and 8 deletions

View File

@ -238,6 +238,9 @@ Release 2.0.5-beta - UNRELEASED
MAPREDUCE-4716. TestHsWebServicesJobsQuery.testJobsQueryStateInvalid
fails with jdk7. (tgraves via tucu)
MAPREDUCE-5028. Maps fail when io.sort.mb is set to high value.
(kkambatl via tucu)
Release 2.0.4-alpha - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -1165,8 +1165,9 @@ private void setEquator(int pos) {
equator = pos;
// set index prior to first entry, aligned at meta boundary
final int aligned = pos - (pos % METASIZE);
kvindex =
((aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4;
// Cast one of the operands to long to ensure large values don't cause int
// overflow
kvindex = (int) (((long) aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4;
if (LOG.isInfoEnabled()) {
LOG.info("(EQUATOR) " + pos + " kvi " + kvindex +
"(" + (kvindex * 4) + ")");
@ -1183,8 +1184,9 @@ private void resetSpill() {
bufstart = bufend = e;
final int aligned = e - (e % METASIZE);
// set start/end to point to first meta record
kvstart = kvend =
((aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4;
// Cast one of the operands to long to ensure large values don't cause int
// overflow
kvstart = kvend = (int) (((long) aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4;
if (LOG.isInfoEnabled()) {
LOG.info("(RESET) equator " + e + " kv " + kvstart + "(" +
(kvstart * 4) + ")" + " kvi " + kvindex + "(" + (kvindex * 4) + ")");
@ -1748,7 +1750,7 @@ public void reset(byte[] buffer, int start, int length) {
this.start = 0;
}
super.reset(this.buffer, this.start, this.length);
super.reset(this.buffer, this.start, this.length - this.start);
}
}

View File

@ -205,7 +205,8 @@ public VALUEIN next() {
if (backupStore.hasNext()) {
backupStore.next();
DataInputBuffer next = backupStore.nextValue();
buffer.reset(next.getData(), next.getPosition(), next.getLength());
buffer.reset(next.getData(), next.getPosition(), next.getLength()
- next.getPosition());
value = valueDeserializer.deserialize(value);
return value;
} else {

View File

@ -49,14 +49,14 @@ public InMemoryReader(MergeManagerImpl<K,V> merger, TaskAttemptID taskAttemptId,
buffer = data;
bufferSize = (int)fileLength;
memDataIn.reset(buffer, start, length);
memDataIn.reset(buffer, start, length - start);
this.start = start;
this.length = length;
}
@Override
public void reset(int offset) {
memDataIn.reset(buffer, start + offset, length);
memDataIn.reset(buffer, start + offset, length - start - offset);
bytesRead = offset;
eof = false;
}