diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 2f806154ac..245840321e 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -542,6 +542,9 @@ Release 2.7.2 - UNRELEASED BUG FIXES + MAPREDUCE-6425. ShuffleHandler passes wrong "base" parameter to getMapOutputInfo + if mapId is not in the cache. (zhihai xu via devaraj) + Release 2.7.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java index eedf42b3a7..ee1be23407 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java @@ -815,7 +815,8 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt) try { MapOutputInfo info = mapOutputInfoMap.get(mapId); if (info == null) { - info = getMapOutputInfo(outputBasePathStr, mapId, reduceId, user); + info = getMapOutputInfo(outputBasePathStr + mapId, + mapId, reduceId, user); } lastMap = sendMapOutput(ctx, ch, user, mapId, diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java index 7053653661..746071fb7e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java @@ -601,6 +601,7 @@ protected void verifyRequest(String appid, ChannelHandlerContext ctx, Assert.assertTrue((new String(byteArr)).contains(message)); } finally { shuffleHandler.stop(); + FileUtil.fullyDelete(absLogDir); } } @@ -829,4 +830,104 @@ private static int getShuffleResponseCode(ShuffleHandler shuffle, conn.disconnect(); return rc; } + + @Test(timeout = 100000) + public void testGetMapOutputInfo() throws Exception { + final ArrayList failures = new ArrayList(1); + Configuration conf = new Configuration(); + conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0); + conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3); + File absLogDir = new File("target", TestShuffleHandler.class. + getSimpleName() + "LocDir").getAbsoluteFile(); + conf.set(YarnConfiguration.NM_LOCAL_DIRS, absLogDir.getAbsolutePath()); + ApplicationId appId = ApplicationId.newInstance(12345, 1); + String appAttemptId = "attempt_12345_1_m_1_0"; + String user = "randomUser"; + String reducerId = "0"; + List fileMap = new ArrayList(); + createShuffleHandlerFiles(absLogDir, user, appId.toString(), appAttemptId, + conf, fileMap); + ShuffleHandler shuffleHandler = new ShuffleHandler() { + @Override + protected Shuffle getShuffle(Configuration conf) { + // replace the shuffle handler with one stubbed for testing + return new Shuffle(conf) { + @Override + protected void populateHeaders(List mapIds, + String outputBaseStr, String user, int reduce, + HttpRequest request, HttpResponse response, + boolean keepAliveParam, Map infoMap) + throws IOException { + // Only set response headers and skip everything else + // send some dummy value for content-length + super.setResponseHeaders(response, keepAliveParam, 100); + } + @Override + protected void verifyRequest(String appid, + ChannelHandlerContext ctx, HttpRequest request, + HttpResponse response, URL requestUri) throws IOException { + // Do nothing. + } + @Override + protected void sendError(ChannelHandlerContext ctx, String message, + HttpResponseStatus status) { + if (failures.size() == 0) { + failures.add(new Error(message)); + ctx.getChannel().close(); + } + } + @Override + protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, + Channel ch, String user, String mapId, int reduce, + MapOutputInfo info) throws IOException { + // send a shuffle header + ShuffleHeader header = + new ShuffleHeader("attempt_12345_1_m_1_0", 5678, 5678, 1); + DataOutputBuffer dob = new DataOutputBuffer(); + header.write(dob); + return ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength())); + } + }; + } + }; + shuffleHandler.init(conf); + try { + shuffleHandler.start(); + DataOutputBuffer outputBuffer = new DataOutputBuffer(); + outputBuffer.reset(); + Token jt = + new Token("identifier".getBytes(), + "password".getBytes(), new Text(user), new Text("shuffleService")); + jt.write(outputBuffer); + shuffleHandler + .initializeApplication(new ApplicationInitializationContext(user, + appId, ByteBuffer.wrap(outputBuffer.getData(), 0, + outputBuffer.getLength()))); + URL url = + new URL( + "http://127.0.0.1:" + + shuffleHandler.getConfig().get( + ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY) + + "/mapOutput?job=job_12345_0001&reduce=" + reducerId + + "&map=attempt_12345_1_m_1_0"); + HttpURLConnection conn = (HttpURLConnection) url.openConnection(); + conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME, + ShuffleHeader.DEFAULT_HTTP_HEADER_NAME); + conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION, + ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION); + conn.connect(); + try { + DataInputStream is = new DataInputStream(conn.getInputStream()); + ShuffleHeader header = new ShuffleHeader(); + header.readFields(is); + is.close(); + } catch (EOFException e) { + // ignore + } + Assert.assertEquals(failures.size(), 0); + } finally { + shuffleHandler.stop(); + FileUtil.fullyDelete(absLogDir); + } + } }