YARN-7244. ShuffleHandler is not aware of disks that are added. Contributed by Kuhu Shukla

This commit is contained in:
Jason Lowe 2017-10-27 16:54:46 -05:00
parent 99880d0a16
commit 665bb147aa
8 changed files with 239 additions and 34 deletions

View File

@ -57,7 +57,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
@ -84,7 +83,6 @@
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
@ -855,8 +853,6 @@ class Shuffle extends SimpleChannelUpstreamHandler {
private static final int ALLOWED_CONCURRENCY = 16;
private final Configuration conf;
private final IndexCache indexCache;
private final LocalDirAllocator lDirAlloc =
new LocalDirAllocator(YarnConfiguration.NM_LOCAL_DIRS);
private int port;
private final LoadingCache<AttemptPathIdentifier, AttemptPathInfo> pathCache =
CacheBuilder.newBuilder().expireAfterAccess(EXPIRE_AFTER_ACCESS_MINUTES,
@ -889,10 +885,10 @@ public AttemptPathInfo load(AttemptPathIdentifier key) throws
Exception {
String base = getBaseLocation(key.jobId, key.user);
String attemptBase = base + key.attemptId;
Path indexFileName = lDirAlloc.getLocalPathToRead(
attemptBase + "/" + INDEX_FILE_NAME, conf);
Path mapOutputFileName = lDirAlloc.getLocalPathToRead(
attemptBase + "/" + DATA_FILE_NAME, conf);
Path indexFileName = getAuxiliaryLocalPathHandler()
.getLocalPathForRead(attemptBase + "/" + INDEX_FILE_NAME);
Path mapOutputFileName = getAuxiliaryLocalPathHandler()
.getLocalPathForRead(attemptBase + "/" + DATA_FILE_NAME);
if (LOG.isDebugEnabled()) {
LOG.debug("Loaded : " + key + " via loader");

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.mapred;
import org.apache.hadoop.test.GenericTestUtils;
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
@ -26,6 +27,7 @@
import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
import static org.junit.Assert.assertEquals;
import static org.junit.Assume.assumeTrue;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -69,12 +71,14 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.service.ServiceStateException;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.PureJavaCrc32;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.api.AuxiliaryLocalPathHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.records.Version;
import org.jboss.netty.channel.Channel;
@ -99,8 +103,12 @@
public class TestShuffleHandler {
static final long MiB = 1024 * 1024;
private static final Log LOG = LogFactory.getLog(TestShuffleHandler.class);
private static final File ABS_LOG_DIR = GenericTestUtils.getTestDir(
TestShuffleHandler.class.getSimpleName() + "LocDir");
class MockShuffleHandler extends org.apache.hadoop.mapred.ShuffleHandler {
private AuxiliaryLocalPathHandler pathHandler =
new TestAuxiliaryLocalPathHandler();
@Override
protected Shuffle getShuffle(final Configuration conf) {
return new Shuffle(conf) {
@ -140,11 +148,35 @@ protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx,
}
};
}
@Override
public AuxiliaryLocalPathHandler getAuxiliaryLocalPathHandler() {
return pathHandler;
}
}
private static class MockShuffleHandler2 extends org.apache.hadoop.mapred.ShuffleHandler {
boolean socketKeepAlive = false;
private class TestAuxiliaryLocalPathHandler
implements AuxiliaryLocalPathHandler {
@Override
public Path getLocalPathForRead(String path) throws IOException {
return new Path(ABS_LOG_DIR.getAbsolutePath(), path);
}
@Override
public Path getLocalPathForWrite(String path) throws IOException {
return new Path(ABS_LOG_DIR.getAbsolutePath());
}
@Override
public Path getLocalPathForWrite(String path, long size)
throws IOException {
return new Path(ABS_LOG_DIR.getAbsolutePath());
}
}
private static class MockShuffleHandler2 extends
org.apache.hadoop.mapred.ShuffleHandler {
boolean socketKeepAlive = false;
@Override
protected Shuffle getShuffle(final Configuration conf) {
return new Shuffle(conf) {
@ -479,6 +511,11 @@ public void testSocketKeepAlive() throws Exception {
conf.setInt(ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT, -100);
HttpURLConnection conn = null;
MockShuffleHandler2 shuffleHandler = new MockShuffleHandler2();
AuxiliaryLocalPathHandler pathHandler =
mock(AuxiliaryLocalPathHandler.class);
when(pathHandler.getLocalPathForRead(anyString())).thenThrow(
new DiskChecker.DiskErrorException("Test"));
shuffleHandler.setAuxiliaryLocalPathHandler(pathHandler);
try {
shuffleHandler.init(conf);
shuffleHandler.start();
@ -668,19 +705,16 @@ public void testMapFileAccess() throws IOException {
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
"kerberos");
UserGroupInformation.setConfiguration(conf);
File absLogDir = new File("target",
TestShuffleHandler.class.getSimpleName() + "LocDir").getAbsoluteFile();
conf.set(YarnConfiguration.NM_LOCAL_DIRS, absLogDir.getAbsolutePath());
conf.set(YarnConfiguration.NM_LOCAL_DIRS, ABS_LOG_DIR.getAbsolutePath());
ApplicationId appId = ApplicationId.newInstance(12345, 1);
LOG.info(appId.toString());
String appAttemptId = "attempt_12345_1_m_1_0";
String user = "randomUser";
String reducerId = "0";
List<File> fileMap = new ArrayList<File>();
createShuffleHandlerFiles(absLogDir, user, appId.toString(), appAttemptId,
createShuffleHandlerFiles(ABS_LOG_DIR, user, appId.toString(), appAttemptId,
conf, fileMap);
ShuffleHandler shuffleHandler = new ShuffleHandler() {
@Override
protected Shuffle getShuffle(Configuration conf) {
// replace the shuffle handler with one stubbed for testing
@ -696,6 +730,8 @@ protected void verifyRequest(String appid, ChannelHandlerContext ctx,
};
}
};
AuxiliaryLocalPathHandler pathHandler = new TestAuxiliaryLocalPathHandler();
shuffleHandler.setAuxiliaryLocalPathHandler(pathHandler);
shuffleHandler.init(conf);
try {
shuffleHandler.start();
@ -740,7 +776,7 @@ protected void verifyRequest(String appid, ChannelHandlerContext ctx,
Assert.assertTrue((new String(byteArr)).contains(message));
} finally {
shuffleHandler.stop();
FileUtil.fullyDelete(absLogDir);
FileUtil.fullyDelete(ABS_LOG_DIR);
}
}
@ -801,10 +837,14 @@ public void testRecovery() throws IOException {
final File tmpDir = new File(System.getProperty("test.build.data",
System.getProperty("java.io.tmpdir")),
TestShuffleHandler.class.getName());
ShuffleHandler shuffle = new ShuffleHandler();
AuxiliaryLocalPathHandler pathHandler = new TestAuxiliaryLocalPathHandler();
shuffle.setAuxiliaryLocalPathHandler(pathHandler);
Configuration conf = new Configuration();
conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
ShuffleHandler shuffle = new ShuffleHandler();
conf.set(YarnConfiguration.NM_LOCAL_DIRS,
ABS_LOG_DIR.getAbsolutePath());
// emulate aux services startup with recovery enabled
shuffle.setRecoveryPath(new Path(tmpDir.toString()));
tmpDir.mkdirs();
@ -830,6 +870,7 @@ public void testRecovery() throws IOException {
// emulate shuffle handler restart
shuffle.close();
shuffle = new ShuffleHandler();
shuffle.setAuxiliaryLocalPathHandler(pathHandler);
shuffle.setRecoveryPath(new Path(tmpDir.toString()));
shuffle.init(conf);
shuffle.start();
@ -872,6 +913,9 @@ public void testRecoveryFromOtherVersions() throws IOException {
conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
ShuffleHandler shuffle = new ShuffleHandler();
AuxiliaryLocalPathHandler pathHandler = new TestAuxiliaryLocalPathHandler();
shuffle.setAuxiliaryLocalPathHandler(pathHandler);
conf.set(YarnConfiguration.NM_LOCAL_DIRS, ABS_LOG_DIR.getAbsolutePath());
// emulate aux services startup with recovery enabled
shuffle.setRecoveryPath(new Path(tmpDir.toString()));
tmpDir.mkdirs();
@ -897,6 +941,7 @@ public void testRecoveryFromOtherVersions() throws IOException {
// emulate shuffle handler restart
shuffle.close();
shuffle = new ShuffleHandler();
shuffle.setAuxiliaryLocalPathHandler(pathHandler);
shuffle.setRecoveryPath(new Path(tmpDir.toString()));
shuffle.init(conf);
shuffle.start();
@ -914,6 +959,7 @@ public void testRecoveryFromOtherVersions() throws IOException {
Assert.assertEquals(version11, shuffle.loadVersion());
shuffle.close();
shuffle = new ShuffleHandler();
shuffle.setAuxiliaryLocalPathHandler(pathHandler);
shuffle.setRecoveryPath(new Path(tmpDir.toString()));
shuffle.init(conf);
shuffle.start();
@ -930,6 +976,7 @@ public void testRecoveryFromOtherVersions() throws IOException {
Assert.assertEquals(version21, shuffle.loadVersion());
shuffle.close();
shuffle = new ShuffleHandler();
shuffle.setAuxiliaryLocalPathHandler(pathHandler);
shuffle.setRecoveryPath(new Path(tmpDir.toString()));
shuffle.init(conf);
@ -979,16 +1026,15 @@ public void testGetMapOutputInfo() throws Exception {
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
"simple");
UserGroupInformation.setConfiguration(conf);
File absLogDir = new File("target", TestShuffleHandler.class.
getSimpleName() + "LocDir").getAbsoluteFile();
conf.set(YarnConfiguration.NM_LOCAL_DIRS, absLogDir.getAbsolutePath());
conf.set(YarnConfiguration.NM_LOCAL_DIRS, ABS_LOG_DIR.getAbsolutePath());
ApplicationId appId = ApplicationId.newInstance(12345, 1);
String appAttemptId = "attempt_12345_1_m_1_0";
String user = "randomUser";
String reducerId = "0";
List<File> fileMap = new ArrayList<File>();
createShuffleHandlerFiles(absLogDir, user, appId.toString(), appAttemptId,
createShuffleHandlerFiles(ABS_LOG_DIR, user, appId.toString(), appAttemptId,
conf, fileMap);
AuxiliaryLocalPathHandler pathHandler = new TestAuxiliaryLocalPathHandler();
ShuffleHandler shuffleHandler = new ShuffleHandler() {
@Override
protected Shuffle getShuffle(Configuration conf) {
@ -1032,6 +1078,7 @@ protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx,
};
}
};
shuffleHandler.setAuxiliaryLocalPathHandler(pathHandler);
shuffleHandler.init(conf);
try {
shuffleHandler.start();
@ -1070,7 +1117,7 @@ protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx,
0, failures.size());
} finally {
shuffleHandler.stop();
FileUtil.fullyDelete(absLogDir);
FileUtil.fullyDelete(ABS_LOG_DIR);
}
}

View File

@ -0,0 +1,58 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
/** An Interface that can retrieve local directories to read from or write to.
* Components can implement this interface to link it to
* their own Directory Handler Service
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface AuxiliaryLocalPathHandler {
/**
* Get a path from the local FS for reading for a given Auxiliary Service.
* @param path the requested path
* @return the complete path to the file on a local disk
* @throws IOException if the file read encounters a problem
*/
Path getLocalPathForRead(String path) throws IOException;
/**
* Get a path from the local FS for writing for a given Auxiliary Service.
* @param path the requested path
* @return the complete path to the file on a local disk
* @throws IOException if the path creations fails
*/
Path getLocalPathForWrite(String path) throws IOException;
/**
* Get a path from the local FS for writing a file of an estimated size
* for a given Auxiliary Service.
* @param path the requested path
* @param size the size of the file that is going to be written
* @return the complete path to the file on a local disk
* @throws IOException if the path creations fails
*/
Path getLocalPathForWrite(String path, long size) throws IOException;
}

View File

@ -40,6 +40,7 @@
public abstract class AuxiliaryService extends AbstractService {
private Path recoveryPath = null;
private AuxiliaryLocalPathHandler auxiliaryLocalPathHandler;
protected AuxiliaryService(String name) {
super(name);
@ -123,4 +124,24 @@ public void stopContainer(ContainerTerminationContext stopContainerContext) {
public void setRecoveryPath(Path recoveryPath) {
this.recoveryPath = recoveryPath;
}
/**
* Method that gets the local dirs path handler for this Auxiliary Service.
*
* @return auxiliaryPathHandler object that is used to read from and write to
* valid local Dirs.
*/
public AuxiliaryLocalPathHandler getAuxiliaryLocalPathHandler() {
return this.auxiliaryLocalPathHandler;
}
/**
* Method that sets the local dirs path handler for this Auxiliary Service.
*
* @param auxiliaryLocalPathHandler the pathHandler for this auxiliary service
*/
public void setAuxiliaryLocalPathHandler(
AuxiliaryLocalPathHandler auxiliaryLocalPathHandler) {
this.auxiliaryLocalPathHandler = auxiliaryLocalPathHandler;
}
}

View File

@ -40,6 +40,7 @@
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.api.AuxiliaryLocalPathHandler;
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
@ -56,15 +57,17 @@ public class AuxServices extends AbstractService
protected final Map<String,AuxiliaryService> serviceMap;
protected final Map<String,ByteBuffer> serviceMetaData;
private final AuxiliaryLocalPathHandler auxiliaryLocalPathHandler;
private final Pattern p = Pattern.compile("^[A-Za-z_]+[A-Za-z0-9_]*$");
public AuxServices() {
public AuxServices(AuxiliaryLocalPathHandler auxiliaryLocalPathHandler) {
super(AuxServices.class.getName());
serviceMap =
Collections.synchronizedMap(new HashMap<String,AuxiliaryService>());
serviceMetaData =
Collections.synchronizedMap(new HashMap<String,ByteBuffer>());
this.auxiliaryLocalPathHandler = auxiliaryLocalPathHandler;
// Obtain services from configuration in init()
}
@ -154,6 +157,7 @@ public void serviceInit(Configuration conf) throws Exception {
+"Service Meta Data may have issues unless the refer to "
+"the name in the config.");
}
s.setAuxiliaryLocalPathHandler(auxiliaryLocalPathHandler);
addService(sName, s);
if (recoveryEnabled) {
Path storePath = new Path(stateStoreRoot, sName);

View File

@ -27,6 +27,7 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
@ -93,6 +94,7 @@
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.FlowContextProto;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.AuxiliaryLocalPathHandler;
import org.apache.hadoop.yarn.server.api.ContainerType;
import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
@ -248,8 +250,10 @@ public ContainerManagerImpl(Context context, ContainerExecutor exec,
this.containerScheduler = createContainerScheduler(context);
addService(containerScheduler);
AuxiliaryLocalPathHandler auxiliaryLocalPathHandler =
new AuxiliaryLocalPathHandlerImpl(dirsHandler);
// Start configurable services
auxiliaryServices = new AuxServices();
auxiliaryServices = new AuxServices(auxiliaryLocalPathHandler);
auxiliaryServices.registerServiceListener(this);
addService(auxiliaryServices);
@ -1523,6 +1527,35 @@ public void handle(LocalizationEvent event) {
}
}
/**
* Implements AuxiliaryLocalPathHandler.
* It links NodeManager's LocalDirsHandlerService to the Auxiliary Services
*/
static class AuxiliaryLocalPathHandlerImpl
implements AuxiliaryLocalPathHandler {
private LocalDirsHandlerService dirhandlerService;
AuxiliaryLocalPathHandlerImpl(
LocalDirsHandlerService dirhandlerService) {
this.dirhandlerService = dirhandlerService;
}
@Override
public Path getLocalPathForRead(String path) throws IOException {
return dirhandlerService.getLocalPathForRead(path);
}
@Override
public Path getLocalPathForWrite(String path) throws IOException {
return dirhandlerService.getLocalPathForWrite(path);
}
@Override
public Path getLocalPathForWrite(String path, long size)
throws IOException {
return dirhandlerService.getLocalPathForWrite(path, size, false);
}
}
@SuppressWarnings("unchecked")
@Override
public void handle(ContainerManagerEvent event) {

View File

@ -25,6 +25,7 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import org.mockito.Mockito;
import static org.mockito.Mockito.mock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -63,6 +64,7 @@
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.api.AuxiliaryLocalPathHandler;
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
@ -79,6 +81,8 @@ public class TestAuxServices {
System.getProperty("test.build.data",
System.getProperty("java.io.tmpdir")),
TestAuxServices.class.getName());
private final static AuxiliaryLocalPathHandler MOCK_AUX_PATH_HANDLER =
Mockito.mock(AuxiliaryLocalPathHandler.class);
static class LightService extends AuxiliaryService implements Service
{
@ -198,7 +202,7 @@ public void testCustomizedAuxServiceClassPath() throws Exception {
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT,
"ServiceC"), ServiceC.class, Service.class);
@SuppressWarnings("resource")
AuxServices aux = new AuxServices();
AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER);
aux.init(conf);
aux.start();
Map<String, ByteBuffer> meta = aux.getMetaData();
@ -240,7 +244,7 @@ public void testCustomizedAuxServiceClassPath() throws Exception {
conf.set(String.format(
YarnConfiguration.NM_AUX_SERVICES_SYSTEM_CLASSES,
"ServiceC"), systemClasses);
aux = new AuxServices();
aux = new AuxServices(MOCK_AUX_PATH_HANDLER);
aux.init(conf);
aux.start();
meta = aux.getMetaData();
@ -278,7 +282,7 @@ public void testAuxEventDispatch() {
ServiceB.class, Service.class);
conf.setInt("A.expected.init", 1);
conf.setInt("B.expected.stop", 1);
final AuxServices aux = new AuxServices();
final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER);
aux.init(conf);
aux.start();
@ -342,7 +346,7 @@ public void testAuxServices() {
ServiceA.class, Service.class);
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv"),
ServiceB.class, Service.class);
final AuxServices aux = new AuxServices();
final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER);
aux.init(conf);
int latch = 1;
@ -354,8 +358,10 @@ public void testAuxServices() {
}
assertEquals("Invalid mix of services", 6, latch);
aux.start();
for (Service s : aux.getServices()) {
for (AuxiliaryService s : aux.getServices()) {
assertEquals(STARTED, s.getServiceState());
assertEquals(s.getAuxiliaryLocalPathHandler(),
MOCK_AUX_PATH_HANDLER);
}
aux.stop();
@ -373,7 +379,7 @@ public void testAuxServicesMeta() {
ServiceA.class, Service.class);
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv"),
ServiceB.class, Service.class);
final AuxServices aux = new AuxServices();
final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER);
aux.init(conf);
int latch = 1;
@ -410,7 +416,7 @@ public void testAuxUnexpectedStop() {
ServiceA.class, Service.class);
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv"),
ServiceB.class, Service.class);
final AuxServices aux = new AuxServices();
final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER);
aux.init(conf);
aux.start();
@ -423,7 +429,7 @@ public void testAuxUnexpectedStop() {
@Test
public void testValidAuxServiceName() {
final AuxServices aux = new AuxServices();
final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER);
Configuration conf = new Configuration();
conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, new String[] {"Asrv1", "Bsrv_2"});
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Asrv1"),
@ -437,7 +443,7 @@ public void testValidAuxServiceName() {
}
//Test bad auxService Name
final AuxServices aux1 = new AuxServices();
final AuxServices aux1 = new AuxServices(MOCK_AUX_PATH_HANDLER);
conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, new String[] {"1Asrv1"});
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "1Asrv1"),
ServiceA.class, Service.class);
@ -463,7 +469,7 @@ public void testAuxServiceRecoverySetup() throws IOException {
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv"),
RecoverableServiceB.class, Service.class);
try {
final AuxServices aux = new AuxServices();
final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER);
aux.init(conf);
Assert.assertEquals(2, aux.getServices().size());
File auxStorageDir = new File(TEST_DIR,

View File

@ -18,7 +18,11 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager;
import org.apache.hadoop.yarn.server.api.AuxiliaryLocalPathHandler;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout;
@ -111,6 +115,7 @@
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import static org.mockito.Mockito.when;
import org.slf4j.LoggerFactory;
public class TestContainerManager extends BaseContainerManagerTest {
@ -313,6 +318,41 @@ public void testContainerSetup() throws Exception {
Assert.assertEquals(null, reader.readLine());
}
@Test (timeout = 10000L)
public void testAuxPathHandler() throws Exception {
File testDir = GenericTestUtils.getTestDir(GenericTestUtils.getTestDir(
TestContainerManager.class.getSimpleName() + "LocDir").
getAbsolutePath());
testDir.mkdirs();
File testFile = new File(testDir, "test");
testFile.createNewFile();
YarnConfiguration configuration = new YarnConfiguration();
configuration.set(YarnConfiguration.NM_LOCAL_DIRS,
testDir.getAbsolutePath());
LocalDirsHandlerService spyDirHandlerService =
Mockito.spy(new LocalDirsHandlerService());
spyDirHandlerService.init(configuration);
when(spyDirHandlerService.getConfig()).thenReturn(configuration);
AuxiliaryLocalPathHandler auxiliaryLocalPathHandler =
new ContainerManagerImpl.AuxiliaryLocalPathHandlerImpl(
spyDirHandlerService);
Path p = auxiliaryLocalPathHandler.getLocalPathForRead("test");
assertTrue(p != null &&
!spyDirHandlerService.getLocalDirsForRead().isEmpty());
when(spyDirHandlerService.getLocalDirsForRead()).thenReturn(
new ArrayList<String>());
try {
auxiliaryLocalPathHandler.getLocalPathForRead("test");
fail("Should not have passed!");
} catch (IOException e) {
Assert.assertTrue(e.getMessage().contains("Could not find"));
} finally {
testFile.delete();
testDir.delete();
}
}
//@Test
public void testContainerLaunchAndStop() throws IOException,
InterruptedException, YarnException {