HADOOP-8522. ResetableGzipOutputStream creates invalid gzip files when finish() and resetState() are used. Contributed by Mike Percy

This commit is contained in:
Chris Douglas 2017-11-10 16:29:36 -08:00
parent 6d201f77c7
commit 796a0d3a5c
2 changed files with 201 additions and 5 deletions

View File

@ -41,27 +41,54 @@
@InterfaceStability.Evolving @InterfaceStability.Evolving
public class GzipCodec extends DefaultCodec { public class GzipCodec extends DefaultCodec {
/** /**
* A bridge that wraps around a DeflaterOutputStream to make it * A bridge that wraps around a DeflaterOutputStream to make it
* a CompressionOutputStream. * a CompressionOutputStream.
*/ */
@InterfaceStability.Evolving @InterfaceStability.Evolving
protected static class GzipOutputStream extends CompressorStream { protected static class GzipOutputStream extends CompressorStream {
private static class ResetableGZIPOutputStream extends GZIPOutputStream { private static class ResetableGZIPOutputStream extends GZIPOutputStream {
/**
* Fixed ten-byte gzip header. See {@link GZIPOutputStream}'s source for
* details.
*/
private static final byte[] GZIP_HEADER = new byte[] {
0x1f, (byte) 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 };
private boolean reset = false;
public ResetableGZIPOutputStream(OutputStream out) throws IOException { public ResetableGZIPOutputStream(OutputStream out) throws IOException {
super(out); super(out);
} }
public void resetState() throws IOException { public synchronized void resetState() throws IOException {
def.reset(); reset = true;
} }
@Override
public synchronized void write(byte[] buf, int off, int len)
throws IOException {
if (reset) {
def.reset();
crc.reset();
out.write(GZIP_HEADER);
reset = false;
}
super.write(buf, off, len);
}
@Override
public synchronized void close() throws IOException {
reset = false;
super.close();
}
} }
public GzipOutputStream(OutputStream out) throws IOException { public GzipOutputStream(OutputStream out) throws IOException {
super(new ResetableGZIPOutputStream(out)); super(new ResetableGZIPOutputStream(out));
} }
/** /**
* Allow children types to put a different type in here. * Allow children types to put a different type in here.
* @param out the Deflater stream to use * @param out the Deflater stream to use
@ -69,7 +96,7 @@ public GzipOutputStream(OutputStream out) throws IOException {
protected GzipOutputStream(CompressorStream out) { protected GzipOutputStream(CompressorStream out) {
super(out); super(out);
} }
@Override @Override
public void close() throws IOException { public void close() throws IOException {
out.close(); out.close();

View File

@ -0,0 +1,169 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.compress;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Random;
import java.util.zip.GZIPInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
/**
* Verify resettable compressor.
*/
public class TestGzipCodec {
private static final Logger LOG =
LoggerFactory.getLogger(TestGzipCodec.class);
private static final String DATA1 = "Dogs don't know it's not bacon!\n";
private static final String DATA2 = "It's baconnnn!!\n";
private GzipCodec codec = new GzipCodec();
@Before
public void setUp() {
codec.setConf(new Configuration(false));
}
// Test simple compression.
@Test
public void testSingleCompress() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
CompressionOutputStream cmpOut = codec.createOutputStream(baos);
cmpOut.write(DATA1.getBytes(StandardCharsets.UTF_8));
cmpOut.finish();
cmpOut.close();
ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
GZIPInputStream cmpIn = new GZIPInputStream(bais);
byte[] buf = new byte[1024];
int len = cmpIn.read(buf);
String result = new String(buf, 0, len, StandardCharsets.UTF_8);
assertEquals("Input must match output", DATA1, result);
}
// Test multi-member gzip file created via finish(), resetState().
@Test
public void testResetCompress() throws IOException {
DataOutputBuffer dob = new DataOutputBuffer();
CompressionOutputStream cmpOut = codec.createOutputStream(dob);
cmpOut.write(DATA1.getBytes(StandardCharsets.UTF_8));
cmpOut.finish();
cmpOut.resetState();
cmpOut.write(DATA2.getBytes(StandardCharsets.UTF_8));
cmpOut.finish();
cmpOut.close();
dob.close();
DataInputBuffer dib = new DataInputBuffer();
dib.reset(dob.getData(), 0, dob.getLength());
CompressionInputStream cmpIn = codec.createInputStream(dib);
byte[] buf = new byte[1024];
StringBuilder result = new StringBuilder();
int len = 0;
while (true) {
len = cmpIn.read(buf);
if (len < 0) {
break;
}
result.append(new String(buf, 0, len, StandardCharsets.UTF_8));
}
assertEquals("Output must match input", DATA1 + DATA2, result.toString());
}
// ensure all necessary methods are overwritten
@Test
public void testWriteOverride() throws IOException {
Random r = new Random();
long seed = r.nextLong();
LOG.info("seed: " + seed);
r.setSeed(seed);
byte[] buf = new byte[128];
r.nextBytes(buf);
DataOutputBuffer dob = new DataOutputBuffer();
CompressionOutputStream cmpOut = codec.createOutputStream(dob);
cmpOut.write(buf);
int i = r.nextInt(128 - 10);
int l = r.nextInt(128 - i);
cmpOut.write(buf, i, l);
cmpOut.write((byte)(r.nextInt() & 0xFF));
cmpOut.close();
r.setSeed(seed);
DataInputBuffer dib = new DataInputBuffer();
dib.reset(dob.getData(), 0, dob.getLength());
CompressionInputStream cmpIn = codec.createInputStream(dib);
byte[] vbuf = new byte[128];
assertEquals(128, cmpIn.read(vbuf));
assertArrayEquals(buf, vbuf);
r.nextBytes(vbuf);
int vi = r.nextInt(128 - 10);
int vl = r.nextInt(128 - vi);
assertEquals(vl, cmpIn.read(vbuf, 0, vl));
assertArrayEquals(Arrays.copyOfRange(buf, i, i + l),
Arrays.copyOf(vbuf, vl));
assertEquals(r.nextInt() & 0xFF, cmpIn.read());
assertEquals(-1, cmpIn.read());
}
// don't write a new header if no data are written after reset
@Test
public void testIdempotentResetState() throws IOException {
DataOutputBuffer dob = new DataOutputBuffer();
CompressionOutputStream cmpOut = codec.createOutputStream(dob);
cmpOut.write(DATA1.getBytes(StandardCharsets.UTF_8));
cmpOut.finish();
cmpOut.finish();
cmpOut.finish();
cmpOut.resetState();
cmpOut.resetState();
cmpOut.finish();
cmpOut.resetState();
cmpOut.close();
dob.close();
DataInputBuffer dib = new DataInputBuffer();
dib.reset(dob.getData(), 0, dob.getLength());
CompressionInputStream cmpIn = codec.createInputStream(dib);
byte[] buf = new byte[1024];
StringBuilder result = new StringBuilder();
int len = 0;
while (true) {
len = cmpIn.read(buf);
if (len < 0) {
break;
}
result.append(new String(buf, 0, len, StandardCharsets.UTF_8));
}
assertEquals("Output must match input", DATA1, result.toString());
}
}