From f74e44635596276f35b7127f99bc5ab96ab534ed Mon Sep 17 00:00:00 2001
From: Devarajulu K
Date: Mon, 19 May 2014 06:42:29 +0000
Subject: [PATCH] MAPREDUCE-5867. Fix NPE in KillAMPreemptionPolicy related to
 ProportionalCapacityPreemptionPolicy. Contributed by Sunil G.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1595754 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-mapreduce-project/CHANGES.txt          |   3 +
 .../rm/preemption/KillAMPreemptionPolicy.java |  19 ++-
 .../v2/app/TestKillAMPreemptionPolicy.java    | 144 ++++++++++++++++++
 3 files changed, 160 insertions(+), 6 deletions(-)
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKillAMPreemptionPolicy.java

diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index c9661812ff..c2a4f14514 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -139,6 +139,9 @@ Trunk (Unreleased)
 
     MAPREDUCE-5717. Task pings are interpreted as task progress (jlowe)
 
+    MAPREDUCE-5867. Fix NPE in KillAMPreemptionPolicy related to
+    ProportionalCapacityPreemptionPolicy (Sunil G via devaraj)
+
 Release 2.5.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/KillAMPreemptionPolicy.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/KillAMPreemptionPolicy.java
index daf737a154..09237aaa29 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/KillAMPreemptionPolicy.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/KillAMPreemptionPolicy.java
@@ -29,7 +29,9 @@
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.PreemptionContainer;
+import org.apache.hadoop.yarn.api.records.PreemptionContract;
 import org.apache.hadoop.yarn.api.records.PreemptionMessage;
+import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
 import org.apache.hadoop.yarn.event.EventHandler;
 
 /**
@@ -52,13 +54,18 @@ public void init(AppContext context) {
   public void preempt(Context ctxt, PreemptionMessage preemptionRequests) {
     // for both strict and negotiable preemption requests kill the
     // container
-    for (PreemptionContainer c :
-        preemptionRequests.getStrictContract().getContainers()) {
-      killContainer(ctxt, c);
+    StrictPreemptionContract strictContract = preemptionRequests
+        .getStrictContract();
+    if (strictContract != null) {
+      for (PreemptionContainer c : strictContract.getContainers()) {
+        killContainer(ctxt, c);
+      }
     }
-    for (PreemptionContainer c :
-        preemptionRequests.getContract().getContainers()) {
-      killContainer(ctxt, c);
+    PreemptionContract contract = preemptionRequests.getContract();
+    if (contract != null) {
+      for (PreemptionContainer c : contract.getContainers()) {
+        killContainer(ctxt, c);
+      }
     }
   }
 
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKillAMPreemptionPolicy.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKillAMPreemptionPolicy.java
new file mode 100644
index 0000000000..fa930ae126
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKillAMPreemptionPolicy.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.v2.app;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app.MRAppMaster.RunningAppContext;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
+import org.apache.hadoop.mapreduce.v2.app.rm.preemption.KillAMPreemptionPolicy;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.PreemptionContainer;
+import org.apache.hadoop.yarn.api.records.PreemptionContract;
+import org.apache.hadoop.yarn.api.records.PreemptionMessage;
+import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.junit.Test;
+
+public class TestKillAMPreemptionPolicy {
+  private final RecordFactory recordFactory = RecordFactoryProvider
+      .getRecordFactory(null);
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testKillAMPreemptPolicy() {
+
+    ApplicationId appId = ApplicationId.newInstance(123456789, 1);
+    ContainerId container = ContainerId.newInstance(
+        ApplicationAttemptId.newInstance(appId, 1), 1);
+    AMPreemptionPolicy.Context mPctxt = mock(AMPreemptionPolicy.Context.class);
+    when(mPctxt.getTaskAttempt(any(ContainerId.class))).thenReturn(
+        MRBuilderUtils.newTaskAttemptId(MRBuilderUtils.newTaskId(
+            MRBuilderUtils.newJobId(appId, 1), 1, TaskType.MAP), 0));
+    List<Container> p = new ArrayList<Container>();
+    p.add(Container.newInstance(container, null, null, null, null, null));
+    when(mPctxt.getContainers(any(TaskType.class))).thenReturn(p);
+
+    KillAMPreemptionPolicy policy = new KillAMPreemptionPolicy();
+
+    // strictContract is null & contract is null
+    RunningAppContext mActxt = getRunningAppContext();
+    policy.init(mActxt);
+    PreemptionMessage pM = getPreemptionMessage(false, false, container);
+    policy.preempt(mPctxt, pM);
+    verify(mActxt.getEventHandler(), times(0)).handle(
+        any(TaskAttemptEvent.class));
+    verify(mActxt.getEventHandler(), times(0)).handle(
+        any(JobCounterUpdateEvent.class));
+
+    // strictContract is not null & contract is null
+    mActxt = getRunningAppContext();
+    policy.init(mActxt);
+    pM = getPreemptionMessage(true, false, container);
+    policy.preempt(mPctxt, pM);
+    verify(mActxt.getEventHandler(), times(2)).handle(
+        any(TaskAttemptEvent.class));
+    verify(mActxt.getEventHandler(), times(2)).handle(
+        any(JobCounterUpdateEvent.class));
+
+    // strictContract is null & contract is not null
+    mActxt = getRunningAppContext();
+    policy.init(mActxt);
+    pM = getPreemptionMessage(false, true, container);
+    policy.preempt(mPctxt, pM);
+    verify(mActxt.getEventHandler(), times(2)).handle(
+        any(TaskAttemptEvent.class));
+    verify(mActxt.getEventHandler(), times(2)).handle(
+        any(JobCounterUpdateEvent.class));
+
+    // strictContract is not null & contract is not null
+    mActxt = getRunningAppContext();
+    policy.init(mActxt);
+    pM = getPreemptionMessage(true, true, container);
+    policy.preempt(mPctxt, pM);
+    verify(mActxt.getEventHandler(), times(4)).handle(
+        any(TaskAttemptEvent.class));
+    verify(mActxt.getEventHandler(), times(4)).handle(
+        any(JobCounterUpdateEvent.class));
+  }
+
+  private RunningAppContext getRunningAppContext() {
+    RunningAppContext mActxt = mock(RunningAppContext.class);
+    EventHandler eventHandler = mock(EventHandler.class);
+    when(mActxt.getEventHandler()).thenReturn(eventHandler);
+    return mActxt;
+  }
+
+  private PreemptionMessage getPreemptionMessage(boolean strictContract,
+      boolean contract, final ContainerId container) {
+    PreemptionMessage preemptionMessage = recordFactory
+        .newRecordInstance(PreemptionMessage.class);
+    Set<PreemptionContainer> cntrs = new HashSet<PreemptionContainer>();
+    PreemptionContainer preemptContainer = recordFactory
+        .newRecordInstance(PreemptionContainer.class);
+    preemptContainer.setId(container);
+    cntrs.add(preemptContainer);
+    if (strictContract) {
+      StrictPreemptionContract set = recordFactory
+          .newRecordInstance(StrictPreemptionContract.class);
+      set.setContainers(cntrs);
+      preemptionMessage.setStrictContract(set);
+    }
+    if (contract) {
+      PreemptionContract preemptContract = recordFactory
+          .newRecordInstance(PreemptionContract.class);
+      preemptContract.setContainers(cntrs);
+      preemptionMessage.setContract(preemptContract);
+    }
+    return preemptionMessage;
+  }
+
+}