diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoOutputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoOutputStream.java index bc09b8c179..a771d96745 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoOutputStream.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoOutputStream.java @@ -76,6 +76,7 @@ public class CryptoOutputStream extends FilterOutputStream implements private final byte[] key; private final byte[] initIV; private byte[] iv; + private boolean closeOutputStream; public CryptoOutputStream(OutputStream out, CryptoCodec codec, int bufferSize, byte[] key, byte[] iv) throws IOException { @@ -85,6 +86,13 @@ public CryptoOutputStream(OutputStream out, CryptoCodec codec, public CryptoOutputStream(OutputStream out, CryptoCodec codec, int bufferSize, byte[] key, byte[] iv, long streamOffset) throws IOException { + this(out, codec, bufferSize, key, iv, streamOffset, true); + } + + public CryptoOutputStream(OutputStream out, CryptoCodec codec, + int bufferSize, byte[] key, byte[] iv, long streamOffset, + boolean closeOutputStream) + throws IOException { super(out); CryptoStreamUtils.checkCodec(codec); this.bufferSize = CryptoStreamUtils.checkBufferSize(codec, bufferSize); @@ -95,6 +103,7 @@ public CryptoOutputStream(OutputStream out, CryptoCodec codec, inBuffer = ByteBuffer.allocateDirect(this.bufferSize); outBuffer = ByteBuffer.allocateDirect(this.bufferSize); this.streamOffset = streamOffset; + this.closeOutputStream = closeOutputStream; try { encryptor = codec.createEncryptor(); } catch (GeneralSecurityException e) { @@ -110,8 +119,14 @@ public CryptoOutputStream(OutputStream out, CryptoCodec codec, public CryptoOutputStream(OutputStream out, CryptoCodec codec, byte[] key, byte[] iv, long streamOffset) throws IOException { + this(out, codec, key, iv, streamOffset, true); + } + + public CryptoOutputStream(OutputStream out, CryptoCodec codec, + byte[] key, byte[] iv, long streamOffset, boolean closeOutputStream) + throws IOException { this(out, codec, CryptoStreamUtils.getBufferSize(codec.getConf()), - key, iv, streamOffset); + key, iv, streamOffset, closeOutputStream); } public OutputStream getWrappedStream() { @@ -221,7 +236,10 @@ public synchronized void close() throws IOException { return; } try { - super.close(); + flush(); + if (closeOutputStream) { + super.close(); + } freeBuffers(); } finally { closed = true; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/crypto/CryptoFSDataOutputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/crypto/CryptoFSDataOutputStream.java index 040fbcb879..46447d3cba 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/crypto/CryptoFSDataOutputStream.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/crypto/CryptoFSDataOutputStream.java @@ -28,8 +28,14 @@ public class CryptoFSDataOutputStream extends FSDataOutputStream { public CryptoFSDataOutputStream(FSDataOutputStream out, CryptoCodec codec, int bufferSize, byte[] key, byte[] iv) throws IOException { + this(out, codec, bufferSize, key, iv, true); + } + + public CryptoFSDataOutputStream(FSDataOutputStream out, CryptoCodec codec, + int bufferSize, byte[] key, byte[] iv, boolean closeOutputStream) + throws IOException { super(new CryptoOutputStream(out, codec, bufferSize, key, iv, - out.getPos()), null, out.getPos()); + out.getPos(), closeOutputStream), null, out.getPos()); this.fsOut = out; } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoOutputStreamClosing.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoOutputStreamClosing.java new file mode 100644 index 0000000000..39e4bb8588 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoOutputStreamClosing.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.crypto; + +import java.io.OutputStream; + +import org.apache.hadoop.conf.Configuration; + +import org.junit.BeforeClass; +import org.junit.Test; +import static org.mockito.Mockito.*; + +/** + * To test proper closing of underlying stream of CryptoOutputStream. + */ +public class TestCryptoOutputStreamClosing { + private static CryptoCodec codec; + + @BeforeClass + public static void init() throws Exception { + codec = CryptoCodec.getInstance(new Configuration()); + } + + @Test + public void testOutputStreamClosing() throws Exception { + OutputStream outputStream = mock(OutputStream.class); + CryptoOutputStream cos = new CryptoOutputStream(outputStream, codec, + new byte[16], new byte[16], 0L, true); + cos.close(); + verify(outputStream).close(); + } + + @Test + public void testOutputStreamNotClosing() throws Exception { + OutputStream outputStream = mock(OutputStream.class); + CryptoOutputStream cos = new CryptoOutputStream(outputStream, codec, + new byte[16], new byte[16], 0L, false); + cos.close(); + verify(outputStream, never()).close(); + } + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java index a5232911de..45431e65ed 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java @@ -1588,6 +1588,7 @@ private void sortAndSpill() throws IOException, ClassNotFoundException, final long size = distanceTo(bufstart, bufend, bufvoid) + partitions * APPROX_HEADER_LENGTH; FSDataOutputStream out = null; + FSDataOutputStream partitionOut = null; try { // create spill file final SpillRecord spillRec = new SpillRecord(partitions); @@ -1608,7 +1609,7 @@ private void sortAndSpill() throws IOException, ClassNotFoundException, IFile.Writer writer = null; try { long segmentStart = out.getPos(); - FSDataOutputStream partitionOut = CryptoUtils.wrapIfNecessary(job, out); + partitionOut = CryptoUtils.wrapIfNecessary(job, out, false); writer = new Writer(job, partitionOut, keyClass, valClass, codec, spilledRecordsCounter); if (combinerRunner == null) { @@ -1643,6 +1644,10 @@ private void sortAndSpill() throws IOException, ClassNotFoundException, // close the writer writer.close(); + if (partitionOut != out) { + partitionOut.close(); + partitionOut = null; + } // record offsets rec.startOffset = segmentStart; @@ -1671,6 +1676,9 @@ private void sortAndSpill() throws IOException, ClassNotFoundException, ++numSpills; } finally { if (out != null) out.close(); + if (partitionOut != null) { + partitionOut.close(); + } } } @@ -1683,6 +1691,7 @@ private void spillSingleRecord(final K key, final V value, int partition) throws IOException { long size = kvbuffer.length + partitions * APPROX_HEADER_LENGTH; FSDataOutputStream out = null; + FSDataOutputStream partitionOut = null; try { // create spill file final SpillRecord spillRec = new SpillRecord(partitions); @@ -1697,7 +1706,7 @@ private void spillSingleRecord(final K key, final V value, try { long segmentStart = out.getPos(); // Create a new codec, don't care! - FSDataOutputStream partitionOut = CryptoUtils.wrapIfNecessary(job, out); + partitionOut = CryptoUtils.wrapIfNecessary(job, out, false); writer = new IFile.Writer(job, partitionOut, keyClass, valClass, codec, spilledRecordsCounter); @@ -1709,6 +1718,10 @@ private void spillSingleRecord(final K key, final V value, mapOutputByteCounter.increment(out.getPos() - recordStart); } writer.close(); + if (partitionOut != out) { + partitionOut.close(); + partitionOut = null; + } // record offsets rec.startOffset = segmentStart; @@ -1736,6 +1749,9 @@ private void spillSingleRecord(final K key, final V value, ++numSpills; } finally { if (out != null) out.close(); + if (partitionOut != null) { + partitionOut.close(); + } } } @@ -1847,6 +1863,7 @@ private void mergeParts() throws IOException, InterruptedException, //The output stream for the final single output file FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096); + FSDataOutputStream finalPartitionOut = null; if (numSpills == 0) { //create dummy files @@ -1855,10 +1872,15 @@ private void mergeParts() throws IOException, InterruptedException, try { for (int i = 0; i < partitions; i++) { long segmentStart = finalOut.getPos(); - FSDataOutputStream finalPartitionOut = CryptoUtils.wrapIfNecessary(job, finalOut); + finalPartitionOut = CryptoUtils.wrapIfNecessary(job, finalOut, + false); Writer writer = new Writer(job, finalPartitionOut, keyClass, valClass, codec, null); writer.close(); + if (finalPartitionOut != finalOut) { + finalPartitionOut.close(); + finalPartitionOut = null; + } rec.startOffset = segmentStart; rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job); rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job); @@ -1867,6 +1889,9 @@ private void mergeParts() throws IOException, InterruptedException, sr.writeToFile(finalIndexFile, job); } finally { finalOut.close(); + if (finalPartitionOut != null) { + finalPartitionOut.close(); + } } sortPhase.complete(); return; @@ -1910,7 +1935,7 @@ private void mergeParts() throws IOException, InterruptedException, //write merged output to disk long segmentStart = finalOut.getPos(); - FSDataOutputStream finalPartitionOut = CryptoUtils.wrapIfNecessary(job, finalOut); + finalPartitionOut = CryptoUtils.wrapIfNecessary(job, finalOut, false); Writer writer = new Writer(job, finalPartitionOut, keyClass, valClass, codec, spilledRecordsCounter); @@ -1923,6 +1948,10 @@ private void mergeParts() throws IOException, InterruptedException, //close writer.close(); + if (finalPartitionOut != finalOut) { + finalPartitionOut.close(); + finalPartitionOut = null; + } sortPhase.startNextPhase(); @@ -1934,6 +1963,9 @@ private void mergeParts() throws IOException, InterruptedException, } spillRec.writeToFile(finalIndexFile, job); finalOut.close(); + if (finalPartitionOut != null) { + finalPartitionOut.close(); + } for(int i = 0; i < numSpills; i++) { rfs.delete(filename[i],true); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/CryptoUtils.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/CryptoUtils.java index c4130b1b07..c05b6b0815 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/CryptoUtils.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/CryptoUtils.java @@ -57,9 +57,9 @@ public static boolean isEncryptedSpillEnabled(Configuration conf) { /** * This method creates and initializes an IV (Initialization Vector) * - * @param conf - * @return byte[] - * @throws IOException + * @param conf configuration + * @return byte[] initialization vector + * @throws IOException exception in case of error */ public static byte[] createIV(Configuration conf) throws IOException { CryptoCodec cryptoCodec = CryptoCodec.getInstance(conf); @@ -94,13 +94,33 @@ private static int getBufferSize(Configuration conf) { * "mapreduce.job.encrypted-intermediate-data.buffer.kb" Job configuration * variable. * - * @param conf - * @param out - * @return FSDataOutputStream - * @throws IOException + * @param conf configuration + * @param out given output stream + * @return FSDataOutputStream encrypted output stream if encryption is + * enabled; otherwise the given output stream itself + * @throws IOException exception in case of error */ public static FSDataOutputStream wrapIfNecessary(Configuration conf, FSDataOutputStream out) throws IOException { + return wrapIfNecessary(conf, out, true); + } + + /** + * Wraps a given FSDataOutputStream with a CryptoOutputStream. The size of the + * data buffer required for the stream is specified by the + * "mapreduce.job.encrypted-intermediate-data.buffer.kb" Job configuration + * variable. + * + * @param conf configuration + * @param out given output stream + * @param closeOutputStream flag to indicate whether closing the wrapped + * stream will close the given output stream + * @return FSDataOutputStream encrypted output stream if encryption is + * enabled; otherwise the given output stream itself + * @throws IOException exception in case of error + */ + public static FSDataOutputStream wrapIfNecessary(Configuration conf, + FSDataOutputStream out, boolean closeOutputStream) throws IOException { if (isEncryptedSpillEnabled(conf)) { out.write(ByteBuffer.allocate(8).putLong(out.getPos()).array()); byte[] iv = createIV(conf); @@ -110,7 +130,7 @@ public static FSDataOutputStream wrapIfNecessary(Configuration conf, + Base64.encodeBase64URLSafeString(iv) + "]"); } return new CryptoFSDataOutputStream(out, CryptoCodec.getInstance(conf), - getBufferSize(conf), getEncryptionKey(), iv); + getBufferSize(conf), getEncryptionKey(), iv, closeOutputStream); } else { return out; } @@ -128,11 +148,12 @@ public static FSDataOutputStream wrapIfNecessary(Configuration conf, * LimitInputStream will ensure that the CryptoStream does not read past the * provided length from the given Input Stream. * - * @param conf - * @param in - * @param length - * @return InputStream - * @throws IOException + * @param conf configuration + * @param in given input stream + * @param length maximum number of bytes to read from the input stream + * @return InputStream encrypted input stream if encryption is + * enabled; otherwise the given input stream itself + * @throws IOException exception in case of error */ public static InputStream wrapIfNecessary(Configuration conf, InputStream in, long length) throws IOException { @@ -166,10 +187,11 @@ public static InputStream wrapIfNecessary(Configuration conf, InputStream in, * "mapreduce.job.encrypted-intermediate-data.buffer.kb" Job configuration * variable. * - * @param conf - * @param in - * @return FSDataInputStream - * @throws IOException + * @param conf configuration + * @param in given input stream + * @return FSDataInputStream encrypted input stream if encryption is + * enabled; otherwise the given input stream itself + * @throws IOException exception in case of error */ public static FSDataInputStream wrapIfNecessary(Configuration conf, FSDataInputStream in) throws IOException {