From 7ae94c6678964230728f0f872548b953827f865f Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Wed, 8 Feb 2012 01:56:02 +0000 Subject: [PATCH] MAPREDUCE-3834. Changed MR AM to not add the same rack entry multiple times into the container request table when multiple hosts for a split happen to be on the same rack. Contributed by Siddarth Seth. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1241734 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 4 +++ .../v2/app/job/impl/TaskAttemptImpl.java | 10 +++--- .../v2/app/job/impl/TestTaskAttempt.java | 34 ++++++++++++++++++- 3 files changed, 43 insertions(+), 5 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index ca7d4e5932..244e607c9e 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -771,6 +771,10 @@ Release 0.23.1 - Unreleased the same FS scheme, instead of randomly using one. (Mahadev Konar via sseth) + MAPREDUCE-3834. Changed MR AM to not add the same rack entry multiple times + into the container request table when multiple hosts for a split happen to + be on the same rack. (Siddarth Seth via vinodkv) + Release 0.23.0 - 2011-11-01 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index 42a6f08e36..f4635e92b3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -27,9 +27,11 @@ import java.util.Collection; import java.util.EnumSet; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; @@ -1079,14 +1081,14 @@ public void transition(TaskAttemptImpl taskAttempt, taskAttempt.attemptId, taskAttempt.resourceCapability)); } else { - int i = 0; - String[] racks = new String[taskAttempt.dataLocalHosts.length]; + Set racks = new HashSet(); for (String host : taskAttempt.dataLocalHosts) { - racks[i++] = RackResolver.resolve(host).getNetworkLocation(); + racks.add(RackResolver.resolve(host).getNetworkLocation()); } taskAttempt.eventHandler.handle(new ContainerRequestEvent( taskAttempt.attemptId, taskAttempt.resourceCapability, taskAttempt - .resolveHosts(taskAttempt.dataLocalHosts), racks)); + .resolveHosts(taskAttempt.dataLocalHosts), racks + .toArray(new String[racks.size()]))); } } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java index 68fa6ae9bf..b5a54964d2 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java @@ -81,7 +81,39 @@ public void testMRAppHistoryForReduce() throws Exception { MRApp app = new FailingAttemptsMRApp(0, 1); testMRAppHistory(app); } - + + @SuppressWarnings("rawtypes") + @Test + public void testSingleRackRequest() throws Exception { + TaskAttemptImpl.RequestContainerTransition rct = + new TaskAttemptImpl.RequestContainerTransition(false); + + EventHandler eventHandler = mock(EventHandler.class); + String[] hosts = new String[3]; + hosts[0] = "host1"; + hosts[1] = "host2"; + hosts[2] = "host3"; + TaskSplitMetaInfo splitInfo = + new TaskSplitMetaInfo(hosts, 0, 128 * 1024 * 1024l); + + TaskAttemptImpl mockTaskAttempt = + createMapTaskAttemptImplForTest(eventHandler, splitInfo); + TaskAttemptEvent mockTAEvent = mock(TaskAttemptEvent.class); + + rct.transition(mockTaskAttempt, mockTAEvent); + + ArgumentCaptor arg = ArgumentCaptor.forClass(Event.class); + verify(eventHandler, times(2)).handle(arg.capture()); + if (!(arg.getAllValues().get(1) instanceof ContainerRequestEvent)) { + Assert.fail("Second Event not of type ContainerRequestEvent"); + } + ContainerRequestEvent cre = + (ContainerRequestEvent) arg.getAllValues().get(1); + String[] requestedRacks = cre.getRacks(); + //Only a single occurance of /DefaultRack + assertEquals(1, requestedRacks.length); + } + @SuppressWarnings("rawtypes") @Test public void testHostResolveAttempt() throws Exception {