From 0bbe01f8d56191edfba3b50fb9f8859a0b3f826f Mon Sep 17 00:00:00 2001 From: Sangjin Lee Date: Thu, 12 May 2016 10:10:01 -0700 Subject: [PATCH] YARN-4577. Enable aux services to have their own custom classpath/jar file (Xuan Gong via sale) --- .../org/apache/hadoop/util/JarFinder.java | 26 ++- .../org/apache/hadoop/util/TestRunJar.java | 29 +-- .../hadoop/yarn/conf/YarnConfiguration.java | 10 +- .../containermanager/AuxServices.java | 43 ++-- ...AuxiliaryServiceWithCustomClassLoader.java | 201 ++++++++++++++++++ .../containermanager/TestAuxServices.java | 119 ++++++++++- 6 files changed, 385 insertions(+), 43 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxiliaryServiceWithCustomClassLoader.java diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/JarFinder.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/JarFinder.java index 33aa02570f..478a29b331 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/JarFinder.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/JarFinder.java @@ -14,7 +14,7 @@ package org.apache.hadoop.util; import com.google.common.base.Preconditions; - +import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.File; import java.io.FileInputStream; @@ -173,4 +173,28 @@ else if ("file".equals(url.getProtocol())) { } return null; } + + public static File makeClassLoaderTestJar(Class target, File rootDir, + String jarName, int buffSize, String... clsNames) throws IOException { + File jarFile = new File(rootDir, jarName); + JarOutputStream jstream = + new JarOutputStream(new FileOutputStream(jarFile)); + for (String clsName: clsNames) { + String name = clsName.replace('.', '/') + ".class"; + InputStream entryInputStream = target.getResourceAsStream( + "/" + name); + ZipEntry entry = new ZipEntry(name); + jstream.putNextEntry(entry); + BufferedInputStream bufInputStream = new BufferedInputStream( + entryInputStream, buffSize); + int count; + byte[] data = new byte[buffSize]; + while ((count = bufInputStream.read(data, 0, buffSize)) != -1) { + jstream.write(data, 0, count); + } + jstream.closeEntry(); + } + jstream.close(); + return jarFile; + } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestRunJar.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestRunJar.java index 6622389554..7b61b32d53 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestRunJar.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestRunJar.java @@ -23,11 +23,9 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; -import java.io.BufferedInputStream; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; -import java.io.InputStream; import java.util.jar.JarOutputStream; import java.util.regex.Pattern; import java.util.zip.ZipEntry; @@ -156,7 +154,8 @@ public void testClientClassLoader() throws Throwable { when(runJar.getSystemClasses()).thenReturn(systemClasses); // create the test jar - File testJar = makeClassLoaderTestJar(mainCls, thirdCls); + File testJar = JarFinder.makeClassLoaderTestJar(this.getClass(), + TEST_ROOT_DIR, TEST_JAR_2_NAME, BUFF_SIZE, mainCls, thirdCls); // form the args String[] args = new String[3]; args[0] = testJar.getAbsolutePath(); @@ -166,28 +165,4 @@ public void testClientClassLoader() throws Throwable { runJar.run(args); // it should not throw an exception } - - private File makeClassLoaderTestJar(String... clsNames) throws IOException { - File jarFile = new File(TEST_ROOT_DIR, TEST_JAR_2_NAME); - JarOutputStream jstream = - new JarOutputStream(new FileOutputStream(jarFile)); - for (String clsName: clsNames) { - String name = clsName.replace('.', '/') + ".class"; - InputStream entryInputStream = this.getClass().getResourceAsStream( - "/" + name); - ZipEntry entry = new ZipEntry(name); - jstream.putNextEntry(entry); - BufferedInputStream bufInputStream = new BufferedInputStream( - entryInputStream, BUFF_SIZE); - int count; - byte[] data = new byte[BUFF_SIZE]; - while ((count = bufInputStream.read(data, 0, BUFF_SIZE)) != -1) { - jstream.write(data, 0, count); - } - jstream.closeEntry(); - } - jstream.close(); - - return jarFile; - } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 965b6c5d83..e9c8ea9c65 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1451,10 +1451,16 @@ public static boolean isAclEnabled(Configuration conf) { NM_PREFIX + "principal"; public static final String NM_AUX_SERVICES = - NM_PREFIX + "aux-services"; + NM_PREFIX + "aux-services"; public static final String NM_AUX_SERVICE_FMT = - NM_PREFIX + "aux-services.%s.class"; + NM_PREFIX + "aux-services.%s.class"; + + public static final String NM_AUX_SERVICES_CLASSPATH = + NM_AUX_SERVICES + ".%s.classpath"; + + public static final String NM_AUX_SERVICES_SYSTEM_CLASSES = + NM_AUX_SERVICES + ".%s.system-classes"; public static final String NM_USER_HOME_DIR = NM_PREFIX + "user-home-dir"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java index cd5ed88738..171b20d453 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java @@ -118,21 +118,40 @@ public void serviceInit(Configuration conf) throws Exception { YarnConfiguration.NM_AUX_SERVICES +" is invalid." + "The valid service name should only contain a-zA-Z0-9_ " + "and can not start with numbers"); - Class sClass = conf.getClass( - String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, sName), null, - AuxiliaryService.class); - if (null == sClass) { - throw new RuntimeException("No class defined for " + sName); + String classKey = String.format( + YarnConfiguration.NM_AUX_SERVICE_FMT, sName); + String className = conf.get(classKey); + final String appClassPath = conf.get(String.format( + YarnConfiguration.NM_AUX_SERVICES_CLASSPATH, sName)); + AuxiliaryService s = null; + boolean useCustomerClassLoader = appClassPath != null + && !appClassPath.isEmpty() && className != null + && !className.isEmpty(); + if (useCustomerClassLoader) { + s = AuxiliaryServiceWithCustomClassLoader.getInstance( + conf, className, appClassPath); + LOG.info("The aux service:" + sName + + " are using the custom classloader"); + } else { + Class sClass = conf.getClass( + classKey, null, AuxiliaryService.class); + + if (sClass == null) { + throw new RuntimeException("No class defined for " + sName); + } + s = ReflectionUtils.newInstance(sClass, conf); + } + if (s == null) { + throw new RuntimeException("No object created for " + sName); } - AuxiliaryService s = ReflectionUtils.newInstance(sClass, conf); // TODO better use s.getName()? if(!sName.equals(s.getName())) { LOG.warn("The Auxilurary Service named '"+sName+"' in the " - +"configuration is for "+sClass+" which has " - +"a name of '"+s.getName()+"'. Because these are " - +"not the same tools trying to send ServiceData and read " - +"Service Meta Data may have issues unless the refer to " - +"the name in the config."); + +"configuration is for "+s.getClass()+" which has " + +"a name of '"+s.getName()+"'. Because these are " + +"not the same tools trying to send ServiceData and read " + +"Service Meta Data may have issues unless the refer to " + +"the name in the config."); } addService(sName, s); if (recoveryEnabled) { @@ -264,4 +283,4 @@ private void logWarningWhenAuxServiceThrowExceptions(AuxiliaryService service, : "The auxService name is " + service.getName()) + " and it got an error at event: " + eventType, th); } -} +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxiliaryServiceWithCustomClassLoader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxiliaryServiceWithCustomClassLoader.java new file mode 100644 index 0000000000..c764fbdc06 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxiliaryServiceWithCustomClassLoader.java @@ -0,0 +1,201 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.nio.ByteBuffer; +import java.security.AccessController; +import java.security.PrivilegedActionException; +import java.security.PrivilegedExceptionAction; +import java.util.Arrays; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.ApplicationClassLoader; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext; +import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext; +import org.apache.hadoop.yarn.server.api.AuxiliaryService; +import org.apache.hadoop.yarn.server.api.ContainerInitializationContext; +import org.apache.hadoop.yarn.server.api.ContainerTerminationContext; + +final class AuxiliaryServiceWithCustomClassLoader extends AuxiliaryService { + + private final AuxiliaryService wrapped; + private final ClassLoader customClassLoader; + + private AuxiliaryServiceWithCustomClassLoader(String name, + AuxiliaryService wrapped, ClassLoader customClassLoader) { + super(name); + this.wrapped = wrapped; + this.customClassLoader = customClassLoader; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + // We pass a shared configuration as part of serviceInit call. + // To avoid the scenario that we could get a ClassNotFoundException + // when we use customClassLoader to load the class, we create a copy + // of the configuration. + Configuration config = new Configuration(conf); + // reset the service configuration + setConfig(config); + config.setClassLoader(customClassLoader); + ClassLoader original = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(customClassLoader); + try { + wrapped.init(config); + } finally { + Thread.currentThread().setContextClassLoader(original); + } + } + + @Override + protected void serviceStart() throws Exception { + ClassLoader original = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(customClassLoader); + try { + wrapped.start(); + } finally { + Thread.currentThread().setContextClassLoader(original); + } + } + + @Override + protected void serviceStop() throws Exception { + ClassLoader original = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(customClassLoader); + try { + wrapped.stop(); + } finally { + Thread.currentThread().setContextClassLoader(original); + } + } + + @Override + public void initializeApplication( + ApplicationInitializationContext initAppContext) { + ClassLoader original = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(customClassLoader); + try { + wrapped.initializeApplication(initAppContext); + } finally { + Thread.currentThread().setContextClassLoader(original); + } + } + + @Override + public void stopApplication(ApplicationTerminationContext stopAppContext) { + ClassLoader original = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(customClassLoader); + try { + wrapped.stopApplication(stopAppContext); + } finally { + Thread.currentThread().setContextClassLoader(original); + } + } + + @Override + public ByteBuffer getMetaData() { + ClassLoader original = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(customClassLoader); + try { + return wrapped.getMetaData(); + } finally { + Thread.currentThread().setContextClassLoader(original); + } + } + + @Override + public void initializeContainer(ContainerInitializationContext + initContainerContext) { + ClassLoader original = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(customClassLoader); + try { + wrapped.initializeContainer(initContainerContext); + } finally { + Thread.currentThread().setContextClassLoader(original); + } + } + + @Override + public void stopContainer(ContainerTerminationContext stopContainerContext) { + ClassLoader original = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(customClassLoader); + try { + wrapped.stopContainer(stopContainerContext); + } finally { + Thread.currentThread().setContextClassLoader(original); + } + } + + @Override + public void setRecoveryPath(Path recoveryPath) { + ClassLoader original = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(customClassLoader); + try { + wrapped.setRecoveryPath(recoveryPath); + } finally { + Thread.currentThread().setContextClassLoader(original); + } + } + + public static AuxiliaryServiceWithCustomClassLoader getInstance( + Configuration conf, String className, String appClassPath) + throws IOException, ClassNotFoundException { + String[] systemClasses = conf.getTrimmedStrings(String.format( + YarnConfiguration.NM_AUX_SERVICES_SYSTEM_CLASSES, + className)); + ClassLoader customClassLoader = createAuxServiceClassLoader( + appClassPath, systemClasses); + Class clazz = Class.forName(className, true, + customClassLoader); + Class sClass = clazz.asSubclass( + AuxiliaryService.class); + AuxiliaryService wrapped = ReflectionUtils.newInstance(sClass, conf); + return new AuxiliaryServiceWithCustomClassLoader( + className + " with custom class loader", wrapped, + customClassLoader); + } + + private static ClassLoader createAuxServiceClassLoader( + final String appClasspath, final String[] systemClasses) + throws IOException { + try { + return AccessController.doPrivileged( + new PrivilegedExceptionAction() { + @Override + public ClassLoader run() throws MalformedURLException { + return new ApplicationClassLoader(appClasspath, + AuxServices.class.getClassLoader(), + Arrays.asList(systemClasses)); + } + } + ); + } catch (PrivilegedActionException e) { + Throwable t = e.getCause(); + if (t instanceof MalformedURLException) { + throw (MalformedURLException) t; + } + throw new IOException(e); + } + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java index 9d0d0c037d..26a1003b9a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java @@ -27,13 +27,21 @@ import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; +import com.google.common.base.Charsets; +import com.google.common.collect.Sets; import java.io.File; import java.io.IOException; +import java.net.URL; +import java.net.URLClassLoader; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; +import java.util.HashSet; +import java.util.List; import java.util.Map; - +import java.util.Map.Entry; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -42,6 +50,10 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.service.Service; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.ApplicationClassLoader; +import org.apache.hadoop.util.JarFinder; +import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -150,6 +162,111 @@ public ServiceB() { } } + // Override getMetaData() method to return current + // class path. This class would be used for + // testCustomizedAuxServiceClassPath. + static class ServiceC extends LightService { + public ServiceC() { + super("C", 'C', 66, ByteBuffer.wrap("C".getBytes())); + } + + @Override + public ByteBuffer getMetaData() { + ClassLoader loader = Thread.currentThread().getContextClassLoader(); + URL[] urls = ((URLClassLoader)loader).getURLs(); + List urlString = new ArrayList(); + for (URL url : urls) { + urlString.add(url.toString()); + } + String joinedString = StringUtils.join(",", urlString); + return ByteBuffer.wrap(joinedString.getBytes()); + } + } + + // To verify whether we could load class from customized class path. + // We would use ServiceC in this test. Also create a separate jar file + // including ServiceC class, and add this jar to customized directory. + // By setting some proper configurations, we should load ServiceC class + // from customized class path. + @Test (timeout = 15000) + public void testCustomizedAuxServiceClassPath() throws Exception { + // verify that we can load AuxService Class from default Class path + Configuration conf = new YarnConfiguration(); + conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, + new String[] {"ServiceC"}); + conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, + "ServiceC"), ServiceC.class, Service.class); + @SuppressWarnings("resource") + AuxServices aux = new AuxServices(); + aux.init(conf); + aux.start(); + Map meta = aux.getMetaData(); + String auxName = ""; + Set defaultAuxClassPath = null; + Assert.assertTrue(meta.size() == 1); + for(Entry i : meta.entrySet()) { + auxName = i.getKey(); + String auxClassPath = Charsets.UTF_8.decode(i.getValue()).toString(); + defaultAuxClassPath = new HashSet(Arrays.asList(StringUtils + .getTrimmedStrings(auxClassPath))); + } + Assert.assertTrue(auxName.equals("ServiceC")); + aux.serviceStop(); + + // create a new jar file, and configure it as customized class path + // for this AuxService, and make sure that we could load the class + // from this configured customized class path + File rootDir = GenericTestUtils.getTestDir(getClass() + .getSimpleName()); + if (!rootDir.exists()) { + rootDir.mkdirs(); + } + File testJar = null; + try { + testJar = JarFinder.makeClassLoaderTestJar(this.getClass(), rootDir, + "test-runjar.jar", 2048, ServiceC.class.getName()); + conf = new YarnConfiguration(); + conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, + new String[] {"ServiceC"}); + conf.set(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "ServiceC"), + ServiceC.class.getName()); + conf.set(String.format( + YarnConfiguration.NM_AUX_SERVICES_CLASSPATH, "ServiceC"), + testJar.getAbsolutePath()); + // remove "-org.apache.hadoop." from system classes + String systemClasses = "-org.apache.hadoop." + "," + + ApplicationClassLoader.SYSTEM_CLASSES_DEFAULT; + conf.set(String.format( + YarnConfiguration.NM_AUX_SERVICES_SYSTEM_CLASSES, + "ServiceC"), systemClasses); + aux = new AuxServices(); + aux.init(conf); + aux.start(); + meta = aux.getMetaData(); + Assert.assertTrue(meta.size() == 1); + Set customizedAuxClassPath = null; + for(Entry i : meta.entrySet()) { + Assert.assertTrue(auxName.equals(i.getKey())); + String classPath = Charsets.UTF_8.decode(i.getValue()).toString(); + customizedAuxClassPath = new HashSet(Arrays.asList(StringUtils + .getTrimmedStrings(classPath))); + Assert.assertTrue(classPath.contains(testJar.getName())); + } + aux.stop(); + + // Verify that we do not have any overlap between customized class path + // and the default class path. + Set mutalClassPath = Sets.intersection(defaultAuxClassPath, + customizedAuxClassPath); + Assert.assertTrue(mutalClassPath.isEmpty()); + } finally { + if (testJar != null) { + testJar.delete(); + rootDir.delete(); + } + } + } + @Test public void testAuxEventDispatch() { Configuration conf = new Configuration();