MAPREDUCE-5890. Support for encrypting Intermediate data and spills in local filesystem. (asuresh via tucu)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/fs-encryption@1609597 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
c30872a4ee
commit
95986dd2fb
15
hadoop-mapreduce-project/CHANGES-fs-encryption.txt
Normal file
15
hadoop-mapreduce-project/CHANGES-fs-encryption.txt
Normal file
@ -0,0 +1,15 @@
|
|||||||
|
Hadoop MapReduce Change Log
|
||||||
|
|
||||||
|
fs-encryption (Unreleased)
|
||||||
|
|
||||||
|
INCOMPATIBLE CHANGES
|
||||||
|
|
||||||
|
NEW FEATURES
|
||||||
|
|
||||||
|
MAPREDUCE-5890. Support for encrypting Intermediate
|
||||||
|
data and spills in local filesystem. (asuresh via tucu)
|
||||||
|
|
||||||
|
IMPROVEMENTS
|
||||||
|
|
||||||
|
BUG FIXES
|
||||||
|
|
@ -31,6 +31,7 @@
|
|||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.LocalDirAllocator;
|
import org.apache.hadoop.fs.LocalDirAllocator;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
@ -43,6 +44,7 @@
|
|||||||
import org.apache.hadoop.mapreduce.MRConfig;
|
import org.apache.hadoop.mapreduce.MRConfig;
|
||||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
||||||
|
import org.apache.hadoop.mapreduce.CryptoUtils;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <code>BackupStore</code> is an utility class that is used to support
|
* <code>BackupStore</code> is an utility class that is used to support
|
||||||
@ -572,7 +574,9 @@ private Writer<K,V> createSpillFile() throws IOException {
|
|||||||
|
|
||||||
file = lDirAlloc.getLocalPathForWrite(tmp.toUri().getPath(),
|
file = lDirAlloc.getLocalPathForWrite(tmp.toUri().getPath(),
|
||||||
-1, conf);
|
-1, conf);
|
||||||
return new Writer<K, V>(conf, fs, file);
|
FSDataOutputStream out = fs.create(file);
|
||||||
|
out = CryptoUtils.wrapIfNecessary(conf, out);
|
||||||
|
return new Writer<K, V>(conf, out, null, null, null, null, true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -90,13 +90,11 @@ public static class Writer<K extends Object, V extends Object> {
|
|||||||
|
|
||||||
DataOutputBuffer buffer = new DataOutputBuffer();
|
DataOutputBuffer buffer = new DataOutputBuffer();
|
||||||
|
|
||||||
public Writer(Configuration conf, FileSystem fs, Path file,
|
public Writer(Configuration conf, FSDataOutputStream out,
|
||||||
Class<K> keyClass, Class<V> valueClass,
|
Class<K> keyClass, Class<V> valueClass,
|
||||||
CompressionCodec codec,
|
CompressionCodec codec, Counters.Counter writesCounter)
|
||||||
Counters.Counter writesCounter) throws IOException {
|
throws IOException {
|
||||||
this(conf, fs.create(file), keyClass, valueClass, codec,
|
this(conf, out, keyClass, valueClass, codec, writesCounter, false);
|
||||||
writesCounter);
|
|
||||||
ownOutputStream = true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected Writer(Counters.Counter writesCounter) {
|
protected Writer(Counters.Counter writesCounter) {
|
||||||
@ -105,7 +103,8 @@ protected Writer(Counters.Counter writesCounter) {
|
|||||||
|
|
||||||
public Writer(Configuration conf, FSDataOutputStream out,
|
public Writer(Configuration conf, FSDataOutputStream out,
|
||||||
Class<K> keyClass, Class<V> valueClass,
|
Class<K> keyClass, Class<V> valueClass,
|
||||||
CompressionCodec codec, Counters.Counter writesCounter)
|
CompressionCodec codec, Counters.Counter writesCounter,
|
||||||
|
boolean ownOutputStream)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
this.writtenRecordsCounter = writesCounter;
|
this.writtenRecordsCounter = writesCounter;
|
||||||
this.checksumOut = new IFileOutputStream(out);
|
this.checksumOut = new IFileOutputStream(out);
|
||||||
@ -137,11 +136,7 @@ public Writer(Configuration conf, FSDataOutputStream out,
|
|||||||
this.valueSerializer = serializationFactory.getSerializer(valueClass);
|
this.valueSerializer = serializationFactory.getSerializer(valueClass);
|
||||||
this.valueSerializer.open(buffer);
|
this.valueSerializer.open(buffer);
|
||||||
}
|
}
|
||||||
}
|
this.ownOutputStream = ownOutputStream;
|
||||||
|
|
||||||
public Writer(Configuration conf, FileSystem fs, Path file)
|
|
||||||
throws IOException {
|
|
||||||
this(conf, fs, file, null, null, null, null);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void close() throws IOException {
|
public void close() throws IOException {
|
||||||
|
@ -66,6 +66,7 @@
|
|||||||
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter;
|
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter;
|
||||||
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
|
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
|
||||||
import org.apache.hadoop.mapreduce.task.MapContextImpl;
|
import org.apache.hadoop.mapreduce.task.MapContextImpl;
|
||||||
|
import org.apache.hadoop.mapreduce.CryptoUtils;
|
||||||
import org.apache.hadoop.util.IndexedSortable;
|
import org.apache.hadoop.util.IndexedSortable;
|
||||||
import org.apache.hadoop.util.IndexedSorter;
|
import org.apache.hadoop.util.IndexedSorter;
|
||||||
import org.apache.hadoop.util.Progress;
|
import org.apache.hadoop.util.Progress;
|
||||||
@ -1580,7 +1581,8 @@ private void sortAndSpill() throws IOException, ClassNotFoundException,
|
|||||||
IFile.Writer<K, V> writer = null;
|
IFile.Writer<K, V> writer = null;
|
||||||
try {
|
try {
|
||||||
long segmentStart = out.getPos();
|
long segmentStart = out.getPos();
|
||||||
writer = new Writer<K, V>(job, out, keyClass, valClass, codec,
|
FSDataOutputStream partitionOut = CryptoUtils.wrapIfNecessary(job, out);
|
||||||
|
writer = new Writer<K, V>(job, partitionOut, keyClass, valClass, codec,
|
||||||
spilledRecordsCounter);
|
spilledRecordsCounter);
|
||||||
if (combinerRunner == null) {
|
if (combinerRunner == null) {
|
||||||
// spill directly
|
// spill directly
|
||||||
@ -1617,8 +1619,8 @@ private void sortAndSpill() throws IOException, ClassNotFoundException,
|
|||||||
|
|
||||||
// record offsets
|
// record offsets
|
||||||
rec.startOffset = segmentStart;
|
rec.startOffset = segmentStart;
|
||||||
rec.rawLength = writer.getRawLength();
|
rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job);
|
||||||
rec.partLength = writer.getCompressedLength();
|
rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job);
|
||||||
spillRec.putIndex(rec, i);
|
spillRec.putIndex(rec, i);
|
||||||
|
|
||||||
writer = null;
|
writer = null;
|
||||||
@ -1668,7 +1670,8 @@ private void spillSingleRecord(final K key, final V value,
|
|||||||
try {
|
try {
|
||||||
long segmentStart = out.getPos();
|
long segmentStart = out.getPos();
|
||||||
// Create a new codec, don't care!
|
// Create a new codec, don't care!
|
||||||
writer = new IFile.Writer<K,V>(job, out, keyClass, valClass, codec,
|
FSDataOutputStream partitionOut = CryptoUtils.wrapIfNecessary(job, out);
|
||||||
|
writer = new IFile.Writer<K,V>(job, partitionOut, keyClass, valClass, codec,
|
||||||
spilledRecordsCounter);
|
spilledRecordsCounter);
|
||||||
|
|
||||||
if (i == partition) {
|
if (i == partition) {
|
||||||
@ -1682,8 +1685,8 @@ private void spillSingleRecord(final K key, final V value,
|
|||||||
|
|
||||||
// record offsets
|
// record offsets
|
||||||
rec.startOffset = segmentStart;
|
rec.startOffset = segmentStart;
|
||||||
rec.rawLength = writer.getRawLength();
|
rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job);
|
||||||
rec.partLength = writer.getCompressedLength();
|
rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job);
|
||||||
spillRec.putIndex(rec, i);
|
spillRec.putIndex(rec, i);
|
||||||
|
|
||||||
writer = null;
|
writer = null;
|
||||||
@ -1825,12 +1828,13 @@ private void mergeParts() throws IOException, InterruptedException,
|
|||||||
try {
|
try {
|
||||||
for (int i = 0; i < partitions; i++) {
|
for (int i = 0; i < partitions; i++) {
|
||||||
long segmentStart = finalOut.getPos();
|
long segmentStart = finalOut.getPos();
|
||||||
|
FSDataOutputStream finalPartitionOut = CryptoUtils.wrapIfNecessary(job, finalOut);
|
||||||
Writer<K, V> writer =
|
Writer<K, V> writer =
|
||||||
new Writer<K, V>(job, finalOut, keyClass, valClass, codec, null);
|
new Writer<K, V>(job, finalPartitionOut, keyClass, valClass, codec, null);
|
||||||
writer.close();
|
writer.close();
|
||||||
rec.startOffset = segmentStart;
|
rec.startOffset = segmentStart;
|
||||||
rec.rawLength = writer.getRawLength();
|
rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job);
|
||||||
rec.partLength = writer.getCompressedLength();
|
rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job);
|
||||||
sr.putIndex(rec, i);
|
sr.putIndex(rec, i);
|
||||||
}
|
}
|
||||||
sr.writeToFile(finalIndexFile, job);
|
sr.writeToFile(finalIndexFile, job);
|
||||||
@ -1879,8 +1883,9 @@ private void mergeParts() throws IOException, InterruptedException,
|
|||||||
|
|
||||||
//write merged output to disk
|
//write merged output to disk
|
||||||
long segmentStart = finalOut.getPos();
|
long segmentStart = finalOut.getPos();
|
||||||
|
FSDataOutputStream finalPartitionOut = CryptoUtils.wrapIfNecessary(job, finalOut);
|
||||||
Writer<K, V> writer =
|
Writer<K, V> writer =
|
||||||
new Writer<K, V>(job, finalOut, keyClass, valClass, codec,
|
new Writer<K, V>(job, finalPartitionOut, keyClass, valClass, codec,
|
||||||
spilledRecordsCounter);
|
spilledRecordsCounter);
|
||||||
if (combinerRunner == null || numSpills < minSpillsForCombine) {
|
if (combinerRunner == null || numSpills < minSpillsForCombine) {
|
||||||
Merger.writeFile(kvIter, writer, reporter, job);
|
Merger.writeFile(kvIter, writer, reporter, job);
|
||||||
@ -1896,8 +1901,8 @@ private void mergeParts() throws IOException, InterruptedException,
|
|||||||
|
|
||||||
// record offsets
|
// record offsets
|
||||||
rec.startOffset = segmentStart;
|
rec.startOffset = segmentStart;
|
||||||
rec.rawLength = writer.getRawLength();
|
rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job);
|
||||||
rec.partLength = writer.getCompressedLength();
|
rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job);
|
||||||
spillRec.putIndex(rec, parts);
|
spillRec.putIndex(rec, parts);
|
||||||
}
|
}
|
||||||
spillRec.writeToFile(finalIndexFile, job);
|
spillRec.writeToFile(finalIndexFile, job);
|
||||||
|
@ -30,6 +30,7 @@
|
|||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
import org.apache.hadoop.fs.ChecksumFileSystem;
|
import org.apache.hadoop.fs.ChecksumFileSystem;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.LocalDirAllocator;
|
import org.apache.hadoop.fs.LocalDirAllocator;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
@ -40,6 +41,7 @@
|
|||||||
import org.apache.hadoop.mapred.IFile.Writer;
|
import org.apache.hadoop.mapred.IFile.Writer;
|
||||||
import org.apache.hadoop.mapreduce.MRConfig;
|
import org.apache.hadoop.mapreduce.MRConfig;
|
||||||
import org.apache.hadoop.mapreduce.TaskType;
|
import org.apache.hadoop.mapreduce.TaskType;
|
||||||
|
import org.apache.hadoop.mapreduce.CryptoUtils;
|
||||||
import org.apache.hadoop.util.PriorityQueue;
|
import org.apache.hadoop.util.PriorityQueue;
|
||||||
import org.apache.hadoop.util.Progress;
|
import org.apache.hadoop.util.Progress;
|
||||||
import org.apache.hadoop.util.Progressable;
|
import org.apache.hadoop.util.Progressable;
|
||||||
@ -298,8 +300,12 @@ public Segment(Reader<K, V> reader, boolean preserve,
|
|||||||
void init(Counters.Counter readsCounter) throws IOException {
|
void init(Counters.Counter readsCounter) throws IOException {
|
||||||
if (reader == null) {
|
if (reader == null) {
|
||||||
FSDataInputStream in = fs.open(file);
|
FSDataInputStream in = fs.open(file);
|
||||||
|
|
||||||
in.seek(segmentOffset);
|
in.seek(segmentOffset);
|
||||||
reader = new Reader<K, V>(conf, in, segmentLength, codec, readsCounter);
|
in = CryptoUtils.wrapIfNecessary(conf, in);
|
||||||
|
reader = new Reader<K, V>(conf, in,
|
||||||
|
segmentLength - CryptoUtils.cryptoPadding(conf),
|
||||||
|
codec, readsCounter);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mapOutputsCounter != null) {
|
if (mapOutputsCounter != null) {
|
||||||
@ -714,9 +720,10 @@ RawKeyValueIterator merge(Class<K> keyClass, Class<V> valueClass,
|
|||||||
tmpFilename.toString(),
|
tmpFilename.toString(),
|
||||||
approxOutputSize, conf);
|
approxOutputSize, conf);
|
||||||
|
|
||||||
Writer<K, V> writer =
|
FSDataOutputStream out = fs.create(outputFile);
|
||||||
new Writer<K, V>(conf, fs, outputFile, keyClass, valueClass, codec,
|
out = CryptoUtils.wrapIfNecessary(conf, out);
|
||||||
writesCounter);
|
Writer<K, V> writer = new Writer<K, V>(conf, out, keyClass, valueClass,
|
||||||
|
codec, writesCounter, true);
|
||||||
writeFile(this, writer, reporter, conf);
|
writeFile(this, writer, reporter, conf);
|
||||||
writer.close();
|
writer.close();
|
||||||
|
|
||||||
|
@ -0,0 +1,199 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.mapreduce;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
|
||||||
|
import org.apache.commons.codec.binary.Base64;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.crypto.CryptoCodec;
|
||||||
|
import org.apache.hadoop.crypto.CryptoInputStream;
|
||||||
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
import org.apache.hadoop.fs.crypto.CryptoFSDataInputStream;
|
||||||
|
import org.apache.hadoop.fs.crypto.CryptoFSDataOutputStream;
|
||||||
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
|
import org.apache.hadoop.mapreduce.security.TokenCache;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
|
||||||
|
import com.google.common.io.LimitInputStream;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class provides utilities to make it easier to work with Cryptographic
|
||||||
|
* Streams. Specifically for dealing with encrypting intermediate data such
|
||||||
|
* MapReduce spill files.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Unstable
|
||||||
|
public class CryptoUtils {
|
||||||
|
|
||||||
|
private static final Log LOG = LogFactory.getLog(CryptoUtils.class);
|
||||||
|
|
||||||
|
public static boolean isShuffleEncrypted(Configuration conf) {
|
||||||
|
return conf.getBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA,
|
||||||
|
MRJobConfig.DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This method creates and initializes an IV (Initialization Vector)
|
||||||
|
*
|
||||||
|
* @param conf
|
||||||
|
* @return byte[]
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public static byte[] createIV(Configuration conf) throws IOException {
|
||||||
|
CryptoCodec cryptoCodec = CryptoCodec.getInstance(conf);
|
||||||
|
if (isShuffleEncrypted(conf)) {
|
||||||
|
byte[] iv = new byte[cryptoCodec.getCipherSuite().getAlgorithmBlockSize()];
|
||||||
|
cryptoCodec.generateSecureRandom(iv);
|
||||||
|
return iv;
|
||||||
|
} else {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static int cryptoPadding(Configuration conf) {
|
||||||
|
// Sizeof(IV) + long(start-offset)
|
||||||
|
return isShuffleEncrypted(conf) ? CryptoCodec.getInstance(conf)
|
||||||
|
.getCipherSuite().getAlgorithmBlockSize() + 8 : 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static byte[] getEncryptionKey() throws IOException {
|
||||||
|
return TokenCache.getShuffleSecretKey(UserGroupInformation.getCurrentUser()
|
||||||
|
.getCredentials());
|
||||||
|
}
|
||||||
|
|
||||||
|
private static int getBufferSize(Configuration conf) {
|
||||||
|
return conf.getInt(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_BUFFER_KB,
|
||||||
|
MRJobConfig.DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_BUFFER_KB) * 1024;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wraps a given FSDataOutputStream with a CryptoOutputStream. The size of the
|
||||||
|
* data buffer required for the stream is specified by the
|
||||||
|
* "mapreduce.job.encrypted-intermediate-data.buffer.kb" Job configuration
|
||||||
|
* variable.
|
||||||
|
*
|
||||||
|
* @param conf
|
||||||
|
* @param out
|
||||||
|
* @return FSDataOutputStream
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public static FSDataOutputStream wrapIfNecessary(Configuration conf,
|
||||||
|
FSDataOutputStream out) throws IOException {
|
||||||
|
if (isShuffleEncrypted(conf)) {
|
||||||
|
out.write(ByteBuffer.allocate(8).putLong(out.getPos()).array());
|
||||||
|
byte[] iv = createIV(conf);
|
||||||
|
out.write(iv);
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("IV written to Stream ["
|
||||||
|
+ Base64.encodeBase64URLSafeString(iv) + "]");
|
||||||
|
}
|
||||||
|
return new CryptoFSDataOutputStream(out, CryptoCodec.getInstance(conf),
|
||||||
|
getBufferSize(conf), getEncryptionKey(), iv);
|
||||||
|
} else {
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wraps a given InputStream with a CryptoInputStream. The size of the data
|
||||||
|
* buffer required for the stream is specified by the
|
||||||
|
* "mapreduce.job.encrypted-intermediate-data.buffer.kb" Job configuration
|
||||||
|
* variable.
|
||||||
|
*
|
||||||
|
* If the value of 'length' is > -1, The InputStream is additionally wrapped
|
||||||
|
* in a LimitInputStream. CryptoStreams are late buffering in nature. This
|
||||||
|
* means they will always try to read ahead if they can. The LimitInputStream
|
||||||
|
* will ensure that the CryptoStream does not read past the provided length
|
||||||
|
* from the given Input Stream.
|
||||||
|
*
|
||||||
|
* @param conf
|
||||||
|
* @param in
|
||||||
|
* @param length
|
||||||
|
* @return InputStream
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public static InputStream wrapIfNecessary(Configuration conf, InputStream in,
|
||||||
|
long length) throws IOException {
|
||||||
|
if (isShuffleEncrypted(conf)) {
|
||||||
|
int bufferSize = getBufferSize(conf);
|
||||||
|
if (length > -1) {
|
||||||
|
in = new LimitInputStream(in, length);
|
||||||
|
}
|
||||||
|
byte[] offsetArray = new byte[8];
|
||||||
|
IOUtils.readFully(in, offsetArray, 0, 8);
|
||||||
|
long offset = ByteBuffer.wrap(offsetArray).getLong();
|
||||||
|
CryptoCodec cryptoCodec = CryptoCodec.getInstance(conf);
|
||||||
|
byte[] iv =
|
||||||
|
new byte[cryptoCodec.getCipherSuite().getAlgorithmBlockSize()];
|
||||||
|
IOUtils.readFully(in, iv, 0,
|
||||||
|
cryptoCodec.getCipherSuite().getAlgorithmBlockSize());
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("IV read from ["
|
||||||
|
+ Base64.encodeBase64URLSafeString(iv) + "]");
|
||||||
|
}
|
||||||
|
return new CryptoInputStream(in, cryptoCodec, bufferSize,
|
||||||
|
getEncryptionKey(), iv, offset + cryptoPadding(conf));
|
||||||
|
} else {
|
||||||
|
return in;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wraps a given FSDataInputStream with a CryptoInputStream. The size of the
|
||||||
|
* data buffer required for the stream is specified by the
|
||||||
|
* "mapreduce.job.encrypted-intermediate-data.buffer.kb" Job configuration
|
||||||
|
* variable.
|
||||||
|
*
|
||||||
|
* @param conf
|
||||||
|
* @param in
|
||||||
|
* @return FSDataInputStream
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public static FSDataInputStream wrapIfNecessary(Configuration conf,
|
||||||
|
FSDataInputStream in) throws IOException {
|
||||||
|
if (isShuffleEncrypted(conf)) {
|
||||||
|
CryptoCodec cryptoCodec = CryptoCodec.getInstance(conf);
|
||||||
|
int bufferSize = getBufferSize(conf);
|
||||||
|
// Not going to be used... but still has to be read...
|
||||||
|
// Since the O/P stream always writes it..
|
||||||
|
IOUtils.readFully(in, new byte[8], 0, 8);
|
||||||
|
byte[] iv =
|
||||||
|
new byte[cryptoCodec.getCipherSuite().getAlgorithmBlockSize()];
|
||||||
|
IOUtils.readFully(in, iv, 0,
|
||||||
|
cryptoCodec.getCipherSuite().getAlgorithmBlockSize());
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("IV read from Stream ["
|
||||||
|
+ Base64.encodeBase64URLSafeString(iv) + "]");
|
||||||
|
}
|
||||||
|
return new CryptoFSDataInputStream(in, cryptoCodec, bufferSize,
|
||||||
|
getEncryptionKey(), iv);
|
||||||
|
} else {
|
||||||
|
return in;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -291,7 +291,7 @@ private void copyJar(Path originalJarPath, Path submitJarFile,
|
|||||||
/**
|
/**
|
||||||
* configure the jobconf of the user with the command line options of
|
* configure the jobconf of the user with the command line options of
|
||||||
* -libjars, -files, -archives.
|
* -libjars, -files, -archives.
|
||||||
* @param conf
|
* @param job
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
private void copyAndConfigureFiles(Job job, Path jobSubmitDir)
|
private void copyAndConfigureFiles(Job job, Path jobSubmitDir)
|
||||||
@ -376,8 +376,13 @@ JobStatus submitJobInternal(Job job, Cluster cluster)
|
|||||||
if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
|
if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
|
||||||
KeyGenerator keyGen;
|
KeyGenerator keyGen;
|
||||||
try {
|
try {
|
||||||
|
|
||||||
|
int keyLen = CryptoUtils.isShuffleEncrypted(conf)
|
||||||
|
? conf.getInt(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS,
|
||||||
|
MRJobConfig.DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS)
|
||||||
|
: SHUFFLE_KEY_LENGTH;
|
||||||
keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
|
keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
|
||||||
keyGen.init(SHUFFLE_KEY_LENGTH);
|
keyGen.init(keyLen);
|
||||||
} catch (NoSuchAlgorithmException e) {
|
} catch (NoSuchAlgorithmException e) {
|
||||||
throw new IOException("Error generating shuffle secret key", e);
|
throw new IOException("Error generating shuffle secret key", e);
|
||||||
}
|
}
|
||||||
|
@ -762,4 +762,18 @@ public interface MRJobConfig {
|
|||||||
|
|
||||||
public static final String TASK_PREEMPTION =
|
public static final String TASK_PREEMPTION =
|
||||||
"mapreduce.job.preemption";
|
"mapreduce.job.preemption";
|
||||||
|
|
||||||
|
public static final String MR_ENCRYPTED_INTERMEDIATE_DATA =
|
||||||
|
"mapreduce.job.encrypted-intermediate-data";
|
||||||
|
public static final boolean DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA = false;
|
||||||
|
|
||||||
|
public static final String MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS =
|
||||||
|
"mapreduce.job.encrypted-intermediate-data-key-size-bits";
|
||||||
|
public static final int DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS =
|
||||||
|
128;
|
||||||
|
|
||||||
|
public static final String MR_ENCRYPTED_INTERMEDIATE_DATA_BUFFER_KB =
|
||||||
|
"mapreduce.job.encrypted-intermediate-data.buffer.kb";
|
||||||
|
public static final int DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_BUFFER_KB =
|
||||||
|
128;
|
||||||
}
|
}
|
||||||
|
@ -19,6 +19,7 @@
|
|||||||
|
|
||||||
import java.io.DataInputStream;
|
import java.io.DataInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
import java.net.ConnectException;
|
import java.net.ConnectException;
|
||||||
import java.net.HttpURLConnection;
|
import java.net.HttpURLConnection;
|
||||||
import java.net.MalformedURLException;
|
import java.net.MalformedURLException;
|
||||||
@ -43,6 +44,7 @@
|
|||||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
||||||
import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
|
import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
|
||||||
|
import org.apache.hadoop.mapreduce.CryptoUtils;
|
||||||
import org.apache.hadoop.security.ssl.SSLFactory;
|
import org.apache.hadoop.security.ssl.SSLFactory;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
@ -65,6 +67,7 @@ private static enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
|
|||||||
CONNECTION, WRONG_REDUCE}
|
CONNECTION, WRONG_REDUCE}
|
||||||
|
|
||||||
private final static String SHUFFLE_ERR_GRP_NAME = "Shuffle Errors";
|
private final static String SHUFFLE_ERR_GRP_NAME = "Shuffle Errors";
|
||||||
|
private final JobConf jobConf;
|
||||||
private final Counters.Counter connectionErrs;
|
private final Counters.Counter connectionErrs;
|
||||||
private final Counters.Counter ioErrs;
|
private final Counters.Counter ioErrs;
|
||||||
private final Counters.Counter wrongLengthErrs;
|
private final Counters.Counter wrongLengthErrs;
|
||||||
@ -104,6 +107,7 @@ public Fetcher(JobConf job, TaskAttemptID reduceId,
|
|||||||
Reporter reporter, ShuffleClientMetrics metrics,
|
Reporter reporter, ShuffleClientMetrics metrics,
|
||||||
ExceptionReporter exceptionReporter, SecretKey shuffleKey,
|
ExceptionReporter exceptionReporter, SecretKey shuffleKey,
|
||||||
int id) {
|
int id) {
|
||||||
|
this.jobConf = job;
|
||||||
this.reporter = reporter;
|
this.reporter = reporter;
|
||||||
this.scheduler = scheduler;
|
this.scheduler = scheduler;
|
||||||
this.merger = merger;
|
this.merger = merger;
|
||||||
@ -394,6 +398,10 @@ private TaskAttemptID[] copyMapOutput(MapHost host,
|
|||||||
return remaining.toArray(new TaskAttemptID[remaining.size()]);
|
return remaining.toArray(new TaskAttemptID[remaining.size()]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
InputStream is = input;
|
||||||
|
is = CryptoUtils.wrapIfNecessary(jobConf, is, compressedLength);
|
||||||
|
compressedLength -= CryptoUtils.cryptoPadding(jobConf);
|
||||||
|
decompressedLength -= CryptoUtils.cryptoPadding(jobConf);
|
||||||
|
|
||||||
// Do some basic sanity verification
|
// Do some basic sanity verification
|
||||||
if (!verifySanity(compressedLength, decompressedLength, forReduce,
|
if (!verifySanity(compressedLength, decompressedLength, forReduce,
|
||||||
@ -431,7 +439,7 @@ private TaskAttemptID[] copyMapOutput(MapHost host,
|
|||||||
LOG.info("fetcher#" + id + " about to shuffle output of map "
|
LOG.info("fetcher#" + id + " about to shuffle output of map "
|
||||||
+ mapOutput.getMapId() + " decomp: " + decompressedLength
|
+ mapOutput.getMapId() + " decomp: " + decompressedLength
|
||||||
+ " len: " + compressedLength + " to " + mapOutput.getDescription());
|
+ " len: " + compressedLength + " to " + mapOutput.getDescription());
|
||||||
mapOutput.shuffle(host, input, compressedLength, decompressedLength,
|
mapOutput.shuffle(host, is, compressedLength, decompressedLength,
|
||||||
metrics, reporter);
|
metrics, reporter);
|
||||||
} catch (java.lang.InternalError e) {
|
} catch (java.lang.InternalError e) {
|
||||||
LOG.warn("Failed to shuffle for fetcher#"+id, e);
|
LOG.warn("Failed to shuffle for fetcher#"+id, e);
|
||||||
|
@ -36,6 +36,7 @@
|
|||||||
import org.apache.hadoop.mapred.Reporter;
|
import org.apache.hadoop.mapred.Reporter;
|
||||||
import org.apache.hadoop.mapred.SpillRecord;
|
import org.apache.hadoop.mapred.SpillRecord;
|
||||||
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
||||||
|
import org.apache.hadoop.mapreduce.CryptoUtils;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* LocalFetcher is used by LocalJobRunner to perform a local filesystem
|
* LocalFetcher is used by LocalJobRunner to perform a local filesystem
|
||||||
@ -145,6 +146,9 @@ private boolean copyMapOutput(TaskAttemptID mapTaskId) throws IOException {
|
|||||||
// now read the file, seek to the appropriate section, and send it.
|
// now read the file, seek to the appropriate section, and send it.
|
||||||
FileSystem localFs = FileSystem.getLocal(job).getRaw();
|
FileSystem localFs = FileSystem.getLocal(job).getRaw();
|
||||||
FSDataInputStream inStream = localFs.open(mapOutputFileName);
|
FSDataInputStream inStream = localFs.open(mapOutputFileName);
|
||||||
|
|
||||||
|
inStream = CryptoUtils.wrapIfNecessary(job, inStream);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
inStream.seek(ir.startOffset);
|
inStream.seek(ir.startOffset);
|
||||||
|
|
||||||
|
@ -30,6 +30,7 @@
|
|||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.fs.ChecksumFileSystem;
|
import org.apache.hadoop.fs.ChecksumFileSystem;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.LocalDirAllocator;
|
import org.apache.hadoop.fs.LocalDirAllocator;
|
||||||
import org.apache.hadoop.fs.LocalFileSystem;
|
import org.apache.hadoop.fs.LocalFileSystem;
|
||||||
@ -54,6 +55,7 @@
|
|||||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
||||||
import org.apache.hadoop.mapreduce.TaskID;
|
import org.apache.hadoop.mapreduce.TaskID;
|
||||||
|
import org.apache.hadoop.mapreduce.CryptoUtils;
|
||||||
import org.apache.hadoop.mapreduce.task.reduce.MapOutput.MapOutputComparator;
|
import org.apache.hadoop.mapreduce.task.reduce.MapOutput.MapOutputComparator;
|
||||||
import org.apache.hadoop.util.Progress;
|
import org.apache.hadoop.util.Progress;
|
||||||
import org.apache.hadoop.util.ReflectionUtils;
|
import org.apache.hadoop.util.ReflectionUtils;
|
||||||
@ -227,6 +229,10 @@ protected MergeThread<InMemoryMapOutput<K,V>, K,V> createInMemoryMerger() {
|
|||||||
return new InMemoryMerger(this);
|
return new InMemoryMerger(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected MergeThread<CompressAwarePath,K,V> createOnDiskMerger() {
|
||||||
|
return new OnDiskMerger(this);
|
||||||
|
}
|
||||||
|
|
||||||
TaskAttemptID getReduceId() {
|
TaskAttemptID getReduceId() {
|
||||||
return reduceId;
|
return reduceId;
|
||||||
}
|
}
|
||||||
@ -452,11 +458,10 @@ public void merge(List<InMemoryMapOutput<K,V>> inputs) throws IOException {
|
|||||||
mergeOutputSize).suffix(
|
mergeOutputSize).suffix(
|
||||||
Task.MERGED_OUTPUT_PREFIX);
|
Task.MERGED_OUTPUT_PREFIX);
|
||||||
|
|
||||||
Writer<K,V> writer =
|
FSDataOutputStream out = CryptoUtils.wrapIfNecessary(jobConf, rfs.create(outputPath));
|
||||||
new Writer<K,V>(jobConf, rfs, outputPath,
|
Writer<K, V> writer = new Writer<K, V>(jobConf, out,
|
||||||
(Class<K>) jobConf.getMapOutputKeyClass(),
|
(Class<K>) jobConf.getMapOutputKeyClass(),
|
||||||
(Class<V>) jobConf.getMapOutputValueClass(),
|
(Class<V>) jobConf.getMapOutputValueClass(), codec, null, true);
|
||||||
codec, null);
|
|
||||||
|
|
||||||
RawKeyValueIterator rIter = null;
|
RawKeyValueIterator rIter = null;
|
||||||
CompressAwarePath compressAwarePath;
|
CompressAwarePath compressAwarePath;
|
||||||
@ -536,11 +541,12 @@ public void merge(List<CompressAwarePath> inputs) throws IOException {
|
|||||||
Path outputPath =
|
Path outputPath =
|
||||||
localDirAllocator.getLocalPathForWrite(inputs.get(0).toString(),
|
localDirAllocator.getLocalPathForWrite(inputs.get(0).toString(),
|
||||||
approxOutputSize, jobConf).suffix(Task.MERGED_OUTPUT_PREFIX);
|
approxOutputSize, jobConf).suffix(Task.MERGED_OUTPUT_PREFIX);
|
||||||
Writer<K,V> writer =
|
|
||||||
new Writer<K,V>(jobConf, rfs, outputPath,
|
FSDataOutputStream out = CryptoUtils.wrapIfNecessary(jobConf, rfs.create(outputPath));
|
||||||
|
Writer<K, V> writer = new Writer<K, V>(jobConf, out,
|
||||||
(Class<K>) jobConf.getMapOutputKeyClass(),
|
(Class<K>) jobConf.getMapOutputKeyClass(),
|
||||||
(Class<V>) jobConf.getMapOutputValueClass(),
|
(Class<V>) jobConf.getMapOutputValueClass(), codec, null, true);
|
||||||
codec, null);
|
|
||||||
RawKeyValueIterator iter = null;
|
RawKeyValueIterator iter = null;
|
||||||
CompressAwarePath compressAwarePath;
|
CompressAwarePath compressAwarePath;
|
||||||
Path tmpDir = new Path(reduceId.toString());
|
Path tmpDir = new Path(reduceId.toString());
|
||||||
@ -716,8 +722,10 @@ private RawKeyValueIterator finalMerge(JobConf job, FileSystem fs,
|
|||||||
keyClass, valueClass, memDiskSegments, numMemDiskSegments,
|
keyClass, valueClass, memDiskSegments, numMemDiskSegments,
|
||||||
tmpDir, comparator, reporter, spilledRecordsCounter, null,
|
tmpDir, comparator, reporter, spilledRecordsCounter, null,
|
||||||
mergePhase);
|
mergePhase);
|
||||||
Writer<K,V> writer = new Writer<K,V>(job, fs, outputPath,
|
|
||||||
keyClass, valueClass, codec, null);
|
FSDataOutputStream out = CryptoUtils.wrapIfNecessary(job, fs.create(outputPath));
|
||||||
|
Writer<K, V> writer = new Writer<K, V>(job, out, keyClass, valueClass,
|
||||||
|
codec, null, true);
|
||||||
try {
|
try {
|
||||||
Merger.writeFile(rIter, writer, reporter, job);
|
Merger.writeFile(rIter, writer, reporter, job);
|
||||||
writer.close();
|
writer.close();
|
||||||
|
@ -37,6 +37,7 @@
|
|||||||
import org.apache.hadoop.mapred.MapOutputFile;
|
import org.apache.hadoop.mapred.MapOutputFile;
|
||||||
|
|
||||||
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
||||||
|
import org.apache.hadoop.mapreduce.CryptoUtils;
|
||||||
import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.CompressAwarePath;
|
import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.CompressAwarePath;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
@ -75,7 +76,7 @@ public OnDiskMapOutput(TaskAttemptID mapId, TaskAttemptID reduceId,
|
|||||||
this.merger = merger;
|
this.merger = merger;
|
||||||
this.outputPath = outputPath;
|
this.outputPath = outputPath;
|
||||||
tmpOutputPath = getTempPath(outputPath, fetcher);
|
tmpOutputPath = getTempPath(outputPath, fetcher);
|
||||||
disk = fs.create(tmpOutputPath);
|
disk = CryptoUtils.wrapIfNecessary(conf, fs.create(tmpOutputPath));
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
|
@ -24,14 +24,16 @@
|
|||||||
|
|
||||||
import java.io.ByteArrayOutputStream;
|
import java.io.ByteArrayOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.security.PrivilegedAction;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.TreeMap;
|
import java.util.TreeMap;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
@ -51,10 +53,16 @@
|
|||||||
import org.apache.hadoop.mapred.Reporter;
|
import org.apache.hadoop.mapred.Reporter;
|
||||||
import org.apache.hadoop.mapreduce.JobID;
|
import org.apache.hadoop.mapreduce.JobID;
|
||||||
import org.apache.hadoop.mapreduce.MRConfig;
|
import org.apache.hadoop.mapreduce.MRConfig;
|
||||||
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
||||||
import org.apache.hadoop.mapreduce.TaskID;
|
import org.apache.hadoop.mapreduce.TaskID;
|
||||||
import org.apache.hadoop.mapreduce.TaskType;
|
import org.apache.hadoop.mapreduce.TaskType;
|
||||||
|
import org.apache.hadoop.mapreduce.security.TokenCache;
|
||||||
|
import org.apache.hadoop.mapreduce.CryptoUtils;
|
||||||
import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl;
|
import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl;
|
||||||
|
import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.CompressAwarePath;
|
||||||
|
import org.apache.hadoop.security.Credentials;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.util.Progress;
|
import org.apache.hadoop.util.Progress;
|
||||||
import org.apache.hadoop.util.Progressable;
|
import org.apache.hadoop.util.Progressable;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
@ -63,6 +71,8 @@
|
|||||||
import org.mockito.invocation.InvocationOnMock;
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
import org.mockito.stubbing.Answer;
|
import org.mockito.stubbing.Answer;
|
||||||
|
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
|
|
||||||
public class TestMerger {
|
public class TestMerger {
|
||||||
|
|
||||||
private Configuration conf;
|
private Configuration conf;
|
||||||
@ -76,15 +86,21 @@ public void setup() throws IOException {
|
|||||||
fs = FileSystem.getLocal(conf);
|
fs = FileSystem.getLocal(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
|
||||||
public void cleanup() throws IOException {
|
@Test
|
||||||
fs.delete(new Path(jobConf.getLocalDirs()[0]), true);
|
public void testEncryptedMerger() throws Throwable {
|
||||||
|
jobConf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
|
||||||
|
conf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
|
||||||
|
Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
|
||||||
|
TokenCache.setShuffleSecretKey(new byte[16], credentials);
|
||||||
|
UserGroupInformation.getCurrentUser().addCredentials(credentials);
|
||||||
|
testInMemoryAndOnDiskMerger();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testInMemoryMerger() throws Throwable {
|
public void testInMemoryAndOnDiskMerger() throws Throwable {
|
||||||
JobID jobId = new JobID("a", 0);
|
JobID jobId = new JobID("a", 0);
|
||||||
TaskAttemptID reduceId = new TaskAttemptID(
|
TaskAttemptID reduceId1 = new TaskAttemptID(
|
||||||
new TaskID(jobId, TaskType.REDUCE, 0), 0);
|
new TaskID(jobId, TaskType.REDUCE, 0), 0);
|
||||||
TaskAttemptID mapId1 = new TaskAttemptID(
|
TaskAttemptID mapId1 = new TaskAttemptID(
|
||||||
new TaskID(jobId, TaskType.MAP, 1), 0);
|
new TaskID(jobId, TaskType.MAP, 1), 0);
|
||||||
@ -94,7 +110,7 @@ public void testInMemoryMerger() throws Throwable {
|
|||||||
LocalDirAllocator lda = new LocalDirAllocator(MRConfig.LOCAL_DIR);
|
LocalDirAllocator lda = new LocalDirAllocator(MRConfig.LOCAL_DIR);
|
||||||
|
|
||||||
MergeManagerImpl<Text, Text> mergeManager = new MergeManagerImpl<Text, Text>(
|
MergeManagerImpl<Text, Text> mergeManager = new MergeManagerImpl<Text, Text>(
|
||||||
reduceId, jobConf, fs, lda, Reporter.NULL, null, null, null, null, null,
|
reduceId1, jobConf, fs, lda, Reporter.NULL, null, null, null, null, null,
|
||||||
null, null, new Progress(), new MROutputFiles());
|
null, null, new Progress(), new MROutputFiles());
|
||||||
|
|
||||||
// write map outputs
|
// write map outputs
|
||||||
@ -117,21 +133,77 @@ public void testInMemoryMerger() throws Throwable {
|
|||||||
// create merger and run merge
|
// create merger and run merge
|
||||||
MergeThread<InMemoryMapOutput<Text, Text>, Text, Text> inMemoryMerger =
|
MergeThread<InMemoryMapOutput<Text, Text>, Text, Text> inMemoryMerger =
|
||||||
mergeManager.createInMemoryMerger();
|
mergeManager.createInMemoryMerger();
|
||||||
List<InMemoryMapOutput<Text, Text>> mapOutputs =
|
List<InMemoryMapOutput<Text, Text>> mapOutputs1 =
|
||||||
new ArrayList<InMemoryMapOutput<Text, Text>>();
|
new ArrayList<InMemoryMapOutput<Text, Text>>();
|
||||||
mapOutputs.add(mapOutput1);
|
mapOutputs1.add(mapOutput1);
|
||||||
mapOutputs.add(mapOutput2);
|
mapOutputs1.add(mapOutput2);
|
||||||
|
|
||||||
inMemoryMerger.merge(mapOutputs);
|
inMemoryMerger.merge(mapOutputs1);
|
||||||
|
|
||||||
Assert.assertEquals(1, mergeManager.onDiskMapOutputs.size());
|
Assert.assertEquals(1, mergeManager.onDiskMapOutputs.size());
|
||||||
Path outPath = mergeManager.onDiskMapOutputs.iterator().next();
|
|
||||||
|
|
||||||
|
TaskAttemptID reduceId2 = new TaskAttemptID(
|
||||||
|
new TaskID(jobId, TaskType.REDUCE, 3), 0);
|
||||||
|
TaskAttemptID mapId3 = new TaskAttemptID(
|
||||||
|
new TaskID(jobId, TaskType.MAP, 4), 0);
|
||||||
|
TaskAttemptID mapId4 = new TaskAttemptID(
|
||||||
|
new TaskID(jobId, TaskType.MAP, 5), 0);
|
||||||
|
// write map outputs
|
||||||
|
Map<String, String> map3 = new TreeMap<String, String>();
|
||||||
|
map3.put("apple", "awesome");
|
||||||
|
map3.put("carrot", "amazing");
|
||||||
|
Map<String, String> map4 = new TreeMap<String, String>();
|
||||||
|
map4.put("banana", "bla");
|
||||||
|
byte[] mapOutputBytes3 = writeMapOutput(conf, map3);
|
||||||
|
byte[] mapOutputBytes4 = writeMapOutput(conf, map4);
|
||||||
|
InMemoryMapOutput<Text, Text> mapOutput3 = new InMemoryMapOutput<Text, Text>(
|
||||||
|
conf, mapId3, mergeManager, mapOutputBytes3.length, null, true);
|
||||||
|
InMemoryMapOutput<Text, Text> mapOutput4 = new InMemoryMapOutput<Text, Text>(
|
||||||
|
conf, mapId4, mergeManager, mapOutputBytes4.length, null, true);
|
||||||
|
System.arraycopy(mapOutputBytes3, 0, mapOutput3.getMemory(), 0,
|
||||||
|
mapOutputBytes3.length);
|
||||||
|
System.arraycopy(mapOutputBytes4, 0, mapOutput4.getMemory(), 0,
|
||||||
|
mapOutputBytes4.length);
|
||||||
|
|
||||||
|
// // create merger and run merge
|
||||||
|
MergeThread<InMemoryMapOutput<Text, Text>, Text, Text> inMemoryMerger2 =
|
||||||
|
mergeManager.createInMemoryMerger();
|
||||||
|
List<InMemoryMapOutput<Text, Text>> mapOutputs2 =
|
||||||
|
new ArrayList<InMemoryMapOutput<Text, Text>>();
|
||||||
|
mapOutputs2.add(mapOutput3);
|
||||||
|
mapOutputs2.add(mapOutput4);
|
||||||
|
|
||||||
|
inMemoryMerger2.merge(mapOutputs2);
|
||||||
|
|
||||||
|
Assert.assertEquals(2, mergeManager.onDiskMapOutputs.size());
|
||||||
|
|
||||||
|
List<CompressAwarePath> paths = new ArrayList<CompressAwarePath>();
|
||||||
|
Iterator<CompressAwarePath> iterator = mergeManager.onDiskMapOutputs.iterator();
|
||||||
List<String> keys = new ArrayList<String>();
|
List<String> keys = new ArrayList<String>();
|
||||||
List<String> values = new ArrayList<String>();
|
List<String> values = new ArrayList<String>();
|
||||||
readOnDiskMapOutput(conf, fs, outPath, keys, values);
|
while (iterator.hasNext()) {
|
||||||
Assert.assertEquals(keys, Arrays.asList("apple", "banana", "carrot"));
|
CompressAwarePath next = iterator.next();
|
||||||
Assert.assertEquals(values, Arrays.asList("disgusting", "pretty good", "delicious"));
|
readOnDiskMapOutput(conf, fs, next, keys, values);
|
||||||
|
paths.add(next);
|
||||||
|
}
|
||||||
|
Assert.assertEquals(keys, Arrays.asList("apple", "banana", "carrot", "apple", "banana", "carrot"));
|
||||||
|
Assert.assertEquals(values, Arrays.asList("awesome", "bla", "amazing", "disgusting", "pretty good", "delicious"));
|
||||||
|
mergeManager.close();
|
||||||
|
|
||||||
|
mergeManager = new MergeManagerImpl<Text, Text>(
|
||||||
|
reduceId2, jobConf, fs, lda, Reporter.NULL, null, null, null, null, null,
|
||||||
|
null, null, new Progress(), new MROutputFiles());
|
||||||
|
|
||||||
|
MergeThread<CompressAwarePath,Text,Text> onDiskMerger = mergeManager.createOnDiskMerger();
|
||||||
|
onDiskMerger.merge(paths);
|
||||||
|
|
||||||
|
Assert.assertEquals(1, mergeManager.onDiskMapOutputs.size());
|
||||||
|
|
||||||
|
keys = new ArrayList<String>();
|
||||||
|
values = new ArrayList<String>();
|
||||||
|
readOnDiskMapOutput(conf, fs, mergeManager.onDiskMapOutputs.iterator().next(), keys, values);
|
||||||
|
Assert.assertEquals(keys, Arrays.asList("apple", "apple", "banana", "banana", "carrot", "carrot"));
|
||||||
|
Assert.assertEquals(values, Arrays.asList("awesome", "disgusting", "pretty good", "bla", "amazing", "delicious"));
|
||||||
|
|
||||||
mergeManager.close();
|
mergeManager.close();
|
||||||
Assert.assertEquals(0, mergeManager.inMemoryMapOutputs.size());
|
Assert.assertEquals(0, mergeManager.inMemoryMapOutputs.size());
|
||||||
@ -155,8 +227,10 @@ private byte[] writeMapOutput(Configuration conf, Map<String, String> keysToValu
|
|||||||
|
|
||||||
private void readOnDiskMapOutput(Configuration conf, FileSystem fs, Path path,
|
private void readOnDiskMapOutput(Configuration conf, FileSystem fs, Path path,
|
||||||
List<String> keys, List<String> values) throws IOException {
|
List<String> keys, List<String> values) throws IOException {
|
||||||
IFile.Reader<Text, Text> reader = new IFile.Reader<Text, Text>(conf, fs,
|
FSDataInputStream in = CryptoUtils.wrapIfNecessary(conf, fs.open(path));
|
||||||
path, null, null);
|
|
||||||
|
IFile.Reader<Text, Text> reader = new IFile.Reader<Text, Text>(conf, in,
|
||||||
|
fs.getFileStatus(path).getLen(), null, null);
|
||||||
DataInputBuffer keyBuff = new DataInputBuffer();
|
DataInputBuffer keyBuff = new DataInputBuffer();
|
||||||
DataInputBuffer valueBuff = new DataInputBuffer();
|
DataInputBuffer valueBuff = new DataInputBuffer();
|
||||||
Text key = new Text();
|
Text key = new Text();
|
||||||
|
@ -18,6 +18,8 @@
|
|||||||
package org.apache.hadoop.mapred;
|
package org.apache.hadoop.mapred;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.LocalFileSystem;
|
import org.apache.hadoop.fs.LocalFileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
@ -42,7 +44,7 @@ public void testIFileWriterWithCodec() throws Exception {
|
|||||||
DefaultCodec codec = new GzipCodec();
|
DefaultCodec codec = new GzipCodec();
|
||||||
codec.setConf(conf);
|
codec.setConf(conf);
|
||||||
IFile.Writer<Text, Text> writer =
|
IFile.Writer<Text, Text> writer =
|
||||||
new IFile.Writer<Text, Text>(conf, rfs, path, Text.class, Text.class,
|
new IFile.Writer<Text, Text>(conf, rfs.create(path), Text.class, Text.class,
|
||||||
codec, null);
|
codec, null);
|
||||||
writer.close();
|
writer.close();
|
||||||
}
|
}
|
||||||
@ -56,12 +58,15 @@ public void testIFileReaderWithCodec() throws Exception {
|
|||||||
Path path = new Path(new Path("build/test.ifile"), "data");
|
Path path = new Path(new Path("build/test.ifile"), "data");
|
||||||
DefaultCodec codec = new GzipCodec();
|
DefaultCodec codec = new GzipCodec();
|
||||||
codec.setConf(conf);
|
codec.setConf(conf);
|
||||||
|
FSDataOutputStream out = rfs.create(path);
|
||||||
IFile.Writer<Text, Text> writer =
|
IFile.Writer<Text, Text> writer =
|
||||||
new IFile.Writer<Text, Text>(conf, rfs, path, Text.class, Text.class,
|
new IFile.Writer<Text, Text>(conf, out, Text.class, Text.class,
|
||||||
codec, null);
|
codec, null);
|
||||||
writer.close();
|
writer.close();
|
||||||
|
FSDataInputStream in = rfs.open(path);
|
||||||
IFile.Reader<Text, Text> reader =
|
IFile.Reader<Text, Text> reader =
|
||||||
new IFile.Reader<Text, Text>(conf, rfs, path, codec, null);
|
new IFile.Reader<Text, Text>(conf, in, rfs.getFileStatus(path).getLen(),
|
||||||
|
codec, null);
|
||||||
reader.close();
|
reader.close();
|
||||||
|
|
||||||
// test check sum
|
// test check sum
|
||||||
|
@ -0,0 +1,254 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.mapred;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.FileUtil;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
import org.apache.hadoop.io.LongWritable;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.OutputStream;
|
||||||
|
import java.io.OutputStreamWriter;
|
||||||
|
import java.io.Writer;
|
||||||
|
|
||||||
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
|
@SuppressWarnings(value={"unchecked", "deprecation"})
|
||||||
|
/**
|
||||||
|
* This test tests the support for a merge operation in Hadoop. The input files
|
||||||
|
* are already sorted on the key. This test implements an external
|
||||||
|
* MapOutputCollector implementation that just copies the records to different
|
||||||
|
* partitions while maintaining the sort order in each partition. The Hadoop
|
||||||
|
* framework's merge on the reduce side will merge the partitions created to
|
||||||
|
* generate the final output which is sorted on the key.
|
||||||
|
*/
|
||||||
|
public class TestMRIntermediateDataEncryption {
|
||||||
|
// Where MR job's input will reside.
|
||||||
|
private static final Path INPUT_DIR = new Path("/test/input");
|
||||||
|
// Where output goes.
|
||||||
|
private static final Path OUTPUT = new Path("/test/output");
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSingleReducer() throws Exception {
|
||||||
|
doEncryptionTest(3, 1, 2);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMultipleMapsPerNode() throws Exception {
|
||||||
|
doEncryptionTest(8, 1, 2);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMultipleReducers() throws Exception {
|
||||||
|
doEncryptionTest(2, 4, 2);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void doEncryptionTest(int numMappers, int numReducers, int numNodes) throws Exception {
|
||||||
|
doEncryptionTest(numMappers, numReducers, numNodes, 1000);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void doEncryptionTest(int numMappers, int numReducers, int numNodes, int numLines) throws Exception {
|
||||||
|
MiniDFSCluster dfsCluster = null;
|
||||||
|
MiniMRClientCluster mrCluster = null;
|
||||||
|
FileSystem fileSystem = null;
|
||||||
|
try {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
// Start the mini-MR and mini-DFS clusters
|
||||||
|
|
||||||
|
dfsCluster = new MiniDFSCluster.Builder(conf)
|
||||||
|
.numDataNodes(numNodes).build();
|
||||||
|
fileSystem = dfsCluster.getFileSystem();
|
||||||
|
mrCluster = MiniMRClientClusterFactory.create(this.getClass(),
|
||||||
|
numNodes, conf);
|
||||||
|
// Generate input.
|
||||||
|
createInput(fileSystem, numMappers, numLines);
|
||||||
|
// Run the test.
|
||||||
|
runMergeTest(new JobConf(mrCluster.getConfig()), fileSystem, numMappers, numReducers, numLines);
|
||||||
|
} finally {
|
||||||
|
if (dfsCluster != null) {
|
||||||
|
dfsCluster.shutdown();
|
||||||
|
}
|
||||||
|
if (mrCluster != null) {
|
||||||
|
mrCluster.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void createInput(FileSystem fs, int numMappers, int numLines) throws Exception {
|
||||||
|
fs.delete(INPUT_DIR, true);
|
||||||
|
for (int i = 0; i < numMappers; i++) {
|
||||||
|
OutputStream os = fs.create(new Path(INPUT_DIR, "input_" + i + ".txt"));
|
||||||
|
Writer writer = new OutputStreamWriter(os);
|
||||||
|
for (int j = 0; j < numLines; j++) {
|
||||||
|
// Create sorted key, value pairs.
|
||||||
|
int k = j + 1;
|
||||||
|
String formattedNumber = String.format("%09d", k);
|
||||||
|
writer.write(formattedNumber + " " + formattedNumber + "\n");
|
||||||
|
}
|
||||||
|
writer.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void runMergeTest(JobConf job, FileSystem fileSystem, int numMappers, int numReducers, int numLines)
|
||||||
|
throws Exception {
|
||||||
|
fileSystem.delete(OUTPUT, true);
|
||||||
|
job.setJobName("Test");
|
||||||
|
JobClient client = new JobClient(job);
|
||||||
|
RunningJob submittedJob = null;
|
||||||
|
FileInputFormat.setInputPaths(job, INPUT_DIR);
|
||||||
|
FileOutputFormat.setOutputPath(job, OUTPUT);
|
||||||
|
job.set("mapreduce.output.textoutputformat.separator", " ");
|
||||||
|
job.setInputFormat(TextInputFormat.class);
|
||||||
|
job.setMapOutputKeyClass(Text.class);
|
||||||
|
job.setMapOutputValueClass(Text.class);
|
||||||
|
job.setOutputKeyClass(Text.class);
|
||||||
|
job.setOutputValueClass(Text.class);
|
||||||
|
job.setMapperClass(MyMapper.class);
|
||||||
|
job.setPartitionerClass(MyPartitioner.class);
|
||||||
|
job.setOutputFormat(TextOutputFormat.class);
|
||||||
|
job.setNumReduceTasks(numReducers);
|
||||||
|
|
||||||
|
job.setInt("mapreduce.map.maxattempts", 1);
|
||||||
|
job.setInt("mapreduce.reduce.maxattempts", 1);
|
||||||
|
job.setInt("mapred.test.num_lines", numLines);
|
||||||
|
job.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
|
||||||
|
try {
|
||||||
|
submittedJob = client.submitJob(job);
|
||||||
|
try {
|
||||||
|
if (! client.monitorAndPrintJob(job, submittedJob)) {
|
||||||
|
throw new IOException("Job failed!");
|
||||||
|
}
|
||||||
|
} catch(InterruptedException ie) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
}
|
||||||
|
} catch(IOException ioe) {
|
||||||
|
System.err.println("Job failed with: " + ioe);
|
||||||
|
} finally {
|
||||||
|
verifyOutput(submittedJob, fileSystem, numMappers, numLines);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void verifyOutput(RunningJob submittedJob, FileSystem fileSystem, int numMappers, int numLines)
|
||||||
|
throws Exception {
|
||||||
|
FSDataInputStream dis = null;
|
||||||
|
long numValidRecords = 0;
|
||||||
|
long numInvalidRecords = 0;
|
||||||
|
String prevKeyValue = "000000000";
|
||||||
|
Path[] fileList =
|
||||||
|
FileUtil.stat2Paths(fileSystem.listStatus(OUTPUT,
|
||||||
|
new Utils.OutputFileUtils.OutputFilesFilter()));
|
||||||
|
for (Path outFile : fileList) {
|
||||||
|
try {
|
||||||
|
dis = fileSystem.open(outFile);
|
||||||
|
String record;
|
||||||
|
while((record = dis.readLine()) != null) {
|
||||||
|
// Split the line into key and value.
|
||||||
|
int blankPos = record.indexOf(" ");
|
||||||
|
String keyString = record.substring(0, blankPos);
|
||||||
|
String valueString = record.substring(blankPos+1);
|
||||||
|
// Check for sorted output and correctness of record.
|
||||||
|
if (keyString.compareTo(prevKeyValue) >= 0
|
||||||
|
&& keyString.equals(valueString)) {
|
||||||
|
prevKeyValue = keyString;
|
||||||
|
numValidRecords++;
|
||||||
|
} else {
|
||||||
|
numInvalidRecords++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
if (dis != null) {
|
||||||
|
dis.close();
|
||||||
|
dis = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Make sure we got all input records in the output in sorted order.
|
||||||
|
assertEquals((long)(numMappers * numLines), numValidRecords);
|
||||||
|
// Make sure there is no extraneous invalid record.
|
||||||
|
assertEquals(0, numInvalidRecords);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A mapper implementation that assumes that key text contains valid integers
|
||||||
|
* in displayable form.
|
||||||
|
*/
|
||||||
|
public static class MyMapper extends MapReduceBase
|
||||||
|
implements Mapper<LongWritable, Text, Text, Text> {
|
||||||
|
private Text keyText;
|
||||||
|
private Text valueText;
|
||||||
|
|
||||||
|
public MyMapper() {
|
||||||
|
keyText = new Text();
|
||||||
|
valueText = new Text();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void map(LongWritable key, Text value,
|
||||||
|
OutputCollector<Text, Text> output,
|
||||||
|
Reporter reporter) throws IOException {
|
||||||
|
String record = value.toString();
|
||||||
|
int blankPos = record.indexOf(" ");
|
||||||
|
keyText.set(record.substring(0, blankPos));
|
||||||
|
valueText.set(record.substring(blankPos+1));
|
||||||
|
output.collect(keyText, valueText);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void close() throws IOException {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Partitioner implementation to make sure that output is in total sorted
|
||||||
|
* order. We basically route key ranges to different reducers such that
|
||||||
|
* key values monotonically increase with the partition number. For example,
|
||||||
|
* in this test, the keys are numbers from 1 to 1000 in the form "000000001"
|
||||||
|
* to "000001000" in each input file. The keys "000000001" to "000000250" are
|
||||||
|
* routed to partition 0, "000000251" to "000000500" are routed to partition 1
|
||||||
|
* and so on since we have 4 reducers.
|
||||||
|
*/
|
||||||
|
static class MyPartitioner implements Partitioner<Text, Text> {
|
||||||
|
|
||||||
|
private JobConf job;
|
||||||
|
|
||||||
|
public MyPartitioner() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public void configure(JobConf job) {
|
||||||
|
this.job = job;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getPartition(Text key, Text value, int numPartitions) {
|
||||||
|
int keyValue = 0;
|
||||||
|
try {
|
||||||
|
keyValue = Integer.parseInt(key.toString());
|
||||||
|
} catch(NumberFormatException nfe) {
|
||||||
|
keyValue = 0;
|
||||||
|
}
|
||||||
|
int partitionNumber = (numPartitions*(Math.max(0, keyValue-1)))/job.getInt("mapred.test.num_lines", 10000);
|
||||||
|
return partitionNumber;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -80,7 +80,7 @@ public void runValueIterator(Path tmpDir, Pair[] vals,
|
|||||||
FileSystem rfs = ((LocalFileSystem)localFs).getRaw();
|
FileSystem rfs = ((LocalFileSystem)localFs).getRaw();
|
||||||
Path path = new Path(tmpDir, "data.in");
|
Path path = new Path(tmpDir, "data.in");
|
||||||
IFile.Writer<Text, Text> writer =
|
IFile.Writer<Text, Text> writer =
|
||||||
new IFile.Writer<Text, Text>(conf, rfs, path, Text.class, Text.class,
|
new IFile.Writer<Text, Text>(conf, rfs.create(path), Text.class, Text.class,
|
||||||
codec, null);
|
codec, null);
|
||||||
for(Pair p: vals) {
|
for(Pair p: vals) {
|
||||||
writer.append(new Text(p.key), new Text(p.value));
|
writer.append(new Text(p.key), new Text(p.value));
|
||||||
|
@ -95,9 +95,9 @@ public void testRunner() throws Exception {
|
|||||||
new Counters.Counter(), new Progress());
|
new Counters.Counter(), new Progress());
|
||||||
FileSystem fs = new RawLocalFileSystem();
|
FileSystem fs = new RawLocalFileSystem();
|
||||||
fs.setConf(conf);
|
fs.setConf(conf);
|
||||||
Writer<IntWritable, Text> wr = new Writer<IntWritable, Text>(conf, fs,
|
Writer<IntWritable, Text> wr = new Writer<IntWritable, Text>(conf, fs.create(
|
||||||
new Path(workSpace + File.separator + "outfile"), IntWritable.class,
|
new Path(workSpace + File.separator + "outfile")), IntWritable.class,
|
||||||
Text.class, null, null);
|
Text.class, null, null, true);
|
||||||
output.setWriter(wr);
|
output.setWriter(wr);
|
||||||
// stub for client
|
// stub for client
|
||||||
File fCommand = getFileCommand("org.apache.hadoop.mapred.pipes.PipeApplicationRunnableStub");
|
File fCommand = getFileCommand("org.apache.hadoop.mapred.pipes.PipeApplicationRunnableStub");
|
||||||
@ -177,9 +177,9 @@ public void testApplication() throws Throwable {
|
|||||||
new Progress());
|
new Progress());
|
||||||
FileSystem fs = new RawLocalFileSystem();
|
FileSystem fs = new RawLocalFileSystem();
|
||||||
fs.setConf(conf);
|
fs.setConf(conf);
|
||||||
Writer<IntWritable, Text> wr = new Writer<IntWritable, Text>(conf, fs,
|
Writer<IntWritable, Text> wr = new Writer<IntWritable, Text>(conf, fs.create(
|
||||||
new Path(workSpace.getAbsolutePath() + File.separator + "outfile"),
|
new Path(workSpace.getAbsolutePath() + File.separator + "outfile")),
|
||||||
IntWritable.class, Text.class, null, null);
|
IntWritable.class, Text.class, null, null, true);
|
||||||
output.setWriter(wr);
|
output.setWriter(wr);
|
||||||
conf.set(Submitter.PRESERVE_COMMANDFILE, "true");
|
conf.set(Submitter.PRESERVE_COMMANDFILE, "true");
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user