From 47d1ca402fe0bafae32507dee0d27cd1e345a7e9 Mon Sep 17 00:00:00 2001
From: Vinod Kumar Vavilapalli
Date: Sun, 12 May 2013 21:59:16 +0000
Subject: [PATCH] MAPREDUCE-5208. Modified ShuffleHandler to use SecureIOUtils
 for reading local files. Contributed by Omkar Vinit Joshi.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1481657 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-mapreduce-project/CHANGES.txt          |   3 +
 .../org/apache/hadoop/mapred/SpillRecord.java |   8 +-
 .../apache/hadoop/mapred/ShuffleHandler.java  |  15 +-
 .../hadoop/mapred/TestShuffleHandler.java     | 153 ++++++++++++++++++
 .../src/test/resources/krb5.conf              |  28 ++++
 5 files changed, 201 insertions(+), 6 deletions(-)
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/resources/krb5.conf

diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index f6c95e8e94..d2ea59a80e 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -401,6 +401,9 @@ Release 2.0.5-beta - UNRELEASED
     MAPREDUCE-5239. Updated MR App to reflect YarnRemoteException changes after
     YARN-634. (Siddharth Seth via vinodkv)
 
+    MAPREDUCE-5208. Modified ShuffleHandler to use SecureIOUtils for reading
+    local files. (Omkar Vinit Joshi via vinodkv)
+
 Release 2.0.4-alpha - 2013-04-25
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SpillRecord.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SpillRecord.java
index 93a2d04cbf..9e04e6fb87 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SpillRecord.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SpillRecord.java
@@ -19,6 +19,7 @@
 
 import static org.apache.hadoop.mapred.MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH;
 
+import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.LongBuffer;
@@ -34,6 +35,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SecureIOUtils;
 import org.apache.hadoop.util.PureJavaCrc32;
 
 @InterfaceAudience.LimitedPrivate({"MapReduce"})
@@ -65,17 +67,19 @@ public SpillRecord(Path indexFileName, JobConf job, Checksum crc,
       throws IOException {
 
     final FileSystem rfs = FileSystem.getLocal(job).getRaw();
-    final FSDataInputStream in = rfs.open(indexFileName);
+    final FSDataInputStream in =
+        SecureIOUtils.openFSDataInputStream(new File(indexFileName.toUri()
+            .getRawPath()), expectedIndexOwner, null);
    try {
       final long length = rfs.getFileStatus(indexFileName).getLen();
       final int partitions = (int) length / MAP_OUTPUT_INDEX_RECORD_LENGTH;
       final int size = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
-
       buf = ByteBuffer.allocate(size);
       if (crc != null) {
         crc.reset();
         CheckedInputStream chk = new CheckedInputStream(in, crc);
         IOUtils.readFully(chk, buf.array(), 0, size);
+
         if (chk.getChecksum().getValue() != in.readLong()) {
           throw new ChecksumException("Checksum error reading spill index: " +
                                 indexFileName, -1);
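Note on the SpillRecord change: the old rfs.open(indexFileName) served the spill index to whoever asked, while SecureIOUtils.openFSDataInputStream(file, expectedIndexOwner, null) refuses to hand back the stream unless the local file is actually owned by the expected user (the null third argument is the expected group, i.e. unchecked here). The sketch below shows the shape of that owner-check contract; it is NOT Hadoop's implementation (all names are illustrative), and where Hadoop verifies the owner of the already-open descriptor via NativeIO fstat, this sketch approximates it with java.nio on the path, so it is slightly race-prone by comparison. The error message format matches the one the new test asserts on.

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Files;

public class OwnerCheckedOpen {
  // Open a local file, then fail unless its on-disk owner matches the
  // expected owner. Hadoop does the stat on the open descriptor; here we
  // stat the path after opening, which merely approximates that.
  public static FileInputStream openChecked(File f, String expectedOwner)
      throws IOException {
    FileInputStream in = new FileInputStream(f);
    boolean ok = false;
    try {
      String actualOwner = Files.getOwner(f.toPath()).getName();
      if (expectedOwner != null && !actualOwner.equals(expectedOwner)) {
        throw new IOException("Owner '" + actualOwner + "' for path "
            + f.getAbsolutePath() + " did not match expected owner '"
            + expectedOwner + "'");
      }
      ok = true;
      return in;
    } finally {
      if (!ok) {
        in.close(); // do not leak the descriptor on a failed check
      }
    }
  }
}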
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
index ad6c3bb672..5715bd146d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
@@ -58,9 +58,9 @@
 import org.apache.hadoop.io.DataInputByteBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.io.SecureIOUtils;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
-import org.apache.hadoop.security.ssl.SSLFactory;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
@@ -71,6 +71,7 @@
 import org.apache.hadoop.metrics2.lib.MutableCounterInt;
 import org.apache.hadoop.metrics2.lib.MutableCounterLong;
 import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
+import org.apache.hadoop.security.ssl.SSLFactory;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -490,8 +491,14 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
           return;
         }
       } catch (IOException e) {
-        LOG.error("Shuffle error ", e);
-        sendError(ctx, e.getMessage(), INTERNAL_SERVER_ERROR);
+        LOG.error("Shuffle error :", e);
+        StringBuffer sb = new StringBuffer(e.getMessage());
+        Throwable t = e;
+        while (t.getCause() != null) {
+          sb.append(t.getCause().getMessage());
+          t = t.getCause();
+        }
+        sendError(ctx, sb.toString(), INTERNAL_SERVER_ERROR);
         return;
       }
     }
@@ -572,7 +579,7 @@ protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, Channel ch,
       final File spillfile = new File(mapOutputFileName.toString());
       RandomAccessFile spill;
       try {
-        spill = new RandomAccessFile(spillfile, "r");
+        spill = SecureIOUtils.openForRandomRead(spillfile, "r", user, null);
       } catch (FileNotFoundException e) {
         LOG.info(spillfile + " not found");
         return null;
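Note on the messageReceived change: an ownership failure from SecureIOUtils typically arrives wrapped inside another IOException, so sending only e.getMessage() would hide the interesting part from the shuffle client. The loop above walks the cause chain and concatenates every message into the HTTP error body, which is what lets the new test grep the response for the owner-mismatch text. The same walk in isolation, as a hedged standalone sketch (not the committed code; it uses StringBuilder and a null-message guard the patch itself does not have):

public final class Causes {
  private Causes() {
  }

  // Concatenate the message of an exception and the messages of all of
  // its causes, mirroring the loop added to ShuffleHandler.messageReceived.
  public static String flatten(Throwable e) {
    StringBuilder sb = new StringBuilder(String.valueOf(e.getMessage()));
    for (Throwable t = e; t.getCause() != null; t = t.getCause()) {
      sb.append(t.getCause().getMessage());
    }
    return sb.toString();
  }
}

For example, flatten(new java.io.IOException("Error Reading IndexFile: ", new java.io.IOException("Owner 'x' for path /p did not match expected owner 'y'"))) yields both messages in one string, which is the behavior the test below relies on.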
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
index 4d845c37a8..47d175b930 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
@@ -24,22 +24,48 @@
 import static org.apache.hadoop.test.MockitoMaker.stub;
 import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assume.assumeTrue;
+
 import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.net.HttpURLConnection;
 import java.net.SocketException;
 import java.net.URL;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.zip.CheckedOutputStream;
+import java.util.zip.Checksum;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.metrics2.MetricsSource;
 import org.apache.hadoop.metrics2.MetricsSystem;
 import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.PureJavaCrc32;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
+import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFuture;
 import org.jboss.netty.channel.ChannelHandlerContext;
@@ -245,4 +271,131 @@ protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx,
 
     shuffleHandler.stop();
   }
+
+  @Test(timeout = 100000)
+  public void testMapFileAccess() throws IOException {
+    // This will run only if NativeIO is enabled, as SecureIOUtils needs it
+    assumeTrue(NativeIO.isAvailable());
+    Configuration conf = new Configuration();
+    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+    conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
+    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+        "kerberos");
+    UserGroupInformation.setConfiguration(conf);
+    File absLogDir = new File("target",
+        TestShuffleHandler.class.getSimpleName() + "LocDir").getAbsoluteFile();
+    conf.set(YarnConfiguration.NM_LOCAL_DIRS, absLogDir.getAbsolutePath());
+    ApplicationId appId = BuilderUtils.newApplicationId(12345, 1);
+    System.out.println(appId.toString());
+    String appAttemptId = "attempt_12345_1_m_1_0";
+    String user = "randomUser";
+    String reducerId = "0";
+    List<File> fileMap = new ArrayList<File>();
+    createShuffleHandlerFiles(absLogDir, user, appId.toString(), appAttemptId,
+        conf, fileMap);
+    ShuffleHandler shuffleHandler = new ShuffleHandler() {
+
+      @Override
+      protected Shuffle getShuffle(Configuration conf) {
+        // replace the shuffle handler with one stubbed for testing
+        return new Shuffle(conf) {
+
+          @Override
+          protected void verifyRequest(String appid, ChannelHandlerContext ctx,
+              HttpRequest request, HttpResponse response, URL requestUri)
+              throws IOException {
+          }
+
+        };
+      }
+    };
+    shuffleHandler.init(conf);
+    try {
+      shuffleHandler.start();
+      DataOutputBuffer outputBuffer = new DataOutputBuffer();
+      outputBuffer.reset();
+      Token<JobTokenIdentifier> jt =
+          new Token<JobTokenIdentifier>("identifier".getBytes(),
+              "password".getBytes(), new Text(user), new Text("shuffleService"));
+      jt.write(outputBuffer);
+      shuffleHandler.initApp(user, appId,
+          ByteBuffer.wrap(outputBuffer.getData(), 0, outputBuffer.getLength()));
+      URL url =
+          new URL(
+              "http://127.0.0.1:"
+                  + shuffleHandler.getConfig().get(
+                      ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY)
+                  + "/mapOutput?job=job_12345_0001&reduce=" + reducerId
+                  + "&map=attempt_12345_1_m_1_0");
+      HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+      conn.connect();
+      byte[] byteArr = new byte[10000];
+      try {
+        DataInputStream is = new DataInputStream(conn.getInputStream());
+        is.readFully(byteArr);
+      } catch (EOFException e) {
+        // ignore
+      }
+      // Retrieve file owner name
+      FileInputStream is = new FileInputStream(fileMap.get(0));
+      String owner = NativeIO.POSIX.getFstat(is.getFD()).getOwner();
+      is.close();
+
+      String message =
+          "Owner '" + owner + "' for path " + fileMap.get(0).getAbsolutePath()
+              + " did not match expected owner '" + user + "'";
+      Assert.assertTrue((new String(byteArr)).contains(message));
+    } finally {
+      shuffleHandler.stop();
+    }
+  }
+
+  public static void createShuffleHandlerFiles(File logDir, String user,
+      String appId, String appAttemptId, Configuration conf,
+      List<File> fileMap) throws IOException {
+    String attemptDir =
+        StringUtils.join(Path.SEPARATOR,
+            new String[] { logDir.getAbsolutePath(),
+                ContainerLocalizer.USERCACHE, user,
+                ContainerLocalizer.APPCACHE, appId, "output", appAttemptId });
+    File appAttemptDir = new File(attemptDir);
+    appAttemptDir.mkdirs();
+    System.out.println(appAttemptDir.getAbsolutePath());
+    File indexFile = new File(appAttemptDir, "file.out.index");
+    fileMap.add(indexFile);
+    createIndexFile(indexFile, conf);
+    File mapOutputFile = new File(appAttemptDir, "file.out");
+    fileMap.add(mapOutputFile);
+    createMapOutputFile(mapOutputFile, conf);
+  }
+
+  public static void
+      createMapOutputFile(File mapOutputFile, Configuration conf)
+          throws IOException {
+    FileOutputStream out = new FileOutputStream(mapOutputFile);
+    out.write("Creating new dummy map output file. Used only for testing"
+        .getBytes());
+    out.flush();
+    out.close();
+  }
+
+  public static void createIndexFile(File indexFile, Configuration conf)
+      throws IOException {
+    if (indexFile.exists()) {
+      System.out.println("Deleting existing file");
+      indexFile.delete();
+    }
+    indexFile.createNewFile();
+    FSDataOutputStream output = FileSystem.getLocal(conf).getRaw().append(
+        new Path(indexFile.getAbsolutePath()));
+    Checksum crc = new PureJavaCrc32();
+    crc.reset();
+    CheckedOutputStream chk = new CheckedOutputStream(output, crc);
+    String msg = "Writing new index file. This file will be used only " +
+        "for the testing.";
+    chk.write(Arrays.copyOf(msg.getBytes(),
+        MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH));
+    output.writeLong(chk.getChecksum().getValue());
+    output.close();
+  }
 }
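Note on the fabricated index file: createIndexFile above mirrors the spill index layout that SpillRecord's checksum branch verifies — exactly MAP_OUTPUT_INDEX_RECORD_LENGTH bytes of record data (24 bytes, i.e. three longs per partition) streamed through a CheckedOutputStream, followed by the CRC value written as a trailing long outside the checksummed stream, so the reader can compare chk.getChecksum().getValue() against in.readLong(). A standalone sketch of the same layout, using java.util.zip.CRC32 in place of Hadoop's PureJavaCrc32; class and method names are illustrative, not from the patch:

import java.io.DataOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.zip.CRC32;
import java.util.zip.CheckedOutputStream;

public class WriteDummyIndex {
  static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24; // 3 longs per record

  public static void write(String path, byte[] payload) throws IOException {
    try (DataOutputStream out =
             new DataOutputStream(new FileOutputStream(path))) {
      CRC32 crc = new CRC32();
      CheckedOutputStream chk = new CheckedOutputStream(out, crc);
      // Pad or truncate the payload to exactly one index record; only
      // these bytes contribute to the checksum.
      chk.write(Arrays.copyOf(payload, MAP_OUTPUT_INDEX_RECORD_LENGTH));
      // The CRC itself is written directly to the underlying stream, so
      // it is not folded into its own checksum -- matching the reader.
      out.writeLong(chk.getChecksum().getValue());
    }
  }
}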
"/mapOutput?job=job_12345_0001&reduce=" + reducerId + + "&map=attempt_12345_1_m_1_0"); + HttpURLConnection conn = (HttpURLConnection) url.openConnection(); + conn.connect(); + byte[] byteArr = new byte[10000]; + try { + DataInputStream is = new DataInputStream(conn.getInputStream()); + is.readFully(byteArr); + } catch (EOFException e) { + // ignore + } + // Retrieve file owner name + FileInputStream is = new FileInputStream(fileMap.get(0)); + String owner = NativeIO.POSIX.getFstat(is.getFD()).getOwner(); + is.close(); + + String message = + "Owner '" + owner + "' for path " + fileMap.get(0).getAbsolutePath() + + " did not match expected owner '" + user + "'"; + Assert.assertTrue((new String(byteArr)).contains(message)); + } finally { + shuffleHandler.stop(); + } + } + + public static void createShuffleHandlerFiles(File logDir, String user, + String appId, String appAttemptId, Configuration conf, + List fileMap) throws IOException { + String attemptDir = + StringUtils.join(Path.SEPARATOR, + new String[] { logDir.getAbsolutePath(), + ContainerLocalizer.USERCACHE, user, + ContainerLocalizer.APPCACHE, appId, "output", appAttemptId }); + File appAttemptDir = new File(attemptDir); + appAttemptDir.mkdirs(); + System.out.println(appAttemptDir.getAbsolutePath()); + File indexFile = new File(appAttemptDir, "file.out.index"); + fileMap.add(indexFile); + createIndexFile(indexFile, conf); + File mapOutputFile = new File(appAttemptDir, "file.out"); + fileMap.add(mapOutputFile); + createMapOutputFile(mapOutputFile, conf); + } + + public static void + createMapOutputFile(File mapOutputFile, Configuration conf) + throws IOException { + FileOutputStream out = new FileOutputStream(mapOutputFile); + out.write("Creating new dummy map output file. Used only for testing" + .getBytes()); + out.flush(); + out.close(); + } + + public static void createIndexFile(File indexFile, Configuration conf) + throws IOException { + if (indexFile.exists()) { + System.out.println("Deleting existing file"); + indexFile.delete(); + } + indexFile.createNewFile(); + FSDataOutputStream output = FileSystem.getLocal(conf).getRaw().append( + new Path(indexFile.getAbsolutePath())); + Checksum crc = new PureJavaCrc32(); + crc.reset(); + CheckedOutputStream chk = new CheckedOutputStream(output, crc); + String msg = "Writing new index file. This file will be used only " + + "for the testing."; + chk.write(Arrays.copyOf(msg.getBytes(), + MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH)); + output.writeLong(chk.getChecksum().getValue()); + output.close(); + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/resources/krb5.conf b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/resources/krb5.conf new file mode 100644 index 0000000000..121ac6d9b9 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/resources/krb5.conf @@ -0,0 +1,28 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+[libdefaults]
+  default_realm = APACHE.ORG
+  udp_preference_limit = 1
+  extra_addresses = 127.0.0.1
+[realms]
+  APACHE.ORG = {
+    admin_server = localhost:88
+    kdc = localhost:88
+  }
+[domain_realm]
+  localhost = APACHE.ORG
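Note on the new krb5.conf: it is a stub, defining an APACHE.ORG realm whose KDC and admin server point at localhost. It exists only so the test can set hadoop.security.authentication to "kerberos" (the comment in testMapFileAccess notes that SecureIOUtils also needs NativeIO, hence the assumeTrue) without the Kerberos machinery needing a reachable KDC; no ticket is ever acquired. A minimal sketch of that switch in isolation, assuming the stub realm is on the test classpath:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.security.UserGroupInformation;

public class EnableKerberosForTest {
  public static void main(String[] args) {
    // Same switch the test performs before exercising the handler.
    Configuration conf = new Configuration();
    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
        "kerberos");
    UserGroupInformation.setConfiguration(conf);
    // Reports true from here on, without contacting any KDC.
    System.out.println(UserGroupInformation.isSecurityEnabled());
  }
}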