From 95986dd2fb4527c43fa4c088c61fb7b4bd794d23 Mon Sep 17 00:00:00 2001 From: Alejandro Abdelnur Date: Fri, 11 Jul 2014 00:43:03 +0000 Subject: [PATCH] MAPREDUCE-5890. Support for encrypting Intermediate data and spills in local filesystem. (asuresh via tucu) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/fs-encryption@1609597 13f79535-47bb-0310-9956-ffa450edef68 --- .../CHANGES-fs-encryption.txt | 15 ++ .../org/apache/hadoop/mapred/BackupStore.java | 6 +- .../java/org/apache/hadoop/mapred/IFile.java | 21 +- .../org/apache/hadoop/mapred/MapTask.java | 29 +- .../java/org/apache/hadoop/mapred/Merger.java | 15 +- .../apache/hadoop/mapreduce/CryptoUtils.java | 199 ++++++++++++++ .../apache/hadoop/mapreduce/JobSubmitter.java | 9 +- .../apache/hadoop/mapreduce/MRJobConfig.java | 14 + .../hadoop/mapreduce/task/reduce/Fetcher.java | 12 +- .../mapreduce/task/reduce/LocalFetcher.java | 4 + .../task/reduce/MergeManagerImpl.java | 32 ++- .../task/reduce/OnDiskMapOutput.java | 3 +- .../mapreduce/task/reduce/TestMerger.java | 148 +++++++--- .../org/apache/hadoop/mapred/TestIFile.java | 11 +- .../TestMRIntermediateDataEncryption.java | 254 ++++++++++++++++++ .../apache/hadoop/mapred/TestReduceTask.java | 2 +- .../mapred/pipes/TestPipeApplication.java | 12 +- 17 files changed, 692 insertions(+), 94 deletions(-) create mode 100644 hadoop-mapreduce-project/CHANGES-fs-encryption.txt create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/CryptoUtils.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRIntermediateDataEncryption.java diff --git a/hadoop-mapreduce-project/CHANGES-fs-encryption.txt b/hadoop-mapreduce-project/CHANGES-fs-encryption.txt new file mode 100644 index 0000000000..83fdc1ef80 --- /dev/null +++ b/hadoop-mapreduce-project/CHANGES-fs-encryption.txt @@ -0,0 +1,15 @@ +Hadoop MapReduce Change Log + +fs-encryption (Unreleased) + + INCOMPATIBLE CHANGES + + NEW FEATURES + + MAPREDUCE-5890. Support for encrypting Intermediate + data and spills in local filesystem. (asuresh via tucu) + + IMPROVEMENTS + + BUG FIXES + diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java index cfcf0f2c6c..be7fe181f9 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java @@ -31,6 +31,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.Path; @@ -43,6 +44,7 @@ import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.CryptoUtils; /** * BackupStore is an utility class that is used to support @@ -572,7 +574,9 @@ private Writer createSpillFile() throws IOException { file = lDirAlloc.getLocalPathForWrite(tmp.toUri().getPath(), -1, conf); - return new Writer(conf, fs, file); + FSDataOutputStream out = fs.create(file); + out = CryptoUtils.wrapIfNecessary(conf, out); + return new Writer(conf, out, null, null, null, null, true); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java index a410c97557..30ebd6b8ca 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java @@ -90,13 +90,11 @@ public static class Writer { DataOutputBuffer buffer = new DataOutputBuffer(); - public Writer(Configuration conf, FileSystem fs, Path file, - Class keyClass, Class valueClass, - CompressionCodec codec, - Counters.Counter writesCounter) throws IOException { - this(conf, fs.create(file), keyClass, valueClass, codec, - writesCounter); - ownOutputStream = true; + public Writer(Configuration conf, FSDataOutputStream out, + Class keyClass, Class valueClass, + CompressionCodec codec, Counters.Counter writesCounter) + throws IOException { + this(conf, out, keyClass, valueClass, codec, writesCounter, false); } protected Writer(Counters.Counter writesCounter) { @@ -105,7 +103,8 @@ protected Writer(Counters.Counter writesCounter) { public Writer(Configuration conf, FSDataOutputStream out, Class keyClass, Class valueClass, - CompressionCodec codec, Counters.Counter writesCounter) + CompressionCodec codec, Counters.Counter writesCounter, + boolean ownOutputStream) throws IOException { this.writtenRecordsCounter = writesCounter; this.checksumOut = new IFileOutputStream(out); @@ -137,11 +136,7 @@ public Writer(Configuration conf, FSDataOutputStream out, this.valueSerializer = serializationFactory.getSerializer(valueClass); this.valueSerializer.open(buffer); } - } - - public Writer(Configuration conf, FileSystem fs, Path file) - throws IOException { - this(conf, fs, file, null, null, null, null); + this.ownOutputStream = ownOutputStream; } public void close() throws IOException { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java index 84fdd92cc5..b533ebe8e4 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java @@ -66,6 +66,7 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter; import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex; import org.apache.hadoop.mapreduce.task.MapContextImpl; +import org.apache.hadoop.mapreduce.CryptoUtils; import org.apache.hadoop.util.IndexedSortable; import org.apache.hadoop.util.IndexedSorter; import org.apache.hadoop.util.Progress; @@ -1580,7 +1581,8 @@ private void sortAndSpill() throws IOException, ClassNotFoundException, IFile.Writer writer = null; try { long segmentStart = out.getPos(); - writer = new Writer(job, out, keyClass, valClass, codec, + FSDataOutputStream partitionOut = CryptoUtils.wrapIfNecessary(job, out); + writer = new Writer(job, partitionOut, keyClass, valClass, codec, spilledRecordsCounter); if (combinerRunner == null) { // spill directly @@ -1617,8 +1619,8 @@ private void sortAndSpill() throws IOException, ClassNotFoundException, // record offsets rec.startOffset = segmentStart; - rec.rawLength = writer.getRawLength(); - rec.partLength = writer.getCompressedLength(); + rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job); + rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job); spillRec.putIndex(rec, i); writer = null; @@ -1668,7 +1670,8 @@ private void spillSingleRecord(final K key, final V value, try { long segmentStart = out.getPos(); // Create a new codec, don't care! - writer = new IFile.Writer(job, out, keyClass, valClass, codec, + FSDataOutputStream partitionOut = CryptoUtils.wrapIfNecessary(job, out); + writer = new IFile.Writer(job, partitionOut, keyClass, valClass, codec, spilledRecordsCounter); if (i == partition) { @@ -1682,8 +1685,8 @@ private void spillSingleRecord(final K key, final V value, // record offsets rec.startOffset = segmentStart; - rec.rawLength = writer.getRawLength(); - rec.partLength = writer.getCompressedLength(); + rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job); + rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job); spillRec.putIndex(rec, i); writer = null; @@ -1825,12 +1828,13 @@ private void mergeParts() throws IOException, InterruptedException, try { for (int i = 0; i < partitions; i++) { long segmentStart = finalOut.getPos(); + FSDataOutputStream finalPartitionOut = CryptoUtils.wrapIfNecessary(job, finalOut); Writer writer = - new Writer(job, finalOut, keyClass, valClass, codec, null); + new Writer(job, finalPartitionOut, keyClass, valClass, codec, null); writer.close(); rec.startOffset = segmentStart; - rec.rawLength = writer.getRawLength(); - rec.partLength = writer.getCompressedLength(); + rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job); + rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job); sr.putIndex(rec, i); } sr.writeToFile(finalIndexFile, job); @@ -1879,8 +1883,9 @@ private void mergeParts() throws IOException, InterruptedException, //write merged output to disk long segmentStart = finalOut.getPos(); + FSDataOutputStream finalPartitionOut = CryptoUtils.wrapIfNecessary(job, finalOut); Writer writer = - new Writer(job, finalOut, keyClass, valClass, codec, + new Writer(job, finalPartitionOut, keyClass, valClass, codec, spilledRecordsCounter); if (combinerRunner == null || numSpills < minSpillsForCombine) { Merger.writeFile(kvIter, writer, reporter, job); @@ -1896,8 +1901,8 @@ private void mergeParts() throws IOException, InterruptedException, // record offsets rec.startOffset = segmentStart; - rec.rawLength = writer.getRawLength(); - rec.partLength = writer.getCompressedLength(); + rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job); + rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job); spillRec.putIndex(rec, parts); } spillRec.writeToFile(finalIndexFile, job); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java index 9493871138..92855169c8 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java @@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.ChecksumFileSystem; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.Path; @@ -40,6 +41,7 @@ import org.apache.hadoop.mapred.IFile.Writer; import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.CryptoUtils; import org.apache.hadoop.util.PriorityQueue; import org.apache.hadoop.util.Progress; import org.apache.hadoop.util.Progressable; @@ -298,8 +300,12 @@ public Segment(Reader reader, boolean preserve, void init(Counters.Counter readsCounter) throws IOException { if (reader == null) { FSDataInputStream in = fs.open(file); + in.seek(segmentOffset); - reader = new Reader(conf, in, segmentLength, codec, readsCounter); + in = CryptoUtils.wrapIfNecessary(conf, in); + reader = new Reader(conf, in, + segmentLength - CryptoUtils.cryptoPadding(conf), + codec, readsCounter); } if (mapOutputsCounter != null) { @@ -714,9 +720,10 @@ RawKeyValueIterator merge(Class keyClass, Class valueClass, tmpFilename.toString(), approxOutputSize, conf); - Writer writer = - new Writer(conf, fs, outputFile, keyClass, valueClass, codec, - writesCounter); + FSDataOutputStream out = fs.create(outputFile); + out = CryptoUtils.wrapIfNecessary(conf, out); + Writer writer = new Writer(conf, out, keyClass, valueClass, + codec, writesCounter, true); writeFile(this, writer, reporter, conf); writer.close(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/CryptoUtils.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/CryptoUtils.java new file mode 100644 index 0000000000..7d8a4962c6 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/CryptoUtils.java @@ -0,0 +1,199 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapreduce; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; + +import org.apache.commons.codec.binary.Base64; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.crypto.CryptoCodec; +import org.apache.hadoop.crypto.CryptoInputStream; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.crypto.CryptoFSDataInputStream; +import org.apache.hadoop.fs.crypto.CryptoFSDataOutputStream; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.security.TokenCache; +import org.apache.hadoop.security.UserGroupInformation; + +import com.google.common.io.LimitInputStream; + +/** + * This class provides utilities to make it easier to work with Cryptographic + * Streams. Specifically for dealing with encrypting intermediate data such + * MapReduce spill files. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class CryptoUtils { + + private static final Log LOG = LogFactory.getLog(CryptoUtils.class); + + public static boolean isShuffleEncrypted(Configuration conf) { + return conf.getBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, + MRJobConfig.DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA); + } + + /** + * This method creates and initializes an IV (Initialization Vector) + * + * @param conf + * @return byte[] + * @throws IOException + */ + public static byte[] createIV(Configuration conf) throws IOException { + CryptoCodec cryptoCodec = CryptoCodec.getInstance(conf); + if (isShuffleEncrypted(conf)) { + byte[] iv = new byte[cryptoCodec.getCipherSuite().getAlgorithmBlockSize()]; + cryptoCodec.generateSecureRandom(iv); + return iv; + } else { + return null; + } + } + + public static int cryptoPadding(Configuration conf) { + // Sizeof(IV) + long(start-offset) + return isShuffleEncrypted(conf) ? CryptoCodec.getInstance(conf) + .getCipherSuite().getAlgorithmBlockSize() + 8 : 0; + } + + private static byte[] getEncryptionKey() throws IOException { + return TokenCache.getShuffleSecretKey(UserGroupInformation.getCurrentUser() + .getCredentials()); + } + + private static int getBufferSize(Configuration conf) { + return conf.getInt(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_BUFFER_KB, + MRJobConfig.DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_BUFFER_KB) * 1024; + } + + /** + * Wraps a given FSDataOutputStream with a CryptoOutputStream. The size of the + * data buffer required for the stream is specified by the + * "mapreduce.job.encrypted-intermediate-data.buffer.kb" Job configuration + * variable. + * + * @param conf + * @param out + * @return FSDataOutputStream + * @throws IOException + */ + public static FSDataOutputStream wrapIfNecessary(Configuration conf, + FSDataOutputStream out) throws IOException { + if (isShuffleEncrypted(conf)) { + out.write(ByteBuffer.allocate(8).putLong(out.getPos()).array()); + byte[] iv = createIV(conf); + out.write(iv); + if (LOG.isDebugEnabled()) { + LOG.debug("IV written to Stream [" + + Base64.encodeBase64URLSafeString(iv) + "]"); + } + return new CryptoFSDataOutputStream(out, CryptoCodec.getInstance(conf), + getBufferSize(conf), getEncryptionKey(), iv); + } else { + return out; + } + } + + /** + * Wraps a given InputStream with a CryptoInputStream. The size of the data + * buffer required for the stream is specified by the + * "mapreduce.job.encrypted-intermediate-data.buffer.kb" Job configuration + * variable. + * + * If the value of 'length' is > -1, The InputStream is additionally wrapped + * in a LimitInputStream. CryptoStreams are late buffering in nature. This + * means they will always try to read ahead if they can. The LimitInputStream + * will ensure that the CryptoStream does not read past the provided length + * from the given Input Stream. + * + * @param conf + * @param in + * @param length + * @return InputStream + * @throws IOException + */ + public static InputStream wrapIfNecessary(Configuration conf, InputStream in, + long length) throws IOException { + if (isShuffleEncrypted(conf)) { + int bufferSize = getBufferSize(conf); + if (length > -1) { + in = new LimitInputStream(in, length); + } + byte[] offsetArray = new byte[8]; + IOUtils.readFully(in, offsetArray, 0, 8); + long offset = ByteBuffer.wrap(offsetArray).getLong(); + CryptoCodec cryptoCodec = CryptoCodec.getInstance(conf); + byte[] iv = + new byte[cryptoCodec.getCipherSuite().getAlgorithmBlockSize()]; + IOUtils.readFully(in, iv, 0, + cryptoCodec.getCipherSuite().getAlgorithmBlockSize()); + if (LOG.isDebugEnabled()) { + LOG.debug("IV read from [" + + Base64.encodeBase64URLSafeString(iv) + "]"); + } + return new CryptoInputStream(in, cryptoCodec, bufferSize, + getEncryptionKey(), iv, offset + cryptoPadding(conf)); + } else { + return in; + } + } + + /** + * Wraps a given FSDataInputStream with a CryptoInputStream. The size of the + * data buffer required for the stream is specified by the + * "mapreduce.job.encrypted-intermediate-data.buffer.kb" Job configuration + * variable. + * + * @param conf + * @param in + * @return FSDataInputStream + * @throws IOException + */ + public static FSDataInputStream wrapIfNecessary(Configuration conf, + FSDataInputStream in) throws IOException { + if (isShuffleEncrypted(conf)) { + CryptoCodec cryptoCodec = CryptoCodec.getInstance(conf); + int bufferSize = getBufferSize(conf); + // Not going to be used... but still has to be read... + // Since the O/P stream always writes it.. + IOUtils.readFully(in, new byte[8], 0, 8); + byte[] iv = + new byte[cryptoCodec.getCipherSuite().getAlgorithmBlockSize()]; + IOUtils.readFully(in, iv, 0, + cryptoCodec.getCipherSuite().getAlgorithmBlockSize()); + if (LOG.isDebugEnabled()) { + LOG.debug("IV read from Stream [" + + Base64.encodeBase64URLSafeString(iv) + "]"); + } + return new CryptoFSDataInputStream(in, cryptoCodec, bufferSize, + getEncryptionKey(), iv); + } else { + return in; + } + } + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java index 94e7125749..0734e7f295 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java @@ -291,7 +291,7 @@ private void copyJar(Path originalJarPath, Path submitJarFile, /** * configure the jobconf of the user with the command line options of * -libjars, -files, -archives. - * @param conf + * @param job * @throws IOException */ private void copyAndConfigureFiles(Job job, Path jobSubmitDir) @@ -376,8 +376,13 @@ JobStatus submitJobInternal(Job job, Cluster cluster) if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) { KeyGenerator keyGen; try { + + int keyLen = CryptoUtils.isShuffleEncrypted(conf) + ? conf.getInt(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS, + MRJobConfig.DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS) + : SHUFFLE_KEY_LENGTH; keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM); - keyGen.init(SHUFFLE_KEY_LENGTH); + keyGen.init(keyLen); } catch (NoSuchAlgorithmException e) { throw new IOException("Error generating shuffle secret key", e); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index 4795af78d2..e4ded57a49 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -762,4 +762,18 @@ public interface MRJobConfig { public static final String TASK_PREEMPTION = "mapreduce.job.preemption"; + + public static final String MR_ENCRYPTED_INTERMEDIATE_DATA = + "mapreduce.job.encrypted-intermediate-data"; + public static final boolean DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA = false; + + public static final String MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS = + "mapreduce.job.encrypted-intermediate-data-key-size-bits"; + public static final int DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS = + 128; + + public static final String MR_ENCRYPTED_INTERMEDIATE_DATA_BUFFER_KB = + "mapreduce.job.encrypted-intermediate-data.buffer.kb"; + public static final int DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_BUFFER_KB = + 128; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java index 00d4764e66..20db9dc7e5 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java @@ -19,6 +19,7 @@ import java.io.DataInputStream; import java.io.IOException; +import java.io.InputStream; import java.net.ConnectException; import java.net.HttpURLConnection; import java.net.MalformedURLException; @@ -43,6 +44,7 @@ import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.security.SecureShuffleUtils; +import org.apache.hadoop.mapreduce.CryptoUtils; import org.apache.hadoop.security.ssl.SSLFactory; import com.google.common.annotations.VisibleForTesting; @@ -65,6 +67,7 @@ private static enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP, CONNECTION, WRONG_REDUCE} private final static String SHUFFLE_ERR_GRP_NAME = "Shuffle Errors"; + private final JobConf jobConf; private final Counters.Counter connectionErrs; private final Counters.Counter ioErrs; private final Counters.Counter wrongLengthErrs; @@ -104,6 +107,7 @@ public Fetcher(JobConf job, TaskAttemptID reduceId, Reporter reporter, ShuffleClientMetrics metrics, ExceptionReporter exceptionReporter, SecretKey shuffleKey, int id) { + this.jobConf = job; this.reporter = reporter; this.scheduler = scheduler; this.merger = merger; @@ -394,7 +398,11 @@ private TaskAttemptID[] copyMapOutput(MapHost host, return remaining.toArray(new TaskAttemptID[remaining.size()]); } - + InputStream is = input; + is = CryptoUtils.wrapIfNecessary(jobConf, is, compressedLength); + compressedLength -= CryptoUtils.cryptoPadding(jobConf); + decompressedLength -= CryptoUtils.cryptoPadding(jobConf); + // Do some basic sanity verification if (!verifySanity(compressedLength, decompressedLength, forReduce, remaining, mapId)) { @@ -431,7 +439,7 @@ private TaskAttemptID[] copyMapOutput(MapHost host, LOG.info("fetcher#" + id + " about to shuffle output of map " + mapOutput.getMapId() + " decomp: " + decompressedLength + " len: " + compressedLength + " to " + mapOutput.getDescription()); - mapOutput.shuffle(host, input, compressedLength, decompressedLength, + mapOutput.shuffle(host, is, compressedLength, decompressedLength, metrics, reporter); } catch (java.lang.InternalError e) { LOG.warn("Failed to shuffle for fetcher#"+id, e); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java index 52796524da..98256c2d65 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java @@ -36,6 +36,7 @@ import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.SpillRecord; import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.CryptoUtils; /** * LocalFetcher is used by LocalJobRunner to perform a local filesystem @@ -145,6 +146,9 @@ private boolean copyMapOutput(TaskAttemptID mapTaskId) throws IOException { // now read the file, seek to the appropriate section, and send it. FileSystem localFs = FileSystem.getLocal(job).getRaw(); FSDataInputStream inStream = localFs.open(mapOutputFileName); + + inStream = CryptoUtils.wrapIfNecessary(job, inStream); + try { inStream.seek(ir.startOffset); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java index a821e4d1b8..1fa1da0f70 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java @@ -30,6 +30,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.ChecksumFileSystem; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.LocalFileSystem; @@ -54,6 +55,7 @@ import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskID; +import org.apache.hadoop.mapreduce.CryptoUtils; import org.apache.hadoop.mapreduce.task.reduce.MapOutput.MapOutputComparator; import org.apache.hadoop.util.Progress; import org.apache.hadoop.util.ReflectionUtils; @@ -227,6 +229,10 @@ protected MergeThread, K,V> createInMemoryMerger() { return new InMemoryMerger(this); } + protected MergeThread createOnDiskMerger() { + return new OnDiskMerger(this); + } + TaskAttemptID getReduceId() { return reduceId; } @@ -452,11 +458,10 @@ public void merge(List> inputs) throws IOException { mergeOutputSize).suffix( Task.MERGED_OUTPUT_PREFIX); - Writer writer = - new Writer(jobConf, rfs, outputPath, - (Class) jobConf.getMapOutputKeyClass(), - (Class) jobConf.getMapOutputValueClass(), - codec, null); + FSDataOutputStream out = CryptoUtils.wrapIfNecessary(jobConf, rfs.create(outputPath)); + Writer writer = new Writer(jobConf, out, + (Class) jobConf.getMapOutputKeyClass(), + (Class) jobConf.getMapOutputValueClass(), codec, null, true); RawKeyValueIterator rIter = null; CompressAwarePath compressAwarePath; @@ -536,11 +541,12 @@ public void merge(List inputs) throws IOException { Path outputPath = localDirAllocator.getLocalPathForWrite(inputs.get(0).toString(), approxOutputSize, jobConf).suffix(Task.MERGED_OUTPUT_PREFIX); - Writer writer = - new Writer(jobConf, rfs, outputPath, - (Class) jobConf.getMapOutputKeyClass(), - (Class) jobConf.getMapOutputValueClass(), - codec, null); + + FSDataOutputStream out = CryptoUtils.wrapIfNecessary(jobConf, rfs.create(outputPath)); + Writer writer = new Writer(jobConf, out, + (Class) jobConf.getMapOutputKeyClass(), + (Class) jobConf.getMapOutputValueClass(), codec, null, true); + RawKeyValueIterator iter = null; CompressAwarePath compressAwarePath; Path tmpDir = new Path(reduceId.toString()); @@ -716,8 +722,10 @@ private RawKeyValueIterator finalMerge(JobConf job, FileSystem fs, keyClass, valueClass, memDiskSegments, numMemDiskSegments, tmpDir, comparator, reporter, spilledRecordsCounter, null, mergePhase); - Writer writer = new Writer(job, fs, outputPath, - keyClass, valueClass, codec, null); + + FSDataOutputStream out = CryptoUtils.wrapIfNecessary(job, fs.create(outputPath)); + Writer writer = new Writer(job, out, keyClass, valueClass, + codec, null, true); try { Merger.writeFile(rIter, writer, reporter, job); writer.close(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java index 59bb04a9de..6e0e92bd4d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java @@ -37,6 +37,7 @@ import org.apache.hadoop.mapred.MapOutputFile; import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.CryptoUtils; import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.CompressAwarePath; import com.google.common.annotations.VisibleForTesting; @@ -75,7 +76,7 @@ public OnDiskMapOutput(TaskAttemptID mapId, TaskAttemptID reduceId, this.merger = merger; this.outputPath = outputPath; tmpOutputPath = getTempPath(outputPath, fetcher); - disk = fs.create(tmpOutputPath); + disk = CryptoUtils.wrapIfNecessary(conf, fs.create(tmpOutputPath)); } @VisibleForTesting diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java index 1aea5004a5..c5ab420b81 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java @@ -24,14 +24,16 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.security.PrivilegedAction; import java.util.ArrayList; import java.util.Arrays; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.TreeMap; +import org.apache.hadoop.fs.FSDataInputStream; import org.junit.Assert; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; @@ -51,10 +53,16 @@ import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.MRConfig; +import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.security.TokenCache; +import org.apache.hadoop.mapreduce.CryptoUtils; import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl; +import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.CompressAwarePath; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Progress; import org.apache.hadoop.util.Progressable; import org.junit.After; @@ -63,40 +71,48 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import com.google.common.collect.Lists; + public class TestMerger { private Configuration conf; private JobConf jobConf; private FileSystem fs; - + @Before public void setup() throws IOException { conf = new Configuration(); jobConf = new JobConf(); fs = FileSystem.getLocal(conf); } - - @After - public void cleanup() throws IOException { - fs.delete(new Path(jobConf.getLocalDirs()[0]), true); - } - + + @Test - public void testInMemoryMerger() throws Throwable { + public void testEncryptedMerger() throws Throwable { + jobConf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true); + conf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true); + Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials(); + TokenCache.setShuffleSecretKey(new byte[16], credentials); + UserGroupInformation.getCurrentUser().addCredentials(credentials); + testInMemoryAndOnDiskMerger(); + } + + @Test + public void testInMemoryAndOnDiskMerger() throws Throwable { JobID jobId = new JobID("a", 0); - TaskAttemptID reduceId = new TaskAttemptID( + TaskAttemptID reduceId1 = new TaskAttemptID( new TaskID(jobId, TaskType.REDUCE, 0), 0); TaskAttemptID mapId1 = new TaskAttemptID( new TaskID(jobId, TaskType.MAP, 1), 0); TaskAttemptID mapId2 = new TaskAttemptID( new TaskID(jobId, TaskType.MAP, 2), 0); - + LocalDirAllocator lda = new LocalDirAllocator(MRConfig.LOCAL_DIR); - + MergeManagerImpl mergeManager = new MergeManagerImpl( - reduceId, jobConf, fs, lda, Reporter.NULL, null, null, null, null, null, + reduceId1, jobConf, fs, lda, Reporter.NULL, null, null, null, null, null, null, null, new Progress(), new MROutputFiles()); - + // write map outputs Map map1 = new TreeMap(); map1.put("apple", "disgusting"); @@ -113,32 +129,88 @@ public void testInMemoryMerger() throws Throwable { mapOutputBytes1.length); System.arraycopy(mapOutputBytes2, 0, mapOutput2.getMemory(), 0, mapOutputBytes2.length); - + // create merger and run merge MergeThread, Text, Text> inMemoryMerger = mergeManager.createInMemoryMerger(); - List> mapOutputs = + List> mapOutputs1 = new ArrayList>(); - mapOutputs.add(mapOutput1); - mapOutputs.add(mapOutput2); - - inMemoryMerger.merge(mapOutputs); - + mapOutputs1.add(mapOutput1); + mapOutputs1.add(mapOutput2); + + inMemoryMerger.merge(mapOutputs1); + Assert.assertEquals(1, mergeManager.onDiskMapOutputs.size()); - Path outPath = mergeManager.onDiskMapOutputs.iterator().next(); - + + TaskAttemptID reduceId2 = new TaskAttemptID( + new TaskID(jobId, TaskType.REDUCE, 3), 0); + TaskAttemptID mapId3 = new TaskAttemptID( + new TaskID(jobId, TaskType.MAP, 4), 0); + TaskAttemptID mapId4 = new TaskAttemptID( + new TaskID(jobId, TaskType.MAP, 5), 0); + // write map outputs + Map map3 = new TreeMap(); + map3.put("apple", "awesome"); + map3.put("carrot", "amazing"); + Map map4 = new TreeMap(); + map4.put("banana", "bla"); + byte[] mapOutputBytes3 = writeMapOutput(conf, map3); + byte[] mapOutputBytes4 = writeMapOutput(conf, map4); + InMemoryMapOutput mapOutput3 = new InMemoryMapOutput( + conf, mapId3, mergeManager, mapOutputBytes3.length, null, true); + InMemoryMapOutput mapOutput4 = new InMemoryMapOutput( + conf, mapId4, mergeManager, mapOutputBytes4.length, null, true); + System.arraycopy(mapOutputBytes3, 0, mapOutput3.getMemory(), 0, + mapOutputBytes3.length); + System.arraycopy(mapOutputBytes4, 0, mapOutput4.getMemory(), 0, + mapOutputBytes4.length); + +// // create merger and run merge + MergeThread, Text, Text> inMemoryMerger2 = + mergeManager.createInMemoryMerger(); + List> mapOutputs2 = + new ArrayList>(); + mapOutputs2.add(mapOutput3); + mapOutputs2.add(mapOutput4); + + inMemoryMerger2.merge(mapOutputs2); + + Assert.assertEquals(2, mergeManager.onDiskMapOutputs.size()); + + List paths = new ArrayList(); + Iterator iterator = mergeManager.onDiskMapOutputs.iterator(); List keys = new ArrayList(); List values = new ArrayList(); - readOnDiskMapOutput(conf, fs, outPath, keys, values); - Assert.assertEquals(keys, Arrays.asList("apple", "banana", "carrot")); - Assert.assertEquals(values, Arrays.asList("disgusting", "pretty good", "delicious")); + while (iterator.hasNext()) { + CompressAwarePath next = iterator.next(); + readOnDiskMapOutput(conf, fs, next, keys, values); + paths.add(next); + } + Assert.assertEquals(keys, Arrays.asList("apple", "banana", "carrot", "apple", "banana", "carrot")); + Assert.assertEquals(values, Arrays.asList("awesome", "bla", "amazing", "disgusting", "pretty good", "delicious")); + mergeManager.close(); + + mergeManager = new MergeManagerImpl( + reduceId2, jobConf, fs, lda, Reporter.NULL, null, null, null, null, null, + null, null, new Progress(), new MROutputFiles()); + + MergeThread onDiskMerger = mergeManager.createOnDiskMerger(); + onDiskMerger.merge(paths); + + Assert.assertEquals(1, mergeManager.onDiskMapOutputs.size()); + + keys = new ArrayList(); + values = new ArrayList(); + readOnDiskMapOutput(conf, fs, mergeManager.onDiskMapOutputs.iterator().next(), keys, values); + Assert.assertEquals(keys, Arrays.asList("apple", "apple", "banana", "banana", "carrot", "carrot")); + Assert.assertEquals(values, Arrays.asList("awesome", "disgusting", "pretty good", "bla", "amazing", "delicious")); mergeManager.close(); Assert.assertEquals(0, mergeManager.inMemoryMapOutputs.size()); Assert.assertEquals(0, mergeManager.inMemoryMergedMapOutputs.size()); Assert.assertEquals(0, mergeManager.onDiskMapOutputs.size()); } - + private byte[] writeMapOutput(Configuration conf, Map keysToValues) throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); @@ -152,11 +224,13 @@ private byte[] writeMapOutput(Configuration conf, Map keysToValu writer.close(); return baos.toByteArray(); } - + private void readOnDiskMapOutput(Configuration conf, FileSystem fs, Path path, List keys, List values) throws IOException { - IFile.Reader reader = new IFile.Reader(conf, fs, - path, null, null); + FSDataInputStream in = CryptoUtils.wrapIfNecessary(conf, fs.open(path)); + + IFile.Reader reader = new IFile.Reader(conf, in, + fs.getFileStatus(path).getLen(), null, null); DataInputBuffer keyBuff = new DataInputBuffer(); DataInputBuffer valueBuff = new DataInputBuffer(); Text key = new Text(); @@ -169,17 +243,17 @@ private void readOnDiskMapOutput(Configuration conf, FileSystem fs, Path path, values.add(value.toString()); } } - + @Test public void testCompressed() throws IOException { testMergeShouldReturnProperProgress(getCompressedSegments()); - } - +} + @Test public void testUncompressed() throws IOException { testMergeShouldReturnProperProgress(getUncompressedSegments()); } - + @SuppressWarnings( { "deprecation", "unchecked" }) public void testMergeShouldReturnProperProgress( List> segments) throws IOException { @@ -212,7 +286,7 @@ private List> getUncompressedSegments() throws IOException { } return segments; } - + private List> getCompressedSegments() throws IOException { List> segments = new ArrayList>(); for (int i = 1; i < 1; i++) { @@ -220,7 +294,7 @@ private List> getCompressedSegments() throws IOException { } return segments; } - + private Segment getUncompressedSegment(int i) throws IOException { return new Segment(getReader(i), false); } @@ -258,7 +332,7 @@ public Boolean answer(InvocationOnMock invocation) { } }; } - + private Answer getValueAnswer(final String segmentName) { return new Answer() { int i = 0; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFile.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFile.java index e3c7253afc..a314fc1f57 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFile.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFile.java @@ -18,6 +18,8 @@ package org.apache.hadoop.mapred; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; @@ -42,7 +44,7 @@ public void testIFileWriterWithCodec() throws Exception { DefaultCodec codec = new GzipCodec(); codec.setConf(conf); IFile.Writer writer = - new IFile.Writer(conf, rfs, path, Text.class, Text.class, + new IFile.Writer(conf, rfs.create(path), Text.class, Text.class, codec, null); writer.close(); } @@ -56,12 +58,15 @@ public void testIFileReaderWithCodec() throws Exception { Path path = new Path(new Path("build/test.ifile"), "data"); DefaultCodec codec = new GzipCodec(); codec.setConf(conf); + FSDataOutputStream out = rfs.create(path); IFile.Writer writer = - new IFile.Writer(conf, rfs, path, Text.class, Text.class, + new IFile.Writer(conf, out, Text.class, Text.class, codec, null); writer.close(); + FSDataInputStream in = rfs.open(path); IFile.Reader reader = - new IFile.Reader(conf, rfs, path, codec, null); + new IFile.Reader(conf, in, rfs.getFileStatus(path).getLen(), + codec, null); reader.close(); // test check sum diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRIntermediateDataEncryption.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRIntermediateDataEncryption.java new file mode 100644 index 0000000000..ebc32adb9d --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRIntermediateDataEncryption.java @@ -0,0 +1,254 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.junit.Test; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.Writer; + +import static org.junit.Assert.*; + +@SuppressWarnings(value={"unchecked", "deprecation"}) +/** + * This test tests the support for a merge operation in Hadoop. The input files + * are already sorted on the key. This test implements an external + * MapOutputCollector implementation that just copies the records to different + * partitions while maintaining the sort order in each partition. The Hadoop + * framework's merge on the reduce side will merge the partitions created to + * generate the final output which is sorted on the key. + */ +public class TestMRIntermediateDataEncryption { + // Where MR job's input will reside. + private static final Path INPUT_DIR = new Path("/test/input"); + // Where output goes. + private static final Path OUTPUT = new Path("/test/output"); + + @Test + public void testSingleReducer() throws Exception { + doEncryptionTest(3, 1, 2); + } + + @Test + public void testMultipleMapsPerNode() throws Exception { + doEncryptionTest(8, 1, 2); + } + + @Test + public void testMultipleReducers() throws Exception { + doEncryptionTest(2, 4, 2); + } + + public void doEncryptionTest(int numMappers, int numReducers, int numNodes) throws Exception { + doEncryptionTest(numMappers, numReducers, numNodes, 1000); + } + + public void doEncryptionTest(int numMappers, int numReducers, int numNodes, int numLines) throws Exception { + MiniDFSCluster dfsCluster = null; + MiniMRClientCluster mrCluster = null; + FileSystem fileSystem = null; + try { + Configuration conf = new Configuration(); + // Start the mini-MR and mini-DFS clusters + + dfsCluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(numNodes).build(); + fileSystem = dfsCluster.getFileSystem(); + mrCluster = MiniMRClientClusterFactory.create(this.getClass(), + numNodes, conf); + // Generate input. + createInput(fileSystem, numMappers, numLines); + // Run the test. + runMergeTest(new JobConf(mrCluster.getConfig()), fileSystem, numMappers, numReducers, numLines); + } finally { + if (dfsCluster != null) { + dfsCluster.shutdown(); + } + if (mrCluster != null) { + mrCluster.stop(); + } + } + } + + private void createInput(FileSystem fs, int numMappers, int numLines) throws Exception { + fs.delete(INPUT_DIR, true); + for (int i = 0; i < numMappers; i++) { + OutputStream os = fs.create(new Path(INPUT_DIR, "input_" + i + ".txt")); + Writer writer = new OutputStreamWriter(os); + for (int j = 0; j < numLines; j++) { + // Create sorted key, value pairs. + int k = j + 1; + String formattedNumber = String.format("%09d", k); + writer.write(formattedNumber + " " + formattedNumber + "\n"); + } + writer.close(); + } + } + + private void runMergeTest(JobConf job, FileSystem fileSystem, int numMappers, int numReducers, int numLines) + throws Exception { + fileSystem.delete(OUTPUT, true); + job.setJobName("Test"); + JobClient client = new JobClient(job); + RunningJob submittedJob = null; + FileInputFormat.setInputPaths(job, INPUT_DIR); + FileOutputFormat.setOutputPath(job, OUTPUT); + job.set("mapreduce.output.textoutputformat.separator", " "); + job.setInputFormat(TextInputFormat.class); + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(Text.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(Text.class); + job.setMapperClass(MyMapper.class); + job.setPartitionerClass(MyPartitioner.class); + job.setOutputFormat(TextOutputFormat.class); + job.setNumReduceTasks(numReducers); + + job.setInt("mapreduce.map.maxattempts", 1); + job.setInt("mapreduce.reduce.maxattempts", 1); + job.setInt("mapred.test.num_lines", numLines); + job.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true); + try { + submittedJob = client.submitJob(job); + try { + if (! client.monitorAndPrintJob(job, submittedJob)) { + throw new IOException("Job failed!"); + } + } catch(InterruptedException ie) { + Thread.currentThread().interrupt(); + } + } catch(IOException ioe) { + System.err.println("Job failed with: " + ioe); + } finally { + verifyOutput(submittedJob, fileSystem, numMappers, numLines); + } + } + + private void verifyOutput(RunningJob submittedJob, FileSystem fileSystem, int numMappers, int numLines) + throws Exception { + FSDataInputStream dis = null; + long numValidRecords = 0; + long numInvalidRecords = 0; + String prevKeyValue = "000000000"; + Path[] fileList = + FileUtil.stat2Paths(fileSystem.listStatus(OUTPUT, + new Utils.OutputFileUtils.OutputFilesFilter())); + for (Path outFile : fileList) { + try { + dis = fileSystem.open(outFile); + String record; + while((record = dis.readLine()) != null) { + // Split the line into key and value. + int blankPos = record.indexOf(" "); + String keyString = record.substring(0, blankPos); + String valueString = record.substring(blankPos+1); + // Check for sorted output and correctness of record. + if (keyString.compareTo(prevKeyValue) >= 0 + && keyString.equals(valueString)) { + prevKeyValue = keyString; + numValidRecords++; + } else { + numInvalidRecords++; + } + } + } finally { + if (dis != null) { + dis.close(); + dis = null; + } + } + } + // Make sure we got all input records in the output in sorted order. + assertEquals((long)(numMappers * numLines), numValidRecords); + // Make sure there is no extraneous invalid record. + assertEquals(0, numInvalidRecords); + } + + /** + * A mapper implementation that assumes that key text contains valid integers + * in displayable form. + */ + public static class MyMapper extends MapReduceBase + implements Mapper { + private Text keyText; + private Text valueText; + + public MyMapper() { + keyText = new Text(); + valueText = new Text(); + } + + @Override + public void map(LongWritable key, Text value, + OutputCollector output, + Reporter reporter) throws IOException { + String record = value.toString(); + int blankPos = record.indexOf(" "); + keyText.set(record.substring(0, blankPos)); + valueText.set(record.substring(blankPos+1)); + output.collect(keyText, valueText); + } + + public void close() throws IOException { + } + } + + /** + * Partitioner implementation to make sure that output is in total sorted + * order. We basically route key ranges to different reducers such that + * key values monotonically increase with the partition number. For example, + * in this test, the keys are numbers from 1 to 1000 in the form "000000001" + * to "000001000" in each input file. The keys "000000001" to "000000250" are + * routed to partition 0, "000000251" to "000000500" are routed to partition 1 + * and so on since we have 4 reducers. + */ + static class MyPartitioner implements Partitioner { + + private JobConf job; + + public MyPartitioner() { + } + + public void configure(JobConf job) { + this.job = job; + } + + public int getPartition(Text key, Text value, int numPartitions) { + int keyValue = 0; + try { + keyValue = Integer.parseInt(key.toString()); + } catch(NumberFormatException nfe) { + keyValue = 0; + } + int partitionNumber = (numPartitions*(Math.max(0, keyValue-1)))/job.getInt("mapred.test.num_lines", 10000); + return partitionNumber; + } + } + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestReduceTask.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestReduceTask.java index d3a084449a..43fd94871a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestReduceTask.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestReduceTask.java @@ -80,7 +80,7 @@ public void runValueIterator(Path tmpDir, Pair[] vals, FileSystem rfs = ((LocalFileSystem)localFs).getRaw(); Path path = new Path(tmpDir, "data.in"); IFile.Writer writer = - new IFile.Writer(conf, rfs, path, Text.class, Text.class, + new IFile.Writer(conf, rfs.create(path), Text.class, Text.class, codec, null); for(Pair p: vals) { writer.append(new Text(p.key), new Text(p.value)); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java index 69994f36bd..f447ebcc7c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java @@ -95,9 +95,9 @@ public void testRunner() throws Exception { new Counters.Counter(), new Progress()); FileSystem fs = new RawLocalFileSystem(); fs.setConf(conf); - Writer wr = new Writer(conf, fs, - new Path(workSpace + File.separator + "outfile"), IntWritable.class, - Text.class, null, null); + Writer wr = new Writer(conf, fs.create( + new Path(workSpace + File.separator + "outfile")), IntWritable.class, + Text.class, null, null, true); output.setWriter(wr); // stub for client File fCommand = getFileCommand("org.apache.hadoop.mapred.pipes.PipeApplicationRunnableStub"); @@ -177,9 +177,9 @@ public void testApplication() throws Throwable { new Progress()); FileSystem fs = new RawLocalFileSystem(); fs.setConf(conf); - Writer wr = new Writer(conf, fs, - new Path(workSpace.getAbsolutePath() + File.separator + "outfile"), - IntWritable.class, Text.class, null, null); + Writer wr = new Writer(conf, fs.create( + new Path(workSpace.getAbsolutePath() + File.separator + "outfile")), + IntWritable.class, Text.class, null, null, true); output.setWriter(wr); conf.set(Submitter.PRESERVE_COMMANDFILE, "true");