diff --git a/hadoop-mapreduce-project/CHANGES-fs-encryption.txt b/hadoop-mapreduce-project/CHANGES-fs-encryption.txt
new file mode 100644
index 0000000000..83fdc1ef80
--- /dev/null
+++ b/hadoop-mapreduce-project/CHANGES-fs-encryption.txt
@@ -0,0 +1,15 @@
+Hadoop MapReduce Change Log
+
+fs-encryption (Unreleased)
+
+ INCOMPATIBLE CHANGES
+
+ NEW FEATURES
+
+ MAPREDUCE-5890. Support for encrypting Intermediate
+ data and spills in local filesystem. (asuresh via tucu)
+
+ IMPROVEMENTS
+
+ BUG FIXES
+
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java
index cfcf0f2c6c..be7fe181f9 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java
@@ -31,6 +31,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
@@ -43,6 +44,7 @@
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.CryptoUtils;
/**
* BackupStore is a utility class that is used to support
@@ -572,7 +574,9 @@ private Writer createSpillFile() throws IOException {
file = lDirAlloc.getLocalPathForWrite(tmp.toUri().getPath(),
-1, conf);
- return new Writer(conf, fs, file);
+ FSDataOutputStream out = fs.create(file);
+ out = CryptoUtils.wrapIfNecessary(conf, out);
+ return new Writer(conf, out, null, null, null, null, true);
}
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java
index a410c97557..30ebd6b8ca 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java
@@ -90,13 +90,11 @@ public static class Writer {
DataOutputBuffer buffer = new DataOutputBuffer();
- public Writer(Configuration conf, FileSystem fs, Path file,
- Class keyClass, Class valueClass,
- CompressionCodec codec,
- Counters.Counter writesCounter) throws IOException {
- this(conf, fs.create(file), keyClass, valueClass, codec,
- writesCounter);
- ownOutputStream = true;
+ public Writer(Configuration conf, FSDataOutputStream out,
+ Class keyClass, Class valueClass,
+ CompressionCodec codec, Counters.Counter writesCounter)
+ throws IOException {
+ this(conf, out, keyClass, valueClass, codec, writesCounter, false);
}
protected Writer(Counters.Counter writesCounter) {
@@ -105,7 +103,8 @@ protected Writer(Counters.Counter writesCounter) {
public Writer(Configuration conf, FSDataOutputStream out,
Class keyClass, Class valueClass,
- CompressionCodec codec, Counters.Counter writesCounter)
+ CompressionCodec codec, Counters.Counter writesCounter,
+ boolean ownOutputStream)
throws IOException {
this.writtenRecordsCounter = writesCounter;
this.checksumOut = new IFileOutputStream(out);
@@ -137,11 +136,7 @@ public Writer(Configuration conf, FSDataOutputStream out,
this.valueSerializer = serializationFactory.getSerializer(valueClass);
this.valueSerializer.open(buffer);
}
- }
-
- public Writer(Configuration conf, FileSystem fs, Path file)
- throws IOException {
- this(conf, fs, file, null, null, null, null);
+ this.ownOutputStream = ownOutputStream;
}
public void close() throws IOException {
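The removed Writer(conf, fs, file, ...) convenience constructors used to open, and own, the output stream themselves. After this hunk the caller opens the stream, optionally wraps it for encryption, and passes ownOutputStream=true when the Writer should close the underlying stream. A minimal sketch of the new call pattern, assuming a helper class and local path that are not part of the patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.IFile;
    import org.apache.hadoop.mapreduce.CryptoUtils;

    public class IFileWriterSketch {
      static void writeSpill(Configuration conf, Path file) throws Exception {
        FileSystem fs = FileSystem.getLocal(conf).getRaw();
        // The caller now owns stream creation and (optional) encryption wrapping.
        FSDataOutputStream out = CryptoUtils.wrapIfNecessary(conf, fs.create(file));
        // ownOutputStream = true: the Writer closes 'out' when it is closed.
        IFile.Writer<Text, Text> writer = new IFile.Writer<Text, Text>(
            conf, out, Text.class, Text.class, null, null, true);
        writer.append(new Text("key"), new Text("value"));
        writer.close();
      }
    }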
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
index 84fdd92cc5..b533ebe8e4 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
@@ -66,6 +66,7 @@
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
import org.apache.hadoop.mapreduce.task.MapContextImpl;
+import org.apache.hadoop.mapreduce.CryptoUtils;
import org.apache.hadoop.util.IndexedSortable;
import org.apache.hadoop.util.IndexedSorter;
import org.apache.hadoop.util.Progress;
@@ -1580,7 +1581,8 @@ private void sortAndSpill() throws IOException, ClassNotFoundException,
IFile.Writer writer = null;
try {
long segmentStart = out.getPos();
- writer = new Writer(job, out, keyClass, valClass, codec,
+ FSDataOutputStream partitionOut = CryptoUtils.wrapIfNecessary(job, out);
+ writer = new Writer(job, partitionOut, keyClass, valClass, codec,
spilledRecordsCounter);
if (combinerRunner == null) {
// spill directly
@@ -1617,8 +1619,8 @@ private void sortAndSpill() throws IOException, ClassNotFoundException,
// record offsets
rec.startOffset = segmentStart;
- rec.rawLength = writer.getRawLength();
- rec.partLength = writer.getCompressedLength();
+ rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job);
+ rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job);
spillRec.putIndex(rec, i);
writer = null;
@@ -1668,7 +1670,8 @@ private void spillSingleRecord(final K key, final V value,
try {
long segmentStart = out.getPos();
// Create a new codec, don't care!
- writer = new IFile.Writer(job, out, keyClass, valClass, codec,
+ FSDataOutputStream partitionOut = CryptoUtils.wrapIfNecessary(job, out);
+ writer = new IFile.Writer(job, partitionOut, keyClass, valClass, codec,
spilledRecordsCounter);
if (i == partition) {
@@ -1682,8 +1685,8 @@ private void spillSingleRecord(final K key, final V value,
// record offsets
rec.startOffset = segmentStart;
- rec.rawLength = writer.getRawLength();
- rec.partLength = writer.getCompressedLength();
+ rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job);
+ rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job);
spillRec.putIndex(rec, i);
writer = null;
@@ -1825,12 +1828,13 @@ private void mergeParts() throws IOException, InterruptedException,
try {
for (int i = 0; i < partitions; i++) {
long segmentStart = finalOut.getPos();
+ FSDataOutputStream finalPartitionOut = CryptoUtils.wrapIfNecessary(job, finalOut);
Writer writer =
- new Writer(job, finalOut, keyClass, valClass, codec, null);
+ new Writer(job, finalPartitionOut, keyClass, valClass, codec, null);
writer.close();
rec.startOffset = segmentStart;
- rec.rawLength = writer.getRawLength();
- rec.partLength = writer.getCompressedLength();
+ rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job);
+ rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job);
sr.putIndex(rec, i);
}
sr.writeToFile(finalIndexFile, job);
@@ -1879,8 +1883,9 @@ private void mergeParts() throws IOException, InterruptedException,
//write merged output to disk
long segmentStart = finalOut.getPos();
+ FSDataOutputStream finalPartitionOut = CryptoUtils.wrapIfNecessary(job, finalOut);
Writer writer =
- new Writer(job, finalOut, keyClass, valClass, codec,
+ new Writer(job, finalPartitionOut, keyClass, valClass, codec,
spilledRecordsCounter);
if (combinerRunner == null || numSpills < minSpillsForCombine) {
Merger.writeFile(kvIter, writer, reporter, job);
@@ -1896,8 +1901,8 @@ private void mergeParts() throws IOException, InterruptedException,
// record offsets
rec.startOffset = segmentStart;
- rec.rawLength = writer.getRawLength();
- rec.partLength = writer.getCompressedLength();
+ rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job);
+ rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job);
spillRec.putIndex(rec, parts);
}
spillRec.writeToFile(finalIndexFile, job);
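The lengths recorded in each index record now include the bytes that CryptoUtils.wrapIfNecessary() prepends to the partition (an 8-byte start offset plus the IV), so readers subtract CryptoUtils.cryptoPadding() to recover the true IFile segment length, as Merger.Segment.init() does below. A standalone sketch of that bookkeeping; the class and method names are illustrative only:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.CryptoUtils;

    /** Illustrative only: how index-record lengths relate to IFile segment lengths. */
    public class SpillIndexLengths {
      /** What rec.rawLength / rec.partLength store for a segment of ifileBytes. */
      static long indexedLength(Configuration conf, long ifileBytes) {
        // cryptoPadding() is 8 (start offset) + cipher block size (IV) when
        // encryption is enabled, and 0 otherwise.
        return ifileBytes + CryptoUtils.cryptoPadding(conf);
      }

      /** The inverse applied by readers such as Merger.Segment.init(). */
      static long segmentLength(Configuration conf, long indexedBytes) {
        return indexedBytes - CryptoUtils.cryptoPadding(conf);
      }
    }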
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java
index 9493871138..92855169c8 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java
@@ -30,6 +30,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.ChecksumFileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
@@ -40,6 +41,7 @@
import org.apache.hadoop.mapred.IFile.Writer;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.CryptoUtils;
import org.apache.hadoop.util.PriorityQueue;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.Progressable;
@@ -298,8 +300,12 @@ public Segment(Reader reader, boolean preserve,
void init(Counters.Counter readsCounter) throws IOException {
if (reader == null) {
FSDataInputStream in = fs.open(file);
+
in.seek(segmentOffset);
- reader = new Reader(conf, in, segmentLength, codec, readsCounter);
+ in = CryptoUtils.wrapIfNecessary(conf, in);
+ reader = new Reader(conf, in,
+ segmentLength - CryptoUtils.cryptoPadding(conf),
+ codec, readsCounter);
}
if (mapOutputsCounter != null) {
@@ -714,9 +720,10 @@ RawKeyValueIterator merge(Class keyClass, Class valueClass,
tmpFilename.toString(),
approxOutputSize, conf);
- Writer writer =
- new Writer(conf, fs, outputFile, keyClass, valueClass, codec,
- writesCounter);
+ FSDataOutputStream out = fs.create(outputFile);
+ out = CryptoUtils.wrapIfNecessary(conf, out);
+ Writer writer = new Writer(conf, out, keyClass, valueClass,
+ codec, writesCounter, true);
writeFile(this, writer, reporter, conf);
writer.close();
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/CryptoUtils.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/CryptoUtils.java
new file mode 100644
index 0000000000..7d8a4962c6
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/CryptoUtils.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.CryptoCodec;
+import org.apache.hadoop.crypto.CryptoInputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.crypto.CryptoFSDataInputStream;
+import org.apache.hadoop.fs.crypto.CryptoFSDataOutputStream;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import com.google.common.io.LimitInputStream;
+
+/**
+ * This class provides utilities to make it easier to work with cryptographic
+ * streams, specifically for encrypting intermediate data such as MapReduce
+ * spill files.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class CryptoUtils {
+
+ private static final Log LOG = LogFactory.getLog(CryptoUtils.class);
+
+ public static boolean isShuffleEncrypted(Configuration conf) {
+ return conf.getBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA,
+ MRJobConfig.DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA);
+ }
+
+ /**
+ * Creates and initializes an IV (Initialization Vector).
+ *
+ * @param conf the job configuration
+ * @return the generated IV, or null if intermediate-data encryption is disabled
+ * @throws IOException
+ */
+ public static byte[] createIV(Configuration conf) throws IOException {
+ CryptoCodec cryptoCodec = CryptoCodec.getInstance(conf);
+ if (isShuffleEncrypted(conf)) {
+ byte[] iv = new byte[cryptoCodec.getCipherSuite().getAlgorithmBlockSize()];
+ cryptoCodec.generateSecureRandom(iv);
+ return iv;
+ } else {
+ return null;
+ }
+ }
+
+ public static int cryptoPadding(Configuration conf) {
+ // Sizeof(IV) + long(start-offset)
+ return isShuffleEncrypted(conf) ? CryptoCodec.getInstance(conf)
+ .getCipherSuite().getAlgorithmBlockSize() + 8 : 0;
+ }
+
+ private static byte[] getEncryptionKey() throws IOException {
+ return TokenCache.getShuffleSecretKey(UserGroupInformation.getCurrentUser()
+ .getCredentials());
+ }
+
+ private static int getBufferSize(Configuration conf) {
+ return conf.getInt(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_BUFFER_KB,
+ MRJobConfig.DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_BUFFER_KB) * 1024;
+ }
+
+ /**
+ * Wraps a given FSDataOutputStream with a CryptoOutputStream. The size of the
+ * data buffer required for the stream is specified by the
+ * "mapreduce.job.encrypted-intermediate-data.buffer.kb" Job configuration
+ * variable.
+ *
+ * @param conf
+ * @param out
+ * @return FSDataOutputStream
+ * @throws IOException
+ */
+ public static FSDataOutputStream wrapIfNecessary(Configuration conf,
+ FSDataOutputStream out) throws IOException {
+ if (isShuffleEncrypted(conf)) {
+ out.write(ByteBuffer.allocate(8).putLong(out.getPos()).array());
+ byte[] iv = createIV(conf);
+ out.write(iv);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("IV written to Stream ["
+ + Base64.encodeBase64URLSafeString(iv) + "]");
+ }
+ return new CryptoFSDataOutputStream(out, CryptoCodec.getInstance(conf),
+ getBufferSize(conf), getEncryptionKey(), iv);
+ } else {
+ return out;
+ }
+ }
+
+ /**
+ * Wraps a given InputStream with a CryptoInputStream. The size of the data
+ * buffer required for the stream is specified by the
+ * "mapreduce.job.encrypted-intermediate-data.buffer.kb" Job configuration
+ * variable.
+ *
+ * If the value of 'length' is > -1, the InputStream is additionally wrapped
+ * in a LimitInputStream. CryptoStreams are late-buffering in nature, which
+ * means they will always try to read ahead if they can. The LimitInputStream
+ * ensures that the CryptoStream does not read past the provided length from
+ * the given InputStream.
+ *
+ * @param conf
+ * @param in
+ * @param length
+ * @return InputStream
+ * @throws IOException
+ */
+ public static InputStream wrapIfNecessary(Configuration conf, InputStream in,
+ long length) throws IOException {
+ if (isShuffleEncrypted(conf)) {
+ int bufferSize = getBufferSize(conf);
+ if (length > -1) {
+ in = new LimitInputStream(in, length);
+ }
+ byte[] offsetArray = new byte[8];
+ IOUtils.readFully(in, offsetArray, 0, 8);
+ long offset = ByteBuffer.wrap(offsetArray).getLong();
+ CryptoCodec cryptoCodec = CryptoCodec.getInstance(conf);
+ byte[] iv =
+ new byte[cryptoCodec.getCipherSuite().getAlgorithmBlockSize()];
+ IOUtils.readFully(in, iv, 0,
+ cryptoCodec.getCipherSuite().getAlgorithmBlockSize());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("IV read from ["
+ + Base64.encodeBase64URLSafeString(iv) + "]");
+ }
+ return new CryptoInputStream(in, cryptoCodec, bufferSize,
+ getEncryptionKey(), iv, offset + cryptoPadding(conf));
+ } else {
+ return in;
+ }
+ }
+
+ /**
+ * Wraps a given FSDataInputStream with a CryptoInputStream. The size of the
+ * data buffer required for the stream is specified by the
+ * "mapreduce.job.encrypted-intermediate-data.buffer.kb" Job configuration
+ * variable.
+ *
+ * @param conf
+ * @param in
+ * @return FSDataInputStream
+ * @throws IOException
+ */
+ public static FSDataInputStream wrapIfNecessary(Configuration conf,
+ FSDataInputStream in) throws IOException {
+ if (isShuffleEncrypted(conf)) {
+ CryptoCodec cryptoCodec = CryptoCodec.getInstance(conf);
+ int bufferSize = getBufferSize(conf);
+ // The start offset is not used here, but it still has to be consumed
+ // because the output-stream wrapper always writes it.
+ IOUtils.readFully(in, new byte[8], 0, 8);
+ byte[] iv =
+ new byte[cryptoCodec.getCipherSuite().getAlgorithmBlockSize()];
+ IOUtils.readFully(in, iv, 0,
+ cryptoCodec.getCipherSuite().getAlgorithmBlockSize());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("IV read from Stream ["
+ + Base64.encodeBase64URLSafeString(iv) + "]");
+ }
+ return new CryptoFSDataInputStream(in, cryptoCodec, bufferSize,
+ getEncryptionKey(), iv);
+ } else {
+ return in;
+ }
+ }
+
+}
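As a rough usage sketch (not part of the patch): the output wrapper writes the current stream position and a fresh IV in the clear and then encrypts everything written through it, while the input wrapper consumes that header and decrypts. The shuffle secret key must already be present in the current user's credentials, the same way the tests below set it up; the class name and temp path here are hypothetical:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.CryptoUtils;
    import org.apache.hadoop.mapreduce.MRJobConfig;
    import org.apache.hadoop.mapreduce.security.TokenCache;
    import org.apache.hadoop.security.Credentials;
    import org.apache.hadoop.security.UserGroupInformation;

    public class CryptoUtilsRoundTrip {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);

        // The wrappers read the key from the current user's credentials,
        // just as the shuffle does.
        Credentials creds = UserGroupInformation.getCurrentUser().getCredentials();
        TokenCache.setShuffleSecretKey(new byte[16], creds);
        UserGroupInformation.getCurrentUser().addCredentials(creds);

        FileSystem fs = FileSystem.getLocal(conf).getRaw();
        Path file = new Path("/tmp/crypto-utils-demo.bin"); // hypothetical path

        // Write: start offset and IV go out in the clear, the payload encrypted.
        FSDataOutputStream out = CryptoUtils.wrapIfNecessary(conf, fs.create(file));
        out.writeUTF("intermediate data");
        out.close();

        // Read: the matching wrapper consumes the offset and IV and decrypts.
        FSDataInputStream in = CryptoUtils.wrapIfNecessary(conf, fs.open(file));
        System.out.println(in.readUTF());
        in.close();
      }
    }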
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
index 94e7125749..0734e7f295 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
@@ -291,7 +291,7 @@ private void copyJar(Path originalJarPath, Path submitJarFile,
/**
* configure the jobconf of the user with the command line options of
* -libjars, -files, -archives.
- * @param conf
+ * @param job
* @throws IOException
*/
private void copyAndConfigureFiles(Job job, Path jobSubmitDir)
@@ -376,8 +376,13 @@ JobStatus submitJobInternal(Job job, Cluster cluster)
if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
KeyGenerator keyGen;
try {
+
+ int keyLen = CryptoUtils.isShuffleEncrypted(conf)
+ ? conf.getInt(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS,
+ MRJobConfig.DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS)
+ : SHUFFLE_KEY_LENGTH;
keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
- keyGen.init(SHUFFLE_KEY_LENGTH);
+ keyGen.init(keyLen);
} catch (NoSuchAlgorithmException e) {
throw new IOException("Error generating shuffle secret key", e);
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index 4795af78d2..e4ded57a49 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -762,4 +762,18 @@ public interface MRJobConfig {
public static final String TASK_PREEMPTION =
"mapreduce.job.preemption";
+
+ public static final String MR_ENCRYPTED_INTERMEDIATE_DATA =
+ "mapreduce.job.encrypted-intermediate-data";
+ public static final boolean DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA = false;
+
+ public static final String MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS =
+ "mapreduce.job.encrypted-intermediate-data-key-size-bits";
+ public static final int DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS =
+ 128;
+
+ public static final String MR_ENCRYPTED_INTERMEDIATE_DATA_BUFFER_KB =
+ "mapreduce.job.encrypted-intermediate-data.buffer.kb";
+ public static final int DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_BUFFER_KB =
+ 128;
}
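A short sketch of a job opting in through these new keys (the values shown are the defaults defined above, so only the first line is strictly required):

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.MRJobConfig;

    public class EnableSpillEncryption {
      static JobConf configure(JobConf job) {
        job.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
        // Optional tuning: 128-bit keys and a 128 KB crypto buffer by default.
        job.setInt(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS, 128);
        job.setInt(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_BUFFER_KB, 128);
        return job;
      }
    }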
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
index 00d4764e66..20db9dc7e5 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
@@ -19,6 +19,7 @@
import java.io.DataInputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.net.ConnectException;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
@@ -43,6 +44,7 @@
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.apache.hadoop.mapreduce.CryptoUtils;
import org.apache.hadoop.security.ssl.SSLFactory;
import com.google.common.annotations.VisibleForTesting;
@@ -65,6 +67,7 @@ private static enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
CONNECTION, WRONG_REDUCE}
private final static String SHUFFLE_ERR_GRP_NAME = "Shuffle Errors";
+ private final JobConf jobConf;
private final Counters.Counter connectionErrs;
private final Counters.Counter ioErrs;
private final Counters.Counter wrongLengthErrs;
@@ -104,6 +107,7 @@ public Fetcher(JobConf job, TaskAttemptID reduceId,
Reporter reporter, ShuffleClientMetrics metrics,
ExceptionReporter exceptionReporter, SecretKey shuffleKey,
int id) {
+ this.jobConf = job;
this.reporter = reporter;
this.scheduler = scheduler;
this.merger = merger;
@@ -394,7 +398,11 @@ private TaskAttemptID[] copyMapOutput(MapHost host,
return remaining.toArray(new TaskAttemptID[remaining.size()]);
}
-
+ InputStream is = input;
+ is = CryptoUtils.wrapIfNecessary(jobConf, is, compressedLength);
+ compressedLength -= CryptoUtils.cryptoPadding(jobConf);
+ decompressedLength -= CryptoUtils.cryptoPadding(jobConf);
+
// Do some basic sanity verification
if (!verifySanity(compressedLength, decompressedLength, forReduce,
remaining, mapId)) {
@@ -431,7 +439,7 @@ private TaskAttemptID[] copyMapOutput(MapHost host,
LOG.info("fetcher#" + id + " about to shuffle output of map "
+ mapOutput.getMapId() + " decomp: " + decompressedLength
+ " len: " + compressedLength + " to " + mapOutput.getDescription());
- mapOutput.shuffle(host, input, compressedLength, decompressedLength,
+ mapOutput.shuffle(host, is, compressedLength, decompressedLength,
metrics, reporter);
} catch (java.lang.InternalError e) {
LOG.warn("Failed to shuffle for fetcher#"+id, e);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
index 52796524da..98256c2d65 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
@@ -36,6 +36,7 @@
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SpillRecord;
import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.CryptoUtils;
/**
* LocalFetcher is used by LocalJobRunner to perform a local filesystem
@@ -145,6 +146,9 @@ private boolean copyMapOutput(TaskAttemptID mapTaskId) throws IOException {
// now read the file, seek to the appropriate section, and send it.
FileSystem localFs = FileSystem.getLocal(job).getRaw();
FSDataInputStream inStream = localFs.open(mapOutputFileName);
+
+ inStream = CryptoUtils.wrapIfNecessary(job, inStream);
+
try {
inStream.seek(ir.startOffset);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
index a821e4d1b8..1fa1da0f70 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
@@ -30,6 +30,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.ChecksumFileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.LocalFileSystem;
@@ -54,6 +55,7 @@
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.CryptoUtils;
import org.apache.hadoop.mapreduce.task.reduce.MapOutput.MapOutputComparator;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.ReflectionUtils;
@@ -227,6 +229,10 @@ protected MergeThread<InMemoryMapOutput<K,V>, K,V> createInMemoryMerger() {
return new InMemoryMerger(this);
}
+ protected MergeThread<CompressAwarePath,K,V> createOnDiskMerger() {
+ return new OnDiskMerger(this);
+ }
+
TaskAttemptID getReduceId() {
return reduceId;
}
@@ -452,11 +458,10 @@ public void merge(List<InMemoryMapOutput<K,V>> inputs) throws IOException {
mergeOutputSize).suffix(
Task.MERGED_OUTPUT_PREFIX);
- Writer writer =
- new Writer(jobConf, rfs, outputPath,
- (Class) jobConf.getMapOutputKeyClass(),
- (Class) jobConf.getMapOutputValueClass(),
- codec, null);
+ FSDataOutputStream out = CryptoUtils.wrapIfNecessary(jobConf, rfs.create(outputPath));
+ Writer writer = new Writer(jobConf, out,
+ (Class) jobConf.getMapOutputKeyClass(),
+ (Class) jobConf.getMapOutputValueClass(), codec, null, true);
RawKeyValueIterator rIter = null;
CompressAwarePath compressAwarePath;
@@ -536,11 +541,12 @@ public void merge(List inputs) throws IOException {
Path outputPath =
localDirAllocator.getLocalPathForWrite(inputs.get(0).toString(),
approxOutputSize, jobConf).suffix(Task.MERGED_OUTPUT_PREFIX);
- Writer writer =
- new Writer(jobConf, rfs, outputPath,
- (Class) jobConf.getMapOutputKeyClass(),
- (Class) jobConf.getMapOutputValueClass(),
- codec, null);
+
+ FSDataOutputStream out = CryptoUtils.wrapIfNecessary(jobConf, rfs.create(outputPath));
+ Writer writer = new Writer(jobConf, out,
+ (Class) jobConf.getMapOutputKeyClass(),
+ (Class) jobConf.getMapOutputValueClass(), codec, null, true);
+
RawKeyValueIterator iter = null;
CompressAwarePath compressAwarePath;
Path tmpDir = new Path(reduceId.toString());
@@ -716,8 +722,10 @@ private RawKeyValueIterator finalMerge(JobConf job, FileSystem fs,
keyClass, valueClass, memDiskSegments, numMemDiskSegments,
tmpDir, comparator, reporter, spilledRecordsCounter, null,
mergePhase);
- Writer writer = new Writer(job, fs, outputPath,
- keyClass, valueClass, codec, null);
+
+ FSDataOutputStream out = CryptoUtils.wrapIfNecessary(job, fs.create(outputPath));
+ Writer writer = new Writer(job, out, keyClass, valueClass,
+ codec, null, true);
try {
Merger.writeFile(rIter, writer, reporter, job);
writer.close();
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
index 59bb04a9de..6e0e92bd4d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
@@ -37,6 +37,7 @@
import org.apache.hadoop.mapred.MapOutputFile;
import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.CryptoUtils;
import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.CompressAwarePath;
import com.google.common.annotations.VisibleForTesting;
@@ -75,7 +76,7 @@ public OnDiskMapOutput(TaskAttemptID mapId, TaskAttemptID reduceId,
this.merger = merger;
this.outputPath = outputPath;
tmpOutputPath = getTempPath(outputPath, fetcher);
- disk = fs.create(tmpOutputPath);
+ disk = CryptoUtils.wrapIfNecessary(conf, fs.create(tmpOutputPath));
}
@VisibleForTesting
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java
index 1aea5004a5..c5ab420b81 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java
@@ -24,14 +24,16 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.junit.Assert;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -51,10 +53,16 @@
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.mapreduce.CryptoUtils;
import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl;
+import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.CompressAwarePath;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.Progressable;
import org.junit.After;
@@ -63,40 +71,48 @@
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import com.google.common.collect.Lists;
+
public class TestMerger {
private Configuration conf;
private JobConf jobConf;
private FileSystem fs;
-
+
@Before
public void setup() throws IOException {
conf = new Configuration();
jobConf = new JobConf();
fs = FileSystem.getLocal(conf);
}
-
- @After
- public void cleanup() throws IOException {
- fs.delete(new Path(jobConf.getLocalDirs()[0]), true);
- }
-
+
+
@Test
- public void testInMemoryMerger() throws Throwable {
+ public void testEncryptedMerger() throws Throwable {
+ jobConf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
+ conf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
+ Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
+ TokenCache.setShuffleSecretKey(new byte[16], credentials);
+ UserGroupInformation.getCurrentUser().addCredentials(credentials);
+ testInMemoryAndOnDiskMerger();
+ }
+
+ @Test
+ public void testInMemoryAndOnDiskMerger() throws Throwable {
JobID jobId = new JobID("a", 0);
- TaskAttemptID reduceId = new TaskAttemptID(
+ TaskAttemptID reduceId1 = new TaskAttemptID(
new TaskID(jobId, TaskType.REDUCE, 0), 0);
TaskAttemptID mapId1 = new TaskAttemptID(
new TaskID(jobId, TaskType.MAP, 1), 0);
TaskAttemptID mapId2 = new TaskAttemptID(
new TaskID(jobId, TaskType.MAP, 2), 0);
-
+
LocalDirAllocator lda = new LocalDirAllocator(MRConfig.LOCAL_DIR);
-
+
MergeManagerImpl mergeManager = new MergeManagerImpl(
- reduceId, jobConf, fs, lda, Reporter.NULL, null, null, null, null, null,
+ reduceId1, jobConf, fs, lda, Reporter.NULL, null, null, null, null, null,
null, null, new Progress(), new MROutputFiles());
-
+
// write map outputs
Map map1 = new TreeMap();
map1.put("apple", "disgusting");
@@ -113,32 +129,88 @@ public void testInMemoryMerger() throws Throwable {
mapOutputBytes1.length);
System.arraycopy(mapOutputBytes2, 0, mapOutput2.getMemory(), 0,
mapOutputBytes2.length);
-
+
// create merger and run merge
MergeThread<InMemoryMapOutput<Text, Text>, Text, Text> inMemoryMerger =
mergeManager.createInMemoryMerger();
- List<InMemoryMapOutput<Text, Text>> mapOutputs =
+ List<InMemoryMapOutput<Text, Text>> mapOutputs1 =
new ArrayList<InMemoryMapOutput<Text, Text>>();
- mapOutputs.add(mapOutput1);
- mapOutputs.add(mapOutput2);
-
- inMemoryMerger.merge(mapOutputs);
-
+ mapOutputs1.add(mapOutput1);
+ mapOutputs1.add(mapOutput2);
+
+ inMemoryMerger.merge(mapOutputs1);
+
Assert.assertEquals(1, mergeManager.onDiskMapOutputs.size());
- Path outPath = mergeManager.onDiskMapOutputs.iterator().next();
-
+
+ TaskAttemptID reduceId2 = new TaskAttemptID(
+ new TaskID(jobId, TaskType.REDUCE, 3), 0);
+ TaskAttemptID mapId3 = new TaskAttemptID(
+ new TaskID(jobId, TaskType.MAP, 4), 0);
+ TaskAttemptID mapId4 = new TaskAttemptID(
+ new TaskID(jobId, TaskType.MAP, 5), 0);
+ // write map outputs
+ Map map3 = new TreeMap();
+ map3.put("apple", "awesome");
+ map3.put("carrot", "amazing");
+ Map map4 = new TreeMap();
+ map4.put("banana", "bla");
+ byte[] mapOutputBytes3 = writeMapOutput(conf, map3);
+ byte[] mapOutputBytes4 = writeMapOutput(conf, map4);
+ InMemoryMapOutput mapOutput3 = new InMemoryMapOutput(
+ conf, mapId3, mergeManager, mapOutputBytes3.length, null, true);
+ InMemoryMapOutput mapOutput4 = new InMemoryMapOutput(
+ conf, mapId4, mergeManager, mapOutputBytes4.length, null, true);
+ System.arraycopy(mapOutputBytes3, 0, mapOutput3.getMemory(), 0,
+ mapOutputBytes3.length);
+ System.arraycopy(mapOutputBytes4, 0, mapOutput4.getMemory(), 0,
+ mapOutputBytes4.length);
+
+ // create merger and run merge
+ MergeThread<InMemoryMapOutput<Text, Text>, Text, Text> inMemoryMerger2 =
+ mergeManager.createInMemoryMerger();
+ List<InMemoryMapOutput<Text, Text>> mapOutputs2 =
+ new ArrayList<InMemoryMapOutput<Text, Text>>();
+ mapOutputs2.add(mapOutput3);
+ mapOutputs2.add(mapOutput4);
+
+ inMemoryMerger2.merge(mapOutputs2);
+
+ Assert.assertEquals(2, mergeManager.onDiskMapOutputs.size());
+
+ List<CompressAwarePath> paths = new ArrayList<CompressAwarePath>();
+ Iterator<CompressAwarePath> iterator = mergeManager.onDiskMapOutputs.iterator();
List keys = new ArrayList();
List values = new ArrayList();
- readOnDiskMapOutput(conf, fs, outPath, keys, values);
- Assert.assertEquals(keys, Arrays.asList("apple", "banana", "carrot"));
- Assert.assertEquals(values, Arrays.asList("disgusting", "pretty good", "delicious"));
+ while (iterator.hasNext()) {
+ CompressAwarePath next = iterator.next();
+ readOnDiskMapOutput(conf, fs, next, keys, values);
+ paths.add(next);
+ }
+ Assert.assertEquals(keys, Arrays.asList("apple", "banana", "carrot", "apple", "banana", "carrot"));
+ Assert.assertEquals(values, Arrays.asList("awesome", "bla", "amazing", "disgusting", "pretty good", "delicious"));
+ mergeManager.close();
+
+ mergeManager = new MergeManagerImpl(
+ reduceId2, jobConf, fs, lda, Reporter.NULL, null, null, null, null, null,
+ null, null, new Progress(), new MROutputFiles());
+
+ MergeThread onDiskMerger = mergeManager.createOnDiskMerger();
+ onDiskMerger.merge(paths);
+
+ Assert.assertEquals(1, mergeManager.onDiskMapOutputs.size());
+
+ keys = new ArrayList();
+ values = new ArrayList();
+ readOnDiskMapOutput(conf, fs, mergeManager.onDiskMapOutputs.iterator().next(), keys, values);
+ Assert.assertEquals(keys, Arrays.asList("apple", "apple", "banana", "banana", "carrot", "carrot"));
+ Assert.assertEquals(values, Arrays.asList("awesome", "disgusting", "pretty good", "bla", "amazing", "delicious"));
mergeManager.close();
Assert.assertEquals(0, mergeManager.inMemoryMapOutputs.size());
Assert.assertEquals(0, mergeManager.inMemoryMergedMapOutputs.size());
Assert.assertEquals(0, mergeManager.onDiskMapOutputs.size());
}
-
+
private byte[] writeMapOutput(Configuration conf, Map keysToValues)
throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -152,11 +224,13 @@ private byte[] writeMapOutput(Configuration conf, Map keysToValu
writer.close();
return baos.toByteArray();
}
-
+
private void readOnDiskMapOutput(Configuration conf, FileSystem fs, Path path,
List keys, List values) throws IOException {
- IFile.Reader reader = new IFile.Reader(conf, fs,
- path, null, null);
+ FSDataInputStream in = CryptoUtils.wrapIfNecessary(conf, fs.open(path));
+
+ IFile.Reader reader = new IFile.Reader(conf, in,
+ fs.getFileStatus(path).getLen(), null, null);
DataInputBuffer keyBuff = new DataInputBuffer();
DataInputBuffer valueBuff = new DataInputBuffer();
Text key = new Text();
@@ -169,17 +243,17 @@ private void readOnDiskMapOutput(Configuration conf, FileSystem fs, Path path,
values.add(value.toString());
}
}
-
+
@Test
public void testCompressed() throws IOException {
testMergeShouldReturnProperProgress(getCompressedSegments());
- }
-
+}
+
@Test
public void testUncompressed() throws IOException {
testMergeShouldReturnProperProgress(getUncompressedSegments());
}
-
+
@SuppressWarnings( { "deprecation", "unchecked" })
public void testMergeShouldReturnProperProgress(
List<Segment<Text, Text>> segments) throws IOException {
@@ -212,7 +286,7 @@ private List<Segment<Text, Text>> getUncompressedSegments() throws IOException {
}
return segments;
}
-
+
private List<Segment<Text, Text>> getCompressedSegments() throws IOException {
List<Segment<Text, Text>> segments = new ArrayList<Segment<Text, Text>>();
for (int i = 1; i < 1; i++) {
@@ -220,7 +294,7 @@ private List<Segment<Text, Text>> getCompressedSegments() throws IOException {
}
return segments;
}
-
+
private Segment getUncompressedSegment(int i) throws IOException {
return new Segment(getReader(i), false);
}
@@ -258,7 +332,7 @@ public Boolean answer(InvocationOnMock invocation) {
}
};
}
-
+
private Answer<?> getValueAnswer(final String segmentName) {
return new Answer() {
int i = 0;
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFile.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFile.java
index e3c7253afc..a314fc1f57 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFile.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFile.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.mapred;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
@@ -42,7 +44,7 @@ public void testIFileWriterWithCodec() throws Exception {
DefaultCodec codec = new GzipCodec();
codec.setConf(conf);
IFile.Writer writer =
- new IFile.Writer(conf, rfs, path, Text.class, Text.class,
+ new IFile.Writer(conf, rfs.create(path), Text.class, Text.class,
codec, null);
writer.close();
}
@@ -56,12 +58,15 @@ public void testIFileReaderWithCodec() throws Exception {
Path path = new Path(new Path("build/test.ifile"), "data");
DefaultCodec codec = new GzipCodec();
codec.setConf(conf);
+ FSDataOutputStream out = rfs.create(path);
IFile.Writer writer =
- new IFile.Writer(conf, rfs, path, Text.class, Text.class,
+ new IFile.Writer(conf, out, Text.class, Text.class,
codec, null);
writer.close();
+ FSDataInputStream in = rfs.open(path);
IFile.Reader reader =
- new IFile.Reader(conf, rfs, path, codec, null);
+ new IFile.Reader(conf, in, rfs.getFileStatus(path).getLen(),
+ codec, null);
reader.close();
// test check sum
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRIntermediateDataEncryption.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRIntermediateDataEncryption.java
new file mode 100644
index 0000000000..ebc32adb9d
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRIntermediateDataEncryption.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests that MapReduce jobs run correctly with encryption of intermediate data
+ * enabled. The input files are already sorted on the key; the records are
+ * routed to different partitions while the sort order is maintained within
+ * each partition. The Hadoop framework's merge on the reduce side then merges
+ * the partitions to generate the final output, which is sorted on the key.
+ */
+@SuppressWarnings(value={"unchecked", "deprecation"})
+public class TestMRIntermediateDataEncryption {
+ // Where MR job's input will reside.
+ private static final Path INPUT_DIR = new Path("/test/input");
+ // Where output goes.
+ private static final Path OUTPUT = new Path("/test/output");
+
+ @Test
+ public void testSingleReducer() throws Exception {
+ doEncryptionTest(3, 1, 2);
+ }
+
+ @Test
+ public void testMultipleMapsPerNode() throws Exception {
+ doEncryptionTest(8, 1, 2);
+ }
+
+ @Test
+ public void testMultipleReducers() throws Exception {
+ doEncryptionTest(2, 4, 2);
+ }
+
+ public void doEncryptionTest(int numMappers, int numReducers, int numNodes) throws Exception {
+ doEncryptionTest(numMappers, numReducers, numNodes, 1000);
+ }
+
+ public void doEncryptionTest(int numMappers, int numReducers, int numNodes, int numLines) throws Exception {
+ MiniDFSCluster dfsCluster = null;
+ MiniMRClientCluster mrCluster = null;
+ FileSystem fileSystem = null;
+ try {
+ Configuration conf = new Configuration();
+ // Start the mini-MR and mini-DFS clusters
+
+ dfsCluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(numNodes).build();
+ fileSystem = dfsCluster.getFileSystem();
+ mrCluster = MiniMRClientClusterFactory.create(this.getClass(),
+ numNodes, conf);
+ // Generate input.
+ createInput(fileSystem, numMappers, numLines);
+ // Run the test.
+ runMergeTest(new JobConf(mrCluster.getConfig()), fileSystem, numMappers, numReducers, numLines);
+ } finally {
+ if (dfsCluster != null) {
+ dfsCluster.shutdown();
+ }
+ if (mrCluster != null) {
+ mrCluster.stop();
+ }
+ }
+ }
+
+ private void createInput(FileSystem fs, int numMappers, int numLines) throws Exception {
+ fs.delete(INPUT_DIR, true);
+ for (int i = 0; i < numMappers; i++) {
+ OutputStream os = fs.create(new Path(INPUT_DIR, "input_" + i + ".txt"));
+ Writer writer = new OutputStreamWriter(os);
+ for (int j = 0; j < numLines; j++) {
+ // Create sorted key, value pairs.
+ int k = j + 1;
+ String formattedNumber = String.format("%09d", k);
+ writer.write(formattedNumber + " " + formattedNumber + "\n");
+ }
+ writer.close();
+ }
+ }
+
+ private void runMergeTest(JobConf job, FileSystem fileSystem, int numMappers, int numReducers, int numLines)
+ throws Exception {
+ fileSystem.delete(OUTPUT, true);
+ job.setJobName("Test");
+ JobClient client = new JobClient(job);
+ RunningJob submittedJob = null;
+ FileInputFormat.setInputPaths(job, INPUT_DIR);
+ FileOutputFormat.setOutputPath(job, OUTPUT);
+ job.set("mapreduce.output.textoutputformat.separator", " ");
+ job.setInputFormat(TextInputFormat.class);
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(Text.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(Text.class);
+ job.setMapperClass(MyMapper.class);
+ job.setPartitionerClass(MyPartitioner.class);
+ job.setOutputFormat(TextOutputFormat.class);
+ job.setNumReduceTasks(numReducers);
+
+ job.setInt("mapreduce.map.maxattempts", 1);
+ job.setInt("mapreduce.reduce.maxattempts", 1);
+ job.setInt("mapred.test.num_lines", numLines);
+ job.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
+ try {
+ submittedJob = client.submitJob(job);
+ try {
+ if (! client.monitorAndPrintJob(job, submittedJob)) {
+ throw new IOException("Job failed!");
+ }
+ } catch(InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ } catch(IOException ioe) {
+ System.err.println("Job failed with: " + ioe);
+ } finally {
+ verifyOutput(submittedJob, fileSystem, numMappers, numLines);
+ }
+ }
+
+ private void verifyOutput(RunningJob submittedJob, FileSystem fileSystem, int numMappers, int numLines)
+ throws Exception {
+ FSDataInputStream dis = null;
+ long numValidRecords = 0;
+ long numInvalidRecords = 0;
+ String prevKeyValue = "000000000";
+ Path[] fileList =
+ FileUtil.stat2Paths(fileSystem.listStatus(OUTPUT,
+ new Utils.OutputFileUtils.OutputFilesFilter()));
+ for (Path outFile : fileList) {
+ try {
+ dis = fileSystem.open(outFile);
+ String record;
+ while((record = dis.readLine()) != null) {
+ // Split the line into key and value.
+ int blankPos = record.indexOf(" ");
+ String keyString = record.substring(0, blankPos);
+ String valueString = record.substring(blankPos+1);
+ // Check for sorted output and correctness of record.
+ if (keyString.compareTo(prevKeyValue) >= 0
+ && keyString.equals(valueString)) {
+ prevKeyValue = keyString;
+ numValidRecords++;
+ } else {
+ numInvalidRecords++;
+ }
+ }
+ } finally {
+ if (dis != null) {
+ dis.close();
+ dis = null;
+ }
+ }
+ }
+ // Make sure we got all input records in the output in sorted order.
+ assertEquals((long)(numMappers * numLines), numValidRecords);
+ // Make sure there is no extraneous invalid record.
+ assertEquals(0, numInvalidRecords);
+ }
+
+ /**
+ * A mapper implementation that assumes that key text contains valid integers
+ * in displayable form.
+ */
+ public static class MyMapper extends MapReduceBase
+ implements Mapper<LongWritable, Text, Text, Text> {
+ private Text keyText;
+ private Text valueText;
+
+ public MyMapper() {
+ keyText = new Text();
+ valueText = new Text();
+ }
+
+ @Override
+ public void map(LongWritable key, Text value,
+ OutputCollector<Text, Text> output,
+ Reporter reporter) throws IOException {
+ String record = value.toString();
+ int blankPos = record.indexOf(" ");
+ keyText.set(record.substring(0, blankPos));
+ valueText.set(record.substring(blankPos+1));
+ output.collect(keyText, valueText);
+ }
+
+ public void close() throws IOException {
+ }
+ }
+
+ /**
+ * Partitioner implementation that ensures the output is in total sorted
+ * order. Key ranges are routed to different reducers such that key values
+ * increase monotonically with the partition number. For example, in this
+ * test the keys are numbers from 1 to 1000 in the form "000000001" to
+ * "000001000" in each input file. With 4 reducers, the keys "000000001" to
+ * "000000250" are routed to partition 0, "000000251" to "000000500" to
+ * partition 1, and so on.
+ */
+ static class MyPartitioner implements Partitioner<Text, Text> {
+
+ private JobConf job;
+
+ public MyPartitioner() {
+ }
+
+ public void configure(JobConf job) {
+ this.job = job;
+ }
+
+ public int getPartition(Text key, Text value, int numPartitions) {
+ int keyValue = 0;
+ try {
+ keyValue = Integer.parseInt(key.toString());
+ } catch(NumberFormatException nfe) {
+ keyValue = 0;
+ }
+ int partitionNumber = (numPartitions*(Math.max(0, keyValue-1)))/job.getInt("mapred.test.num_lines", 10000);
+ return partitionNumber;
+ }
+ }
+
+}
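A quick standalone check of MyPartitioner's arithmetic (illustration only, not test code): with 4 reducers and 1000 lines per input file, (numPartitions * max(0, key - 1)) / numLines sends keys 1..250 to partition 0, 251..500 to partition 1, and so on.

    public class PartitionMath {
      static int partitionFor(int keyValue, int numPartitions, int numLines) {
        return (numPartitions * Math.max(0, keyValue - 1)) / numLines;
      }

      public static void main(String[] args) {
        System.out.println(partitionFor(1, 4, 1000));    // 0
        System.out.println(partitionFor(250, 4, 1000));  // 0
        System.out.println(partitionFor(251, 4, 1000));  // 1
        System.out.println(partitionFor(1000, 4, 1000)); // 3
      }
    }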
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestReduceTask.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestReduceTask.java
index d3a084449a..43fd94871a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestReduceTask.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestReduceTask.java
@@ -80,7 +80,7 @@ public void runValueIterator(Path tmpDir, Pair[] vals,
FileSystem rfs = ((LocalFileSystem)localFs).getRaw();
Path path = new Path(tmpDir, "data.in");
IFile.Writer writer =
- new IFile.Writer(conf, rfs, path, Text.class, Text.class,
+ new IFile.Writer(conf, rfs.create(path), Text.class, Text.class,
codec, null);
for(Pair p: vals) {
writer.append(new Text(p.key), new Text(p.value));
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java
index 69994f36bd..f447ebcc7c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java
@@ -95,9 +95,9 @@ public void testRunner() throws Exception {
new Counters.Counter(), new Progress());
FileSystem fs = new RawLocalFileSystem();
fs.setConf(conf);
- Writer wr = new Writer(conf, fs,
- new Path(workSpace + File.separator + "outfile"), IntWritable.class,
- Text.class, null, null);
+ Writer wr = new Writer(conf, fs.create(
+ new Path(workSpace + File.separator + "outfile")), IntWritable.class,
+ Text.class, null, null, true);
output.setWriter(wr);
// stub for client
File fCommand = getFileCommand("org.apache.hadoop.mapred.pipes.PipeApplicationRunnableStub");
@@ -177,9 +177,9 @@ public void testApplication() throws Throwable {
new Progress());
FileSystem fs = new RawLocalFileSystem();
fs.setConf(conf);
- Writer wr = new Writer(conf, fs,
- new Path(workSpace.getAbsolutePath() + File.separator + "outfile"),
- IntWritable.class, Text.class, null, null);
+ Writer wr = new Writer(conf, fs.create(
+ new Path(workSpace.getAbsolutePath() + File.separator + "outfile")),
+ IntWritable.class, Text.class, null, null, true);
output.setWriter(wr);
conf.set(Submitter.PRESERVE_COMMANDFILE, "true");