MAPREDUCE-5208. Modified ShuffleHandler to use SecureIOUtils for reading local files. Contributed by Omkar Vinit Joshi.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1481657 13f79535-47bb-0310-9956-ffa450edef68
Vinod Kumar Vavilapalli 2013-05-12 21:59:16 +00:00
parent 68148989bf
commit 47d1ca402f
5 changed files with 201 additions and 6 deletions
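
The ShuffleHandler serves map outputs over HTTP on behalf of reducers. Before this change it opened spill and index files directly (RandomAccessFile, rfs.open), so file ownership was never checked. The patch routes both opens through SecureIOUtils, which verifies on the open descriptor that the file belongs to the job's user before any bytes are served. A minimal sketch of the pattern, with a hypothetical path and owner:

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

import org.apache.hadoop.io.SecureIOUtils;

public class SecureReadSketch {
  public static void main(String[] args) throws IOException {
    File mapOutput = new File("/tmp/file.out");   // hypothetical path
    String expectedOwner = "jobuser";             // hypothetical owner
    // SecureIOUtils opens first, then checks ownership of the open
    // descriptor (via NativeIO fstat when available), which avoids the
    // stat-then-open race; a null group argument skips the group check.
    FileInputStream in =
        SecureIOUtils.openForRead(mapOutput, expectedOwner, null);
    try {
      System.out.println("opened " + mapOutput + " as " + expectedOwner);
    } finally {
      in.close();
    }
  }
}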

hadoop-mapreduce-project/CHANGES.txt

@@ -401,6 +401,9 @@ Release 2.0.5-beta - UNRELEASED
MAPREDUCE-5239. Updated MR App to reflect YarnRemoteException changes after
YARN-634. (Siddharth Seth via vinodkv)
+ MAPREDUCE-5208. Modified ShuffleHandler to use SecureIOUtils for reading
+ local files. (Omkar Vinit Joshi via vinodkv)
Release 2.0.4-alpha - 2013-04-25
INCOMPATIBLE CHANGES

SpillRecord.java (org.apache.hadoop.mapred)

@@ -19,6 +19,7 @@
import static org.apache.hadoop.mapred.MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH;
+ import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.LongBuffer;
@@ -34,6 +35,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
+ import org.apache.hadoop.io.SecureIOUtils;
import org.apache.hadoop.util.PureJavaCrc32;
@InterfaceAudience.LimitedPrivate({"MapReduce"})
@@ -65,17 +67,19 @@ public SpillRecord(Path indexFileName, JobConf job, Checksum crc,
throws IOException {
final FileSystem rfs = FileSystem.getLocal(job).getRaw();
- final FSDataInputStream in = rfs.open(indexFileName);
+ final FSDataInputStream in =
+ SecureIOUtils.openFSDataInputStream(new File(indexFileName.toUri()
+ .getRawPath()), expectedIndexOwner, null);
try {
final long length = rfs.getFileStatus(indexFileName).getLen();
final int partitions = (int) length / MAP_OUTPUT_INDEX_RECORD_LENGTH;
final int size = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
buf = ByteBuffer.allocate(size);
if (crc != null) {
crc.reset();
CheckedInputStream chk = new CheckedInputStream(in, crc);
IOUtils.readFully(chk, buf.array(), 0, size);
if (chk.getChecksum().getValue() != in.readLong()) {
throw new ChecksumException("Checksum error reading spill index: " +
indexFileName, -1);
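
In SpillRecord, the raw local-filesystem open of the index file is replaced with an owner-checked open. The hunk above boils down to the following conversion (the helper name here is illustrative; expectedIndexOwner is a constructor parameter truncated out of the context lines):

import java.io.File;
import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SecureIOUtils;

class IndexOpenSketch {
  static FSDataInputStream openIndex(Path indexFileName,
      String expectedIndexOwner) throws IOException {
    // The index file is local, so the Path's raw URI path maps directly
    // onto a java.io.File that SecureIOUtils can fstat after opening.
    File local = new File(indexFileName.toUri().getRawPath());
    // Owner must match expectedIndexOwner; null skips the group check.
    return SecureIOUtils.openFSDataInputStream(local, expectedIndexOwner,
        null);
  }
}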

ShuffleHandler.java (org.apache.hadoop.mapred)

@@ -58,9 +58,9 @@
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.ReadaheadPool;
+ import org.apache.hadoop.io.SecureIOUtils;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
- import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
@@ -71,6 +71,7 @@
import org.apache.hadoop.metrics2.lib.MutableCounterInt;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
+ import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -490,8 +491,14 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
return;
}
} catch (IOException e) {
LOG.error("Shuffle error ", e);
sendError(ctx, e.getMessage(), INTERNAL_SERVER_ERROR);
LOG.error("Shuffle error :", e);
StringBuffer sb = new StringBuffer(e.getMessage());
Throwable t = e;
while (t.getCause() != null) {
sb.append(t.getCause().getMessage());
t = t.getCause();
}
sendError(ctx,sb.toString() , INTERNAL_SERVER_ERROR);
return;
}
}
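
The catch block now flattens the exception's cause chain into the HTTP error body, so a SecureIOUtils ownership failure nested inside an IOException reaches the client instead of being hidden behind the top-level message. The same logic as a stand-alone helper (StringBuilder swapped in for the patch's StringBuffer; String.valueOf guards a null message):

// Append the message of every nested cause so the root reason
// (e.g. an owner mismatch) survives into the response body.
static String flattenCauses(Throwable e) {
  StringBuilder sb = new StringBuilder(String.valueOf(e.getMessage()));
  for (Throwable t = e.getCause(); t != null; t = t.getCause()) {
    sb.append(t.getMessage());
  }
  return sb.toString();
}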
@@ -572,7 +579,7 @@ protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, Channel ch,
final File spillfile = new File(mapOutputFileName.toString());
RandomAccessFile spill;
try {
- spill = new RandomAccessFile(spillfile, "r");
+ spill = SecureIOUtils.openForRandomRead(spillfile, "r", user, null);
} catch (FileNotFoundException e) {
LOG.info(spillfile + " not found");
return null;
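
sendMapOutput gets the same treatment: the spill file is opened through SecureIOUtils, so a RandomAccessFile comes back only when the file's owner matches the job's user. Condensed (helper name illustrative; user is the owner string passed down from the request):

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;

import org.apache.hadoop.io.SecureIOUtils;

class SpillOpenSketch {
  static RandomAccessFile openSpill(File spillfile, String user)
      throws IOException {
    // "r" = read-only, as before. On an owner mismatch this throws an
    // IOException naming the actual and expected owners, which is exactly
    // the message the new test asserts on.
    return SecureIOUtils.openForRandomRead(spillfile, "r", user, null);
  }
}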

TestShuffleHandler.java (org.apache.hadoop.mapred)

@@ -24,22 +24,48 @@
import static org.apache.hadoop.test.MockitoMaker.stub;
import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assume.assumeTrue;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.SocketException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.zip.CheckedOutputStream;
import java.util.zip.Checksum;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.PureJavaCrc32;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandlerContext;
@@ -245,4 +271,131 @@ protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx,
shuffleHandler.stop();
}
@Test(timeout = 100000)
public void testMapFileAccess() throws IOException {
// This will run only if NativeIO is enabled, as SecureIOUtils needs it
assumeTrue(NativeIO.isAvailable());
Configuration conf = new Configuration();
conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
"kerberos");
UserGroupInformation.setConfiguration(conf);
File absLogDir = new File("target",
TestShuffleHandler.class.getSimpleName() + "LocDir").getAbsoluteFile();
conf.set(YarnConfiguration.NM_LOCAL_DIRS, absLogDir.getAbsolutePath());
ApplicationId appId = BuilderUtils.newApplicationId(12345, 1);
System.out.println(appId.toString());
String appAttemptId = "attempt_12345_1_m_1_0";
String user = "randomUser";
String reducerId = "0";
List<File> fileMap = new ArrayList<File>();
createShuffleHandlerFiles(absLogDir, user, appId.toString(), appAttemptId,
conf, fileMap);
ShuffleHandler shuffleHandler = new ShuffleHandler() {
@Override
protected Shuffle getShuffle(Configuration conf) {
// replace the shuffle handler with one stubbed for testing
return new Shuffle(conf) {
@Override
protected void verifyRequest(String appid, ChannelHandlerContext ctx,
HttpRequest request, HttpResponse response, URL requestUri)
throws IOException {
}
};
}
};
shuffleHandler.init(conf);
try {
shuffleHandler.start();
DataOutputBuffer outputBuffer = new DataOutputBuffer();
outputBuffer.reset();
Token<JobTokenIdentifier> jt =
new Token<JobTokenIdentifier>("identifier".getBytes(),
"password".getBytes(), new Text(user), new Text("shuffleService"));
jt.write(outputBuffer);
shuffleHandler.initApp(user, appId,
ByteBuffer.wrap(outputBuffer.getData(), 0, outputBuffer.getLength()));
URL url =
new URL(
"http://127.0.0.1:"
+ shuffleHandler.getConfig().get(
ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY)
+ "/mapOutput?job=job_12345_0001&reduce=" + reducerId
+ "&map=attempt_12345_1_m_1_0");
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.connect();
byte[] byteArr = new byte[10000];
try {
DataInputStream is = new DataInputStream(conn.getInputStream());
is.readFully(byteArr);
} catch (EOFException e) {
// ignore
}
// Retrieve file owner name
FileInputStream is = new FileInputStream(fileMap.get(0));
String owner = NativeIO.POSIX.getFstat(is.getFD()).getOwner();
is.close();
String message =
"Owner '" + owner + "' for path " + fileMap.get(0).getAbsolutePath()
+ " did not match expected owner '" + user + "'";
Assert.assertTrue((new String(byteArr)).contains(message));
} finally {
shuffleHandler.stop();
}
}
public static void createShuffleHandlerFiles(File logDir, String user,
String appId, String appAttemptId, Configuration conf,
List<File> fileMap) throws IOException {
String attemptDir =
StringUtils.join(Path.SEPARATOR,
new String[] { logDir.getAbsolutePath(),
ContainerLocalizer.USERCACHE, user,
ContainerLocalizer.APPCACHE, appId, "output", appAttemptId });
File appAttemptDir = new File(attemptDir);
appAttemptDir.mkdirs();
System.out.println(appAttemptDir.getAbsolutePath());
File indexFile = new File(appAttemptDir, "file.out.index");
fileMap.add(indexFile);
createIndexFile(indexFile, conf);
File mapOutputFile = new File(appAttemptDir, "file.out");
fileMap.add(mapOutputFile);
createMapOutputFile(mapOutputFile, conf);
}
public static void
createMapOutputFile(File mapOutputFile, Configuration conf)
throws IOException {
FileOutputStream out = new FileOutputStream(mapOutputFile);
out.write("Creating new dummy map output file. Used only for testing"
.getBytes());
out.flush();
out.close();
}
public static void createIndexFile(File indexFile, Configuration conf)
throws IOException {
if (indexFile.exists()) {
System.out.println("Deleting existing file");
indexFile.delete();
}
indexFile.createNewFile();
FSDataOutputStream output = FileSystem.getLocal(conf).getRaw().append(
new Path(indexFile.getAbsolutePath()));
Checksum crc = new PureJavaCrc32();
crc.reset();
CheckedOutputStream chk = new CheckedOutputStream(output, crc);
String msg = "Writing new index file. This file will be used only " +
"for the testing.";
chk.write(Arrays.copyOf(msg.getBytes(),
MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH));
output.writeLong(chk.getChecksum().getValue());
output.close();
}
}
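
The new test exercises this end to end: it creates index and map-output files owned by whoever runs the test, then issues a shuffle request as the unrelated user "randomUser" with request verification stubbed out. SecureIOUtils rejects the open, the cause-chain flattening above carries its message into the HTTP response, and the test asserts that the body contains the owner-mismatch text it rebuilds with the same fstat lookup (helper name illustrative):

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

import org.apache.hadoop.io.nativeio.NativeIO;

class OwnerLookupSketch {
  // The lookup the test uses to build its expected error string: fstat
  // the open descriptor and read back the owning user.
  static String ownerOf(File f) throws IOException {
    FileInputStream is = new FileInputStream(f);
    try {
      return NativeIO.POSIX.getFstat(is.getFD()).getOwner();
    } finally {
      is.close();
    }
  }
}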

krb5.conf (new test resource)

@@ -0,0 +1,28 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
[libdefaults]
default_realm = APACHE.ORG
udp_preference_limit = 1
extra_addresses = 127.0.0.1
[realms]
APACHE.ORG = {
admin_server = localhost:88
kdc = localhost:88
}
[domain_realm]
localhost = APACHE.ORG
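
This minimal krb5.conf is evidently here to let testMapFileAccess flip hadoop.security.authentication to "kerberos" without a real KDC: the APACHE.ORG realm points at localhost, and udp_preference_limit = 1 forces TCP. No actual authentication takes place, since the test stubs out verifyRequest.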