diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java index 0eeae19ab8..bf8241d52e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java @@ -57,7 +57,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.io.DataOutputBuffer; @@ -84,7 +83,6 @@ import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto; import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext; import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext; @@ -855,8 +853,6 @@ class Shuffle extends SimpleChannelUpstreamHandler { private static final int ALLOWED_CONCURRENCY = 16; private final Configuration conf; private final IndexCache indexCache; - private final LocalDirAllocator lDirAlloc = - new LocalDirAllocator(YarnConfiguration.NM_LOCAL_DIRS); private int port; private final LoadingCache pathCache = CacheBuilder.newBuilder().expireAfterAccess(EXPIRE_AFTER_ACCESS_MINUTES, @@ -889,10 +885,10 @@ public AttemptPathInfo load(AttemptPathIdentifier key) throws Exception { String base = getBaseLocation(key.jobId, key.user); String attemptBase = base + key.attemptId; - Path indexFileName = lDirAlloc.getLocalPathToRead( - attemptBase + "/" + INDEX_FILE_NAME, conf); - Path mapOutputFileName = lDirAlloc.getLocalPathToRead( - attemptBase + "/" + DATA_FILE_NAME, conf); + Path indexFileName = getAuxiliaryLocalPathHandler() + .getLocalPathForRead(attemptBase + "/" + INDEX_FILE_NAME); + Path mapOutputFileName = getAuxiliaryLocalPathHandler() + .getLocalPathForRead(attemptBase + "/" + DATA_FILE_NAME); if (LOG.isDebugEnabled()) { LOG.debug("Loaded : " + key + " via loader"); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java index 849ce1a156..250dcf65d1 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.mapred; +import org.apache.hadoop.test.GenericTestUtils; import static org.apache.hadoop.test.MetricsAsserts.assertCounter; import static org.apache.hadoop.test.MetricsAsserts.assertGauge; import static org.apache.hadoop.test.MetricsAsserts.getMetrics; @@ -26,6 +27,7 @@ import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1; import static org.junit.Assert.assertEquals; import static org.junit.Assume.assumeTrue; +import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -69,12 +71,14 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.service.ServiceStateException; +import org.apache.hadoop.util.DiskChecker; import org.apache.hadoop.util.PureJavaCrc32; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext; import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext; +import org.apache.hadoop.yarn.server.api.AuxiliaryLocalPathHandler; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; import org.apache.hadoop.yarn.server.records.Version; import org.jboss.netty.channel.Channel; @@ -99,8 +103,12 @@ public class TestShuffleHandler { static final long MiB = 1024 * 1024; private static final Log LOG = LogFactory.getLog(TestShuffleHandler.class); + private static final File ABS_LOG_DIR = GenericTestUtils.getTestDir( + TestShuffleHandler.class.getSimpleName() + "LocDir"); class MockShuffleHandler extends org.apache.hadoop.mapred.ShuffleHandler { + private AuxiliaryLocalPathHandler pathHandler = + new TestAuxiliaryLocalPathHandler(); @Override protected Shuffle getShuffle(final Configuration conf) { return new Shuffle(conf) { @@ -140,11 +148,35 @@ protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, } }; } + + @Override + public AuxiliaryLocalPathHandler getAuxiliaryLocalPathHandler() { + return pathHandler; + } } - private static class MockShuffleHandler2 extends org.apache.hadoop.mapred.ShuffleHandler { - boolean socketKeepAlive = false; + private class TestAuxiliaryLocalPathHandler + implements AuxiliaryLocalPathHandler { + @Override + public Path getLocalPathForRead(String path) throws IOException { + return new Path(ABS_LOG_DIR.getAbsolutePath(), path); + } + @Override + public Path getLocalPathForWrite(String path) throws IOException { + return new Path(ABS_LOG_DIR.getAbsolutePath()); + } + + @Override + public Path getLocalPathForWrite(String path, long size) + throws IOException { + return new Path(ABS_LOG_DIR.getAbsolutePath()); + } + } + + private static class MockShuffleHandler2 extends + org.apache.hadoop.mapred.ShuffleHandler { + boolean socketKeepAlive = false; @Override protected Shuffle getShuffle(final Configuration conf) { return new Shuffle(conf) { @@ -479,6 +511,11 @@ public void testSocketKeepAlive() throws Exception { conf.setInt(ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT, -100); HttpURLConnection conn = null; MockShuffleHandler2 shuffleHandler = new MockShuffleHandler2(); + AuxiliaryLocalPathHandler pathHandler = + mock(AuxiliaryLocalPathHandler.class); + when(pathHandler.getLocalPathForRead(anyString())).thenThrow( + new DiskChecker.DiskErrorException("Test")); + shuffleHandler.setAuxiliaryLocalPathHandler(pathHandler); try { shuffleHandler.init(conf); shuffleHandler.start(); @@ -668,19 +705,16 @@ public void testMapFileAccess() throws IOException { conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); UserGroupInformation.setConfiguration(conf); - File absLogDir = new File("target", - TestShuffleHandler.class.getSimpleName() + "LocDir").getAbsoluteFile(); - conf.set(YarnConfiguration.NM_LOCAL_DIRS, absLogDir.getAbsolutePath()); + conf.set(YarnConfiguration.NM_LOCAL_DIRS, ABS_LOG_DIR.getAbsolutePath()); ApplicationId appId = ApplicationId.newInstance(12345, 1); LOG.info(appId.toString()); String appAttemptId = "attempt_12345_1_m_1_0"; String user = "randomUser"; String reducerId = "0"; List fileMap = new ArrayList(); - createShuffleHandlerFiles(absLogDir, user, appId.toString(), appAttemptId, + createShuffleHandlerFiles(ABS_LOG_DIR, user, appId.toString(), appAttemptId, conf, fileMap); ShuffleHandler shuffleHandler = new ShuffleHandler() { - @Override protected Shuffle getShuffle(Configuration conf) { // replace the shuffle handler with one stubbed for testing @@ -696,6 +730,8 @@ protected void verifyRequest(String appid, ChannelHandlerContext ctx, }; } }; + AuxiliaryLocalPathHandler pathHandler = new TestAuxiliaryLocalPathHandler(); + shuffleHandler.setAuxiliaryLocalPathHandler(pathHandler); shuffleHandler.init(conf); try { shuffleHandler.start(); @@ -740,7 +776,7 @@ protected void verifyRequest(String appid, ChannelHandlerContext ctx, Assert.assertTrue((new String(byteArr)).contains(message)); } finally { shuffleHandler.stop(); - FileUtil.fullyDelete(absLogDir); + FileUtil.fullyDelete(ABS_LOG_DIR); } } @@ -801,10 +837,14 @@ public void testRecovery() throws IOException { final File tmpDir = new File(System.getProperty("test.build.data", System.getProperty("java.io.tmpdir")), TestShuffleHandler.class.getName()); + ShuffleHandler shuffle = new ShuffleHandler(); + AuxiliaryLocalPathHandler pathHandler = new TestAuxiliaryLocalPathHandler(); + shuffle.setAuxiliaryLocalPathHandler(pathHandler); Configuration conf = new Configuration(); conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0); conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3); - ShuffleHandler shuffle = new ShuffleHandler(); + conf.set(YarnConfiguration.NM_LOCAL_DIRS, + ABS_LOG_DIR.getAbsolutePath()); // emulate aux services startup with recovery enabled shuffle.setRecoveryPath(new Path(tmpDir.toString())); tmpDir.mkdirs(); @@ -830,6 +870,7 @@ public void testRecovery() throws IOException { // emulate shuffle handler restart shuffle.close(); shuffle = new ShuffleHandler(); + shuffle.setAuxiliaryLocalPathHandler(pathHandler); shuffle.setRecoveryPath(new Path(tmpDir.toString())); shuffle.init(conf); shuffle.start(); @@ -872,6 +913,9 @@ public void testRecoveryFromOtherVersions() throws IOException { conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0); conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3); ShuffleHandler shuffle = new ShuffleHandler(); + AuxiliaryLocalPathHandler pathHandler = new TestAuxiliaryLocalPathHandler(); + shuffle.setAuxiliaryLocalPathHandler(pathHandler); + conf.set(YarnConfiguration.NM_LOCAL_DIRS, ABS_LOG_DIR.getAbsolutePath()); // emulate aux services startup with recovery enabled shuffle.setRecoveryPath(new Path(tmpDir.toString())); tmpDir.mkdirs(); @@ -897,6 +941,7 @@ public void testRecoveryFromOtherVersions() throws IOException { // emulate shuffle handler restart shuffle.close(); shuffle = new ShuffleHandler(); + shuffle.setAuxiliaryLocalPathHandler(pathHandler); shuffle.setRecoveryPath(new Path(tmpDir.toString())); shuffle.init(conf); shuffle.start(); @@ -914,6 +959,7 @@ public void testRecoveryFromOtherVersions() throws IOException { Assert.assertEquals(version11, shuffle.loadVersion()); shuffle.close(); shuffle = new ShuffleHandler(); + shuffle.setAuxiliaryLocalPathHandler(pathHandler); shuffle.setRecoveryPath(new Path(tmpDir.toString())); shuffle.init(conf); shuffle.start(); @@ -930,6 +976,7 @@ public void testRecoveryFromOtherVersions() throws IOException { Assert.assertEquals(version21, shuffle.loadVersion()); shuffle.close(); shuffle = new ShuffleHandler(); + shuffle.setAuxiliaryLocalPathHandler(pathHandler); shuffle.setRecoveryPath(new Path(tmpDir.toString())); shuffle.init(conf); @@ -979,16 +1026,15 @@ public void testGetMapOutputInfo() throws Exception { conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "simple"); UserGroupInformation.setConfiguration(conf); - File absLogDir = new File("target", TestShuffleHandler.class. - getSimpleName() + "LocDir").getAbsoluteFile(); - conf.set(YarnConfiguration.NM_LOCAL_DIRS, absLogDir.getAbsolutePath()); + conf.set(YarnConfiguration.NM_LOCAL_DIRS, ABS_LOG_DIR.getAbsolutePath()); ApplicationId appId = ApplicationId.newInstance(12345, 1); String appAttemptId = "attempt_12345_1_m_1_0"; String user = "randomUser"; String reducerId = "0"; List fileMap = new ArrayList(); - createShuffleHandlerFiles(absLogDir, user, appId.toString(), appAttemptId, + createShuffleHandlerFiles(ABS_LOG_DIR, user, appId.toString(), appAttemptId, conf, fileMap); + AuxiliaryLocalPathHandler pathHandler = new TestAuxiliaryLocalPathHandler(); ShuffleHandler shuffleHandler = new ShuffleHandler() { @Override protected Shuffle getShuffle(Configuration conf) { @@ -1032,6 +1078,7 @@ protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, }; } }; + shuffleHandler.setAuxiliaryLocalPathHandler(pathHandler); shuffleHandler.init(conf); try { shuffleHandler.start(); @@ -1070,7 +1117,7 @@ protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, 0, failures.size()); } finally { shuffleHandler.stop(); - FileUtil.fullyDelete(absLogDir); + FileUtil.fullyDelete(ABS_LOG_DIR); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/AuxiliaryLocalPathHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/AuxiliaryLocalPathHandler.java new file mode 100644 index 0000000000..50feecf453 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/AuxiliaryLocalPathHandler.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.api; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.Path; + +import java.io.IOException; + +/** An Interface that can retrieve local directories to read from or write to. + * Components can implement this interface to link it to + * their own Directory Handler Service + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public interface AuxiliaryLocalPathHandler { + /** + * Get a path from the local FS for reading for a given Auxiliary Service. + * @param path the requested path + * @return the complete path to the file on a local disk + * @throws IOException if the file read encounters a problem + */ + Path getLocalPathForRead(String path) throws IOException; + + /** + * Get a path from the local FS for writing for a given Auxiliary Service. + * @param path the requested path + * @return the complete path to the file on a local disk + * @throws IOException if the path creations fails + */ + Path getLocalPathForWrite(String path) throws IOException; + + /** + * Get a path from the local FS for writing a file of an estimated size + * for a given Auxiliary Service. + * @param path the requested path + * @param size the size of the file that is going to be written + * @return the complete path to the file on a local disk + * @throws IOException if the path creations fails + */ + Path getLocalPathForWrite(String path, long size) throws IOException; +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/AuxiliaryService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/AuxiliaryService.java index 58b1d4a61a..79f2ede39f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/AuxiliaryService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/AuxiliaryService.java @@ -40,6 +40,7 @@ public abstract class AuxiliaryService extends AbstractService { private Path recoveryPath = null; + private AuxiliaryLocalPathHandler auxiliaryLocalPathHandler; protected AuxiliaryService(String name) { super(name); @@ -123,4 +124,24 @@ public void stopContainer(ContainerTerminationContext stopContainerContext) { public void setRecoveryPath(Path recoveryPath) { this.recoveryPath = recoveryPath; } + + /** + * Method that gets the local dirs path handler for this Auxiliary Service. + * + * @return auxiliaryPathHandler object that is used to read from and write to + * valid local Dirs. + */ + public AuxiliaryLocalPathHandler getAuxiliaryLocalPathHandler() { + return this.auxiliaryLocalPathHandler; + } + + /** + * Method that sets the local dirs path handler for this Auxiliary Service. + * + * @param auxiliaryLocalPathHandler the pathHandler for this auxiliary service + */ + public void setAuxiliaryLocalPathHandler( + AuxiliaryLocalPathHandler auxiliaryLocalPathHandler) { + this.auxiliaryLocalPathHandler = auxiliaryLocalPathHandler; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java index 5e0f2936f8..57cca50ca3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java @@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext; import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext; +import org.apache.hadoop.yarn.server.api.AuxiliaryLocalPathHandler; import org.apache.hadoop.yarn.server.api.AuxiliaryService; import org.apache.hadoop.yarn.server.api.ContainerInitializationContext; import org.apache.hadoop.yarn.server.api.ContainerTerminationContext; @@ -56,15 +57,17 @@ public class AuxServices extends AbstractService protected final Map serviceMap; protected final Map serviceMetaData; + private final AuxiliaryLocalPathHandler auxiliaryLocalPathHandler; private final Pattern p = Pattern.compile("^[A-Za-z_]+[A-Za-z0-9_]*$"); - public AuxServices() { + public AuxServices(AuxiliaryLocalPathHandler auxiliaryLocalPathHandler) { super(AuxServices.class.getName()); serviceMap = Collections.synchronizedMap(new HashMap()); serviceMetaData = Collections.synchronizedMap(new HashMap()); + this.auxiliaryLocalPathHandler = auxiliaryLocalPathHandler; // Obtain services from configuration in init() } @@ -154,6 +157,7 @@ public void serviceInit(Configuration conf) throws Exception { +"Service Meta Data may have issues unless the refer to " +"the name in the config."); } + s.setAuxiliaryLocalPathHandler(auxiliaryLocalPathHandler); addService(sName, s); if (recoveryEnabled) { Path storePath = new Path(stateStoreRoot, sName); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 7d5525a889..55119e0432 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.net.NetUtils; @@ -93,6 +94,7 @@ import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.FlowContextProto; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.security.NMTokenIdentifier; +import org.apache.hadoop.yarn.server.api.AuxiliaryLocalPathHandler; import org.apache.hadoop.yarn.server.api.ContainerType; import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; @@ -248,8 +250,10 @@ public ContainerManagerImpl(Context context, ContainerExecutor exec, this.containerScheduler = createContainerScheduler(context); addService(containerScheduler); + AuxiliaryLocalPathHandler auxiliaryLocalPathHandler = + new AuxiliaryLocalPathHandlerImpl(dirsHandler); // Start configurable services - auxiliaryServices = new AuxServices(); + auxiliaryServices = new AuxServices(auxiliaryLocalPathHandler); auxiliaryServices.registerServiceListener(this); addService(auxiliaryServices); @@ -1523,6 +1527,35 @@ public void handle(LocalizationEvent event) { } } + /** + * Implements AuxiliaryLocalPathHandler. + * It links NodeManager's LocalDirsHandlerService to the Auxiliary Services + */ + static class AuxiliaryLocalPathHandlerImpl + implements AuxiliaryLocalPathHandler { + private LocalDirsHandlerService dirhandlerService; + AuxiliaryLocalPathHandlerImpl( + LocalDirsHandlerService dirhandlerService) { + this.dirhandlerService = dirhandlerService; + } + + @Override + public Path getLocalPathForRead(String path) throws IOException { + return dirhandlerService.getLocalPathForRead(path); + } + + @Override + public Path getLocalPathForWrite(String path) throws IOException { + return dirhandlerService.getLocalPathForWrite(path); + } + + @Override + public Path getLocalPathForWrite(String path, long size) + throws IOException { + return dirhandlerService.getLocalPathForWrite(path, size, false); + } + } + @SuppressWarnings("unchecked") @Override public void handle(ContainerManagerEvent event) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java index bfe87c690d..6d12f8c3ba 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java @@ -25,6 +25,7 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import org.mockito.Mockito; import static org.mockito.Mockito.mock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,6 +64,7 @@ import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext; import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext; +import org.apache.hadoop.yarn.server.api.AuxiliaryLocalPathHandler; import org.apache.hadoop.yarn.server.api.AuxiliaryService; import org.apache.hadoop.yarn.server.api.ContainerInitializationContext; import org.apache.hadoop.yarn.server.api.ContainerTerminationContext; @@ -79,6 +81,8 @@ public class TestAuxServices { System.getProperty("test.build.data", System.getProperty("java.io.tmpdir")), TestAuxServices.class.getName()); + private final static AuxiliaryLocalPathHandler MOCK_AUX_PATH_HANDLER = + Mockito.mock(AuxiliaryLocalPathHandler.class); static class LightService extends AuxiliaryService implements Service { @@ -198,7 +202,7 @@ public void testCustomizedAuxServiceClassPath() throws Exception { conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "ServiceC"), ServiceC.class, Service.class); @SuppressWarnings("resource") - AuxServices aux = new AuxServices(); + AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER); aux.init(conf); aux.start(); Map meta = aux.getMetaData(); @@ -240,7 +244,7 @@ public void testCustomizedAuxServiceClassPath() throws Exception { conf.set(String.format( YarnConfiguration.NM_AUX_SERVICES_SYSTEM_CLASSES, "ServiceC"), systemClasses); - aux = new AuxServices(); + aux = new AuxServices(MOCK_AUX_PATH_HANDLER); aux.init(conf); aux.start(); meta = aux.getMetaData(); @@ -278,7 +282,7 @@ public void testAuxEventDispatch() { ServiceB.class, Service.class); conf.setInt("A.expected.init", 1); conf.setInt("B.expected.stop", 1); - final AuxServices aux = new AuxServices(); + final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER); aux.init(conf); aux.start(); @@ -342,7 +346,7 @@ public void testAuxServices() { ServiceA.class, Service.class); conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv"), ServiceB.class, Service.class); - final AuxServices aux = new AuxServices(); + final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER); aux.init(conf); int latch = 1; @@ -354,8 +358,10 @@ public void testAuxServices() { } assertEquals("Invalid mix of services", 6, latch); aux.start(); - for (Service s : aux.getServices()) { + for (AuxiliaryService s : aux.getServices()) { assertEquals(STARTED, s.getServiceState()); + assertEquals(s.getAuxiliaryLocalPathHandler(), + MOCK_AUX_PATH_HANDLER); } aux.stop(); @@ -373,7 +379,7 @@ public void testAuxServicesMeta() { ServiceA.class, Service.class); conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv"), ServiceB.class, Service.class); - final AuxServices aux = new AuxServices(); + final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER); aux.init(conf); int latch = 1; @@ -410,7 +416,7 @@ public void testAuxUnexpectedStop() { ServiceA.class, Service.class); conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv"), ServiceB.class, Service.class); - final AuxServices aux = new AuxServices(); + final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER); aux.init(conf); aux.start(); @@ -423,7 +429,7 @@ public void testAuxUnexpectedStop() { @Test public void testValidAuxServiceName() { - final AuxServices aux = new AuxServices(); + final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER); Configuration conf = new Configuration(); conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, new String[] {"Asrv1", "Bsrv_2"}); conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Asrv1"), @@ -437,7 +443,7 @@ public void testValidAuxServiceName() { } //Test bad auxService Name - final AuxServices aux1 = new AuxServices(); + final AuxServices aux1 = new AuxServices(MOCK_AUX_PATH_HANDLER); conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, new String[] {"1Asrv1"}); conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "1Asrv1"), ServiceA.class, Service.class); @@ -463,7 +469,7 @@ public void testAuxServiceRecoverySetup() throws IOException { conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv"), RecoverableServiceB.class, Service.class); try { - final AuxServices aux = new AuxServices(); + final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER); aux.init(conf); Assert.assertEquals(2, aux.getServices().size()); File auxStorageDir = new File(TEST_DIR, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java index 6e8c005d62..6d198a444b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java @@ -18,7 +18,11 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager; +import org.apache.hadoop.yarn.server.api.AuxiliaryLocalPathHandler; +import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.timeout; @@ -111,6 +115,7 @@ import org.junit.Test; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; +import static org.mockito.Mockito.when; import org.slf4j.LoggerFactory; public class TestContainerManager extends BaseContainerManagerTest { @@ -313,6 +318,41 @@ public void testContainerSetup() throws Exception { Assert.assertEquals(null, reader.readLine()); } + @Test (timeout = 10000L) + public void testAuxPathHandler() throws Exception { + File testDir = GenericTestUtils.getTestDir(GenericTestUtils.getTestDir( + TestContainerManager.class.getSimpleName() + "LocDir"). + getAbsolutePath()); + testDir.mkdirs(); + File testFile = new File(testDir, "test"); + testFile.createNewFile(); + YarnConfiguration configuration = new YarnConfiguration(); + configuration.set(YarnConfiguration.NM_LOCAL_DIRS, + testDir.getAbsolutePath()); + LocalDirsHandlerService spyDirHandlerService = + Mockito.spy(new LocalDirsHandlerService()); + spyDirHandlerService.init(configuration); + when(spyDirHandlerService.getConfig()).thenReturn(configuration); + AuxiliaryLocalPathHandler auxiliaryLocalPathHandler = + new ContainerManagerImpl.AuxiliaryLocalPathHandlerImpl( + spyDirHandlerService); + Path p = auxiliaryLocalPathHandler.getLocalPathForRead("test"); + assertTrue(p != null && + !spyDirHandlerService.getLocalDirsForRead().isEmpty()); + + when(spyDirHandlerService.getLocalDirsForRead()).thenReturn( + new ArrayList()); + try { + auxiliaryLocalPathHandler.getLocalPathForRead("test"); + fail("Should not have passed!"); + } catch (IOException e) { + Assert.assertTrue(e.getMessage().contains("Could not find")); + } finally { + testFile.delete(); + testDir.delete(); + } + } + //@Test public void testContainerLaunchAndStop() throws IOException, InterruptedException, YarnException {