MAPREDUCE-5334. Fix failing unit tests - TestContainerLauncher, TestContainerLauncherImpl. Contributed by Vinod Kumar Vavilapalli.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1494820 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
be3a8b6636
commit
84862b54aa
@ -552,6 +552,9 @@ Release 2.1.0-beta - UNRELEASED
|
||||
MAPREDUCE-4019. -list-attempt-ids is not working (Ashwin Shankar,
|
||||
Devaraj K, and B Anil Kumar via jlowe)
|
||||
|
||||
MAPREDUCE-5334. Fix failing unit tests - TestContainerLauncher,
|
||||
TestContainerLauncherImpl. (Vinod Kumar Vavilapalli via sseth)
|
||||
|
||||
BREAKDOWN OF HADOOP-8562 SUBTASKS
|
||||
|
||||
MAPREDUCE-4739. Some MapReduce tests fail to find winutils.
|
||||
|
@ -41,12 +41,8 @@
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
|
||||
@ -55,9 +51,6 @@
|
||||
import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy;
|
||||
import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
@ -393,8 +386,9 @@ public void handle(ContainerLauncherEvent event) {
|
||||
}
|
||||
}
|
||||
|
||||
public ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData getCMProxy(
|
||||
String containerMgrBindAddr, ContainerId containerId) throws IOException {
|
||||
public ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData
|
||||
getCMProxy(String containerMgrBindAddr, ContainerId containerId)
|
||||
throws IOException {
|
||||
return cmProxy.getProxy(containerMgrBindAddr, containerId);
|
||||
}
|
||||
}
|
||||
|
@ -23,6 +23,7 @@
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.UndeclaredThrowableException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
@ -32,6 +33,7 @@
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.ipc.Server;
|
||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
||||
@ -61,6 +63,7 @@
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.Token;
|
||||
import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy;
|
||||
import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData;
|
||||
@ -70,7 +73,9 @@
|
||||
import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
|
||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestContainerLauncher {
|
||||
@ -82,7 +87,7 @@ public class TestContainerLauncher {
|
||||
|
||||
static final Log LOG = LogFactory.getLog(TestContainerLauncher.class);
|
||||
|
||||
@Test
|
||||
@Test (timeout = 5000)
|
||||
public void testPoolSize() throws InterruptedException {
|
||||
|
||||
ApplicationId appId = ApplicationId.newInstance(12345, 67);
|
||||
@ -158,7 +163,7 @@ public void testPoolSize() throws InterruptedException {
|
||||
containerLauncher.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
@Test(timeout = 5000)
|
||||
public void testPoolLimits() throws InterruptedException {
|
||||
ApplicationId appId = ApplicationId.newInstance(12345, 67);
|
||||
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
|
||||
@ -222,7 +227,7 @@ private void waitForEvents(CustomContainerLauncher containerLauncher,
|
||||
containerLauncher.numEventsProcessing.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
@Test(timeout = 15000)
|
||||
public void testSlowNM() throws Exception {
|
||||
|
||||
conf = new Configuration();
|
||||
@ -235,11 +240,19 @@ public void testSlowNM() throws Exception {
|
||||
YarnRPC rpc = YarnRPC.create(conf);
|
||||
String bindAddr = "localhost:0";
|
||||
InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr);
|
||||
server = rpc.getServer(ContainerManagementProtocol.class, new DummyContainerManager(),
|
||||
addr, conf, null, 1);
|
||||
NMTokenSecretManagerInNM tokenSecretManager =
|
||||
new NMTokenSecretManagerInNM();
|
||||
MasterKey masterKey = Records.newRecord(MasterKey.class);
|
||||
masterKey.setBytes(ByteBuffer.wrap("key".getBytes()));
|
||||
tokenSecretManager.setMasterKey(masterKey);
|
||||
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
||||
"token");
|
||||
server =
|
||||
rpc.getServer(ContainerManagementProtocol.class,
|
||||
new DummyContainerManager(), addr, conf, tokenSecretManager, 1);
|
||||
server.start();
|
||||
|
||||
MRApp app = new MRAppWithSlowNM();
|
||||
MRApp app = new MRAppWithSlowNM(tokenSecretManager);
|
||||
|
||||
try {
|
||||
Job job = app.submit(conf);
|
||||
@ -340,8 +353,10 @@ protected ContainerLauncherImpl.EventProcessor createEventProcessor(
|
||||
|
||||
private class MRAppWithSlowNM extends MRApp {
|
||||
|
||||
public MRAppWithSlowNM() {
|
||||
private NMTokenSecretManagerInNM tokenSecretManager;
|
||||
public MRAppWithSlowNM(NMTokenSecretManagerInNM tokenSecretManager) {
|
||||
super(1, 0, false, "TestContainerLauncher", true);
|
||||
this.tokenSecretManager = tokenSecretManager;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -353,18 +368,19 @@ public MRAppWithSlowNM() {
|
||||
public ContainerManagementProtocolProxyData getCMProxy(
|
||||
String containerMgrBindAddr, ContainerId containerId)
|
||||
throws IOException {
|
||||
Token dummyToken =
|
||||
Token.newInstance("NMTokenIdentifier".getBytes(),
|
||||
NMTokenIdentifier.KIND.toString(), "password".getBytes(),
|
||||
"NMToken");
|
||||
InetSocketAddress addr = NetUtils.getConnectAddress(server);
|
||||
String containerManagerBindAddr =
|
||||
addr.getHostName() + ":" + addr.getPort();
|
||||
Token token =
|
||||
tokenSecretManager.createNMToken(
|
||||
containerId.getApplicationAttemptId(),
|
||||
NodeId.newInstance(addr.getHostName(), addr.getPort()), "user");
|
||||
ContainerManagementProtocolProxy cmProxy =
|
||||
new ContainerManagementProtocolProxy(conf, context.getNMTokens());
|
||||
InetSocketAddress addr = NetUtils.getConnectAddress(server);
|
||||
ContainerManagementProtocolProxyData proxy =
|
||||
cmProxy.new ContainerManagementProtocolProxyData(
|
||||
YarnRPC.create(conf),
|
||||
addr.getHostName() + ":" + addr.getPort(), containerId,
|
||||
dummyToken);
|
||||
YarnRPC.create(conf), containerManagerBindAddr, containerId,
|
||||
token);
|
||||
return proxy;
|
||||
}
|
||||
};
|
||||
|
@ -18,7 +18,6 @@
|
||||
package org.apache.hadoop.mapreduce.v2.app.launcher;
|
||||
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.eq;
|
||||
import static org.mockito.Mockito.atLeast;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.never;
|
||||
@ -26,7 +25,6 @@
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
@ -59,12 +57,12 @@
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.Token;
|
||||
import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData;
|
||||
import org.apache.hadoop.yarn.event.Event;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
@ -88,13 +86,25 @@ public void setup() throws IOException {
|
||||
private static class ContainerLauncherImplUnderTest extends
|
||||
ContainerLauncherImpl {
|
||||
|
||||
private YarnRPC rpc;
|
||||
|
||||
public ContainerLauncherImplUnderTest(AppContext context, YarnRPC rpc) {
|
||||
private ContainerManagementProtocol containerManager;
|
||||
|
||||
public ContainerLauncherImplUnderTest(AppContext context,
|
||||
ContainerManagementProtocol containerManager) {
|
||||
super(context);
|
||||
this.rpc = rpc;
|
||||
this.containerManager = containerManager;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public ContainerManagementProtocolProxyData getCMProxy(
|
||||
String containerMgrBindAddr, ContainerId containerId)
|
||||
throws IOException {
|
||||
ContainerManagementProtocolProxyData protocolProxy =
|
||||
mock(ContainerManagementProtocolProxyData.class);
|
||||
when(protocolProxy.getContainerManagementProtocol()).thenReturn(
|
||||
containerManager);
|
||||
return protocolProxy;
|
||||
}
|
||||
|
||||
public void waitForPoolToIdle() throws InterruptedException {
|
||||
//I wish that we did not need the sleep, but it is here so that we are sure
|
||||
// That the other thread had time to insert the event into the queue and
|
||||
@ -133,22 +143,18 @@ public static TaskAttemptId makeTaskAttemptId(long ts, int appId, int taskId,
|
||||
return MRBuilderUtils.newTaskAttemptId(tID, id);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Test(timeout = 5000)
|
||||
public void testHandle() throws Exception {
|
||||
LOG.info("STARTING testHandle");
|
||||
YarnRPC mockRpc = mock(YarnRPC.class);
|
||||
AppContext mockContext = mock(AppContext.class);
|
||||
@SuppressWarnings("rawtypes")
|
||||
EventHandler mockEventHandler = mock(EventHandler.class);
|
||||
when(mockContext.getEventHandler()).thenReturn(mockEventHandler);
|
||||
|
||||
ContainerManagementProtocol mockCM = mock(ContainerManagementProtocol.class);
|
||||
when(mockRpc.getProxy(eq(ContainerManagementProtocol.class),
|
||||
any(InetSocketAddress.class), any(Configuration.class)))
|
||||
.thenReturn(mockCM);
|
||||
|
||||
ContainerLauncherImplUnderTest ut =
|
||||
new ContainerLauncherImplUnderTest(mockContext, mockRpc);
|
||||
String cmAddress = "127.0.0.1:8000";
|
||||
ContainerManagementProtocol mockCM =
|
||||
mock(ContainerManagementProtocol.class);
|
||||
ContainerLauncherImplUnderTest ut =
|
||||
new ContainerLauncherImplUnderTest(mockContext, mockCM);
|
||||
|
||||
Configuration conf = new Configuration();
|
||||
ut.init(conf);
|
||||
@ -156,7 +162,6 @@ public void testHandle() throws Exception {
|
||||
try {
|
||||
ContainerId contId = makeContainerId(0l, 0, 0, 1);
|
||||
TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
|
||||
String cmAddress = "127.0.0.1:8000";
|
||||
StartContainerResponse startResp =
|
||||
recordFactory.newRecordInstance(StartContainerResponse.class);
|
||||
startResp.setAllServicesMetaData(serviceResponse);
|
||||
@ -199,22 +204,18 @@ public void testHandle() throws Exception {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@Test(timeout = 5000)
|
||||
public void testOutOfOrder() throws Exception {
|
||||
LOG.info("STARTING testOutOfOrder");
|
||||
YarnRPC mockRpc = mock(YarnRPC.class);
|
||||
AppContext mockContext = mock(AppContext.class);
|
||||
@SuppressWarnings("rawtypes")
|
||||
EventHandler mockEventHandler = mock(EventHandler.class);
|
||||
when(mockContext.getEventHandler()).thenReturn(mockEventHandler);
|
||||
|
||||
ContainerManagementProtocol mockCM = mock(ContainerManagementProtocol.class);
|
||||
when(mockRpc.getProxy(eq(ContainerManagementProtocol.class),
|
||||
any(InetSocketAddress.class), any(Configuration.class)))
|
||||
.thenReturn(mockCM);
|
||||
|
||||
ContainerLauncherImplUnderTest ut =
|
||||
new ContainerLauncherImplUnderTest(mockContext, mockRpc);
|
||||
ContainerManagementProtocol mockCM =
|
||||
mock(ContainerManagementProtocol.class);
|
||||
ContainerLauncherImplUnderTest ut =
|
||||
new ContainerLauncherImplUnderTest(mockContext, mockCM);
|
||||
|
||||
Configuration conf = new Configuration();
|
||||
ut.init(conf);
|
||||
@ -264,23 +265,19 @@ public void testOutOfOrder() throws Exception {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@Test(timeout = 5000)
|
||||
public void testMyShutdown() throws Exception {
|
||||
LOG.info("in test Shutdown");
|
||||
|
||||
YarnRPC mockRpc = mock(YarnRPC.class);
|
||||
AppContext mockContext = mock(AppContext.class);
|
||||
@SuppressWarnings("rawtypes")
|
||||
EventHandler mockEventHandler = mock(EventHandler.class);
|
||||
when(mockContext.getEventHandler()).thenReturn(mockEventHandler);
|
||||
|
||||
ContainerManagementProtocol mockCM = mock(ContainerManagementProtocol.class);
|
||||
when(mockRpc.getProxy(eq(ContainerManagementProtocol.class),
|
||||
any(InetSocketAddress.class), any(Configuration.class)))
|
||||
.thenReturn(mockCM);
|
||||
|
||||
ContainerManagementProtocol mockCM =
|
||||
mock(ContainerManagementProtocol.class);
|
||||
ContainerLauncherImplUnderTest ut =
|
||||
new ContainerLauncherImplUnderTest(mockContext, mockRpc);
|
||||
new ContainerLauncherImplUnderTest(mockContext, mockCM);
|
||||
|
||||
Configuration conf = new Configuration();
|
||||
ut.init(conf);
|
||||
@ -320,26 +317,22 @@ public void testMyShutdown() throws Exception {
|
||||
}
|
||||
|
||||
@SuppressWarnings({ "rawtypes", "unchecked" })
|
||||
@Test
|
||||
@Test(timeout = 5000)
|
||||
public void testContainerCleaned() throws Exception {
|
||||
LOG.info("STARTING testContainerCleaned");
|
||||
|
||||
CyclicBarrier startLaunchBarrier = new CyclicBarrier(2);
|
||||
CyclicBarrier completeLaunchBarrier = new CyclicBarrier(2);
|
||||
|
||||
YarnRPC mockRpc = mock(YarnRPC.class);
|
||||
AppContext mockContext = mock(AppContext.class);
|
||||
|
||||
EventHandler mockEventHandler = mock(EventHandler.class);
|
||||
when(mockContext.getEventHandler()).thenReturn(mockEventHandler);
|
||||
|
||||
ContainerManagementProtocol mockCM = new ContainerManagerForTest(startLaunchBarrier, completeLaunchBarrier);
|
||||
when(mockRpc.getProxy(eq(ContainerManagementProtocol.class),
|
||||
any(InetSocketAddress.class), any(Configuration.class)))
|
||||
.thenReturn(mockCM);
|
||||
|
||||
ContainerLauncherImplUnderTest ut =
|
||||
new ContainerLauncherImplUnderTest(mockContext, mockRpc);
|
||||
ContainerManagementProtocol mockCM =
|
||||
new ContainerManagerForTest(startLaunchBarrier, completeLaunchBarrier);
|
||||
ContainerLauncherImplUnderTest ut =
|
||||
new ContainerLauncherImplUnderTest(mockContext, mockCM);
|
||||
|
||||
Configuration conf = new Configuration();
|
||||
ut.init(conf);
|
||||
|
Loading…
Reference in New Issue
Block a user