diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java index c2226855fd..e47deb2392 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java @@ -281,6 +281,7 @@ public void operationComplete(ChannelFuture future) throws Exception { } } + private final MetricsSystem ms; final ShuffleMetrics metrics; class ReduceMapFileCount implements ChannelFutureListener { @@ -397,6 +398,7 @@ public boolean getKeepAlive() { ShuffleHandler(MetricsSystem ms) { super(MAPREDUCE_SHUFFLE_SERVICEID); + this.ms = ms; metrics = ms.register(new ShuffleMetrics()); } @@ -579,6 +581,7 @@ protected void serviceStop() throws Exception { if (stateDb != null) { stateDb.close(); } + ms.unregisterSource(ShuffleMetrics.class.getSimpleName()); super.serviceStop(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 2f2528445d..72cc0d6451 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2222,7 +2222,15 @@ public static boolean isAclEnabled(Configuration conf) { public static final String NM_AUX_SERVICES = NM_PREFIX + "aux-services"; - + + public static final String NM_AUX_SERVICES_MANIFEST = + NM_AUX_SERVICES + ".manifest"; + + public static final String NM_AUX_SERVICES_MANIFEST_RELOAD_MS = + NM_AUX_SERVICES + ".manifest.reload-ms"; + + public static final long DEFAULT_NM_AUX_SERVICES_MANIFEST_RELOAD_MS = 120000; + public static final String NM_AUX_SERVICE_FMT = NM_PREFIX + "aux-services.%s.class"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index e7a0e1406c..ccceaab05b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -1921,6 +1921,21 @@ + + A file containing auxiliary service specifications. If + manifest file is specified, yarn.nodemanager.aux-services and other + aux services configuration properties will be ignored. + yarn.nodemanager.aux-services.manifest + + + + + Length of time in ms to wait between reloading aux services + manifest. If 0 or less, manifest will not be reloaded. + yarn.nodemanager.aux-services.manifest.reload-ms + + + No. of ms to wait between sending a SIGTERM and SIGKILL to a container yarn.nodemanager.sleep-delay-before-sigkill.ms diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java index 84b3915a0c..113e6a7c66 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; @@ -137,4 +138,8 @@ public interface Context { * @return the NM {@code DeletionService}. */ DeletionService getDeletionService(); + + void setAuxServices(AuxServices auxServices); + + AuxServices getAuxServices(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 6eda4a80b7..c6719d1b36 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.nodemanager.collectormanager.NMCollectorService; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; @@ -684,6 +685,8 @@ public static class NMContext implements Context { private NMLogAggregationStatusTracker nmLogAggregationStatusTracker; + private AuxServices auxServices; + public NMContext(NMContainerTokenSecretManager containerTokenSecretManager, NMTokenSecretManagerInNM nmTokenSecretManager, LocalDirsHandlerService dirsHandler, ApplicationACLsManager aclsManager, @@ -934,6 +937,16 @@ public void setNMLogAggregationStatusTracker( public NMLogAggregationStatusTracker getNMLogAggregationStatusTracker() { return nmLogAggregationStatusTracker; } + + @Override + public void setAuxServices(AuxServices auxServices) { + this.auxServices = auxServices; + } + + @Override + public AuxServices getAuxServices() { + return this.auxServices; + } } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java index 77c4dd9a6c..ba936d6b86 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java @@ -18,15 +18,33 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager; +import java.io.File; +import java.io.FileNotFoundException; import java.io.IOException; +import java.io.InputStream; import java.net.URI; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Date; import java.util.HashMap; +import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; import java.util.regex.Pattern; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.records.AuxServiceConfiguration; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.records.AuxServiceFile; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.records.AuxServiceRecord; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.records.AuxServiceRecords; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,54 +87,91 @@ public class AuxServices extends AbstractService public static final FsPermission NM_AUX_SERVICE_DIR_PERM = new FsPermission((short) 0700); + public static final String CLASS_NAME = "class.name"; + public static final String SYSTEM_CLASSES = "system.classes"; + static final String STATE_STORE_ROOT_NAME = "nm-aux-services"; private static final Logger LOG = LoggerFactory.getLogger(AuxServices.class); private static final String DEL_SUFFIX = "_DEL_"; - protected final Map serviceMap; - protected final Map serviceMetaData; + private final Map serviceMap; + private final Map serviceRecordMap; + private final Map serviceMetaData; private final AuxiliaryLocalPathHandler auxiliaryLocalPathHandler; private final LocalDirsHandlerService dirsHandler; private final DeletionService delService; private final UserGroupInformation userUGI; + private final FsPermission storeDirPerms = new FsPermission((short)0700); + private Path stateStoreRoot = null; + private FileSystem stateStoreFs = null; + + private Path manifest; + private FileSystem manifestFS; + private Timer manifestReloadTimer; + private TimerTask manifestReloadTask; + private long manifestReloadInterval; + private long manifestModifyTS = -1; + + private final ObjectMapper mapper; + private final Pattern p = Pattern.compile("^[A-Za-z_]+[A-Za-z0-9_]*$"); - public AuxServices(AuxiliaryLocalPathHandler auxiliaryLocalPathHandler, + AuxServices(AuxiliaryLocalPathHandler auxiliaryLocalPathHandler, Context nmContext, DeletionService deletionService) { super(AuxServices.class.getName()); serviceMap = - Collections.synchronizedMap(new HashMap()); + Collections.synchronizedMap(new HashMap()); + serviceRecordMap = + Collections.synchronizedMap(new HashMap()); serviceMetaData = Collections.synchronizedMap(new HashMap()); this.auxiliaryLocalPathHandler = auxiliaryLocalPathHandler; this.dirsHandler = nmContext.getLocalDirsHandler(); this.delService = deletionService; this.userUGI = getRemoteUgi(); + this.mapper = new ObjectMapper(); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); // Obtain services from configuration in init() } + /** + * Adds a service to the service map. + * + * @param name aux service name + * @param service aux service + * @param serviceRecord aux service record + */ protected final synchronized void addService(String name, - AuxiliaryService service) { - LOG.info("Adding auxiliary service " + - service.getName() + ", \"" + name + "\""); + AuxiliaryService service, AuxServiceRecord serviceRecord) { + LOG.info("Adding auxiliary service " + serviceRecord.getName() + + " version " + serviceRecord.getVersion()); serviceMap.put(name, service); + serviceRecordMap.put(name, serviceRecord); } Collection getServices() { return Collections.unmodifiableCollection(serviceMap.values()); } + /** + * Gets current aux service records. + * + * @return a collection of service records + */ + public Collection getServiceRecords() { + return Collections.unmodifiableCollection(serviceRecordMap.values()); + } + /** * @return the meta data for all registered services, that have been started. * If a service has not been started no metadata will be available. The key * is the name of the service as defined in the configuration. */ public Map getMetaData() { - Map metaClone = new HashMap( - serviceMetaData.size()); + Map metaClone = new HashMap<>(serviceMetaData.size()); synchronized (serviceMetaData) { for (Entry entry : serviceMetaData.entrySet()) { metaClone.put(entry.getKey(), entry.getValue().duplicate()); @@ -125,11 +180,470 @@ public Map getMetaData() { return metaClone; } + /** + * Creates an auxiliary service from a specification using the Configuration + * classloader. + * + * @param service aux service record + * @return auxiliary service + */ + private AuxiliaryService createAuxServiceFromConfiguration(AuxServiceRecord + service) { + Configuration c = new Configuration(false); + c.set(CLASS_NAME, getClassName(service)); + Class sClass = c.getClass(CLASS_NAME, + null, AuxiliaryService.class); + + if (sClass == null) { + throw new YarnRuntimeException("No class defined for auxiliary " + + "service" + service.getName()); + } + return ReflectionUtils.newInstance(sClass, null); + } + + /** + * Creates an auxiliary service from a specification using a custom local + * classpath. + * + * @param service aux service record + * @param appLocalClassPath local class path + * @param conf configuration + * @return auxiliary service + * @throws IOException + * @throws ClassNotFoundException + */ + private AuxiliaryService createAuxServiceFromLocalClasspath( + AuxServiceRecord service, String appLocalClassPath, Configuration conf) + throws IOException, ClassNotFoundException { + Preconditions.checkArgument(appLocalClassPath != null && + !appLocalClassPath.isEmpty(), + "local classpath was null in createAuxServiceFromLocalClasspath"); + final String sName = service.getName(); + final String className = getClassName(service); + + if (service.getConfiguration() != null && service.getConfiguration() + .getFiles().size() > 0) { + throw new YarnRuntimeException("The aux service:" + sName + + " has configured local classpath:" + appLocalClassPath + + " and config files:" + service.getConfiguration().getFiles() + + ". Only one of them should be configured."); + } + + return AuxiliaryServiceWithCustomClassLoader.getInstance(conf, className, + appLocalClassPath, getSystemClasses(service, className)); + } + + /** + * Creates an auxiliary service from a specification. + * + * @param service aux service record + * @param conf configuration + * @param fromConfiguration true if from configuration, false if from manifest + * @return auxiliary service + * @throws IOException + * @throws ClassNotFoundException + */ + private AuxiliaryService createAuxService(AuxServiceRecord service, + Configuration conf, boolean fromConfiguration) throws IOException, + ClassNotFoundException { + final String sName = service.getName(); + final String className = getClassName(service); + if (className == null || className.isEmpty()) { + throw new YarnRuntimeException("Class name not provided for auxiliary " + + "service " + sName); + } + if (fromConfiguration) { + // aux services from the configuration have an additional configuration + // option specifying a local classpath that will not be localized + final String appLocalClassPath = conf.get(String.format( + YarnConfiguration.NM_AUX_SERVICES_CLASSPATH, sName)); + if (appLocalClassPath != null && !appLocalClassPath.isEmpty()) { + return createAuxServiceFromLocalClasspath(service, appLocalClassPath, + conf); + } + } + AuxServiceConfiguration serviceConf = service.getConfiguration(); + List destFiles = new ArrayList<>(); + if (serviceConf != null) { + List files = serviceConf.getFiles(); + if (files != null) { + for (AuxServiceFile file : files) { + // localize file (if needed) and add it to the list of paths that + // will become the classpath + destFiles.add(maybeDownloadJars(sName, className, file.getSrcFile(), + file.getType(), conf)); + } + } + } + + if (destFiles.size() > 0) { + // create aux service using a custom localized classpath + LOG.info("The aux service:" + sName + + " is using the custom classloader with classpath " + destFiles); + return AuxiliaryServiceWithCustomClassLoader.getInstance(conf, + className, StringUtils.join(File.pathSeparatorChar, destFiles), + getSystemClasses(service, className)); + } else { + return createAuxServiceFromConfiguration(service); + } + } + + /** + * Copies the specified remote file to local NM aux service directory. If the + * same file already exists (as determined by modification time), the file + * will not be copied again. + * + * @param sName service name + * @param className service class name + * @param remoteFile location of the file to download + * @param type type of file (STATIC for a jar or ARCHIVE for a tarball) + * @param conf configuration + * @return path of the downloaded file + * @throws IOException + */ + private Path maybeDownloadJars(String sName, String className, String + remoteFile, AuxServiceFile.TypeEnum type, Configuration conf) + throws IOException { + // load AuxiliaryService from remote classpath + FileContext localLFS = getLocalFileContext(conf); + // create NM aux-service dir in NM localdir if it does not exist. + Path nmAuxDir = dirsHandler.getLocalPathForWrite("." + + Path.SEPARATOR + NM_AUX_SERVICE_DIR); + if (!localLFS.util().exists(nmAuxDir)) { + try { + localLFS.mkdir(nmAuxDir, NM_AUX_SERVICE_DIR_PERM, true); + } catch (IOException ex) { + throw new YarnRuntimeException("Fail to create dir:" + + nmAuxDir.toString(), ex); + } + } + Path src = new Path(remoteFile); + FileContext remoteLFS = getRemoteFileContext(src.toUri(), conf); + FileStatus scFileStatus = remoteLFS.getFileStatus(src); + if (!scFileStatus.getOwner().equals( + this.userUGI.getShortUserName())) { + throw new YarnRuntimeException("The remote jarfile owner:" + + scFileStatus.getOwner() + " is not the same as the NM user:" + + this.userUGI.getShortUserName() + "."); + } + if ((scFileStatus.getPermission().toShort() & 0022) != 0) { + throw new YarnRuntimeException("The remote jarfile should not " + + "be writable by group or others. " + + "The current Permission is " + + scFileStatus.getPermission().toShort()); + } + Path downloadDest = new Path(nmAuxDir, + className + "_" + scFileStatus.getModificationTime()); + // check whether we need to re-download the jar + // from remote directory + Path targetDirPath = new Path(downloadDest, + scFileStatus.getPath().getName()); + FileStatus[] allSubDirs = localLFS.util().listStatus(nmAuxDir); + for (FileStatus sub : allSubDirs) { + if (sub.getPath().getName().equals(downloadDest.getName())) { + return new Path(targetDirPath + Path.SEPARATOR + "*"); + } else { + if (sub.getPath().getName().contains(className) && + !sub.getPath().getName().endsWith(DEL_SUFFIX)) { + Path delPath = new Path(sub.getPath().getParent(), + sub.getPath().getName() + DEL_SUFFIX); + localLFS.rename(sub.getPath(), delPath); + LOG.info("delete old aux service jar dir:" + + delPath.toString()); + FileDeletionTask deletionTask = new FileDeletionTask( + this.delService, null, delPath, null); + this.delService.delete(deletionTask); + } + } + } + LocalResourceType srcType; + if (type == AuxServiceFile.TypeEnum.STATIC) { + srcType = LocalResourceType.FILE; + } else if (type == AuxServiceFile.TypeEnum.ARCHIVE) { + srcType = LocalResourceType.ARCHIVE; + } else { + throw new YarnRuntimeException( + "Cannot unpack file of type " + type + " from remote-file-path:" + + src + "for aux-service:" + ".\n"); + } + LocalResource scRsrc = LocalResource.newInstance( + URL.fromURI(src.toUri()), + srcType, LocalResourceVisibility.PRIVATE, + scFileStatus.getLen(), scFileStatus.getModificationTime()); + FSDownload download = new FSDownload(localLFS, null, conf, + downloadDest, scRsrc, null); + try { + // don't need to convert downloaded path into a dir + // since it's already a jar path. + return download.call(); + } catch (Exception ex) { + throw new YarnRuntimeException( + "Exception happend while downloading files " + + "for aux-service:" + sName + " and remote-file-path:" + + src + ".\n" + ex.getMessage()); + } + } + + /** + * If recovery is enabled, creates a recovery directory for the named + * service and sets it on the service. + * + * @param sName name of the service + * @param s auxiliary service + * @throws IOException + */ + private void setStateStoreDir(String sName, AuxiliaryService s) throws + IOException { + if (stateStoreRoot != null) { + Path storePath = new Path(stateStoreRoot, sName); + stateStoreFs.mkdirs(storePath, storeDirPerms); + s.setRecoveryPath(storePath); + } + } + + /** + * Removes a service from the service map and stops it, if it exists. + * + * @param sName name of the service + */ + private synchronized void maybeRemoveAuxService(String sName) { + AuxiliaryService s; + s = serviceMap.remove(sName); + serviceRecordMap.remove(sName); + serviceMetaData.remove(sName); + if (s != null) { + stopAuxService(s); + } + } + + /** + * Constructs an AuxiliaryService then configures and initializes it based + * on a service specification. + * + * @param service aux service record + * @param conf configuration + * @param fromConfiguration true if from configuration, false if from manifest + * @return aux service + * @throws IOException + */ + private AuxiliaryService initAuxService(AuxServiceRecord service, + Configuration conf, boolean fromConfiguration) throws IOException { + final String sName = service.getName(); + AuxiliaryService s; + try { + Preconditions + .checkArgument( + validateAuxServiceName(sName), + "The auxiliary service name: " + sName + " is invalid. " + + "The valid service name should only contain a-zA-Z0-9_ " + + "and cannot start with numbers."); + s = createAuxService(service, conf, fromConfiguration); + if (s == null) { + throw new YarnRuntimeException("No auxiliary service class loaded for" + + " " + sName); + } + // TODO better use s.getName()? + if (!sName.equals(s.getName())) { + LOG.warn("The Auxiliary Service named '" + sName + "' in the " + + "configuration is for " + s.getClass() + " which has " + + "a name of '" + s.getName() + "'. Because these are " + + "not the same tools trying to send ServiceData and read " + + "Service Meta Data may have issues unless the refer to " + + "the name in the config."); + } + s.setAuxiliaryLocalPathHandler(auxiliaryLocalPathHandler); + setStateStoreDir(sName, s); + Configuration customConf = new Configuration(conf); + if (service.getConfiguration() != null) { + for (Entry entry : service.getConfiguration() + .getProperties().entrySet()) { + customConf.set(entry.getKey(), entry.getValue()); + } + } + s.init(customConf); + + LOG.info("Initialized auxiliary service " + sName); + } catch (RuntimeException e) { + LOG.error("Failed to initialize " + sName, e); + throw e; + } catch (ClassNotFoundException e) { + throw new YarnRuntimeException(e); + } + return s; + } + + /** + * Reloads auxiliary services manifest. Must be called after service init. + * + * @throws IOException if manifest can't be loaded + */ + private void reloadManifest() throws IOException { + loadManifest(getConfig(), true); + } + + /** + * Reads the manifest file if it is configured, exists, and has not been + * modified since the last read. + * + * @return aux service records + * @throws IOException + */ + private synchronized AuxServiceRecords maybeReadManifestFile() throws + IOException { + if (manifest == null) { + return null; + } + if (!manifestFS.exists(manifest)) { + LOG.info("Manifest file " + manifest + " doesn't exist"); + return null; + } + FileStatus status; + try { + status = manifestFS.getFileStatus(manifest); + } catch (FileNotFoundException e) { + LOG.info("Manifest file " + manifest + " doesn't exist"); + return null; + } + if (status.getModificationTime() == manifestModifyTS) { + return null; + } + manifestModifyTS = status.getModificationTime(); + LOG.info("Reading auxiliary services manifest " + manifest); + try (FSDataInputStream in = manifestFS.open(manifest)) { + return mapper.readValue((InputStream) in, AuxServiceRecords.class); + } + } + + /** + * Updates current aux services based on changes found in the manifest. + * + * @param conf configuration + * @param startServices if true starts services, otherwise only inits services + * @throws IOException + */ + private synchronized void loadManifest(Configuration conf, boolean + startServices) throws IOException { + AuxServiceRecords services = maybeReadManifestFile(); + if (services == null) { + // read did not occur or no changes detected + return; + } + Set loadedAuxServices = new HashSet<>(); + boolean foundChanges = false; + if (services.getServices() != null) { + for (AuxServiceRecord service : services.getServices()) { + AuxServiceRecord existingService = serviceRecordMap.get(service + .getName()); + loadedAuxServices.add(service.getName()); + if (existingService != null && existingService.equals(service)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Auxiliary service already loaded: " + service.getName()); + } + continue; + } + foundChanges = true; + try { + // stop aux service + maybeRemoveAuxService(service.getName()); + // init aux service + AuxiliaryService s = initAuxService(service, conf, false); + if (startServices) { + // start aux service + startAuxService(service.getName(), s, service); + } + // add aux service to serviceMap + addService(service.getName(), s, service); + } catch (IOException e) { + LOG.error("Failed to load auxiliary service " + service.getName()); + } + } + } + + // remove aux services that do not appear in the manifest + List servicesToRemove = new ArrayList<>(); + for (String sName : serviceMap.keySet()) { + if (!loadedAuxServices.contains(sName)) { + foundChanges = true; + servicesToRemove.add(sName); + } + } + for (String sName : servicesToRemove) { + LOG.info("Removing aux service " + sName); + maybeRemoveAuxService(sName); + } + + if (!foundChanges) { + LOG.info("No auxiliary services changes detected in manifest"); + } + } + + private static String getClassName(AuxServiceRecord service) { + AuxServiceConfiguration serviceConf = service.getConfiguration(); + if (serviceConf == null) { + return null; + } + return serviceConf.getProperty(CLASS_NAME); + } + + private static String[] getSystemClasses(AuxServiceRecord service, String + className) { + AuxServiceConfiguration serviceConf = + service.getConfiguration(); + if (serviceConf == null) { + return new String[]{className}; + } + return StringUtils.split(serviceConf.getProperty(SYSTEM_CLASSES, + className)); + } + + /** + * Translates an aux service specified in the Configuration to an aux + * service record. + * + * @param sName aux service name + * @param conf configuration + * @return + */ + private static AuxServiceRecord createServiceRecordFromConfiguration(String + sName, Configuration conf) { + String className = conf.get(String.format( + YarnConfiguration.NM_AUX_SERVICE_FMT, sName)); + String remoteClassPath = conf.get(String.format( + YarnConfiguration.NM_AUX_SERVICE_REMOTE_CLASSPATH, sName)); + String[] systemClasses = conf.getTrimmedStrings(String.format( + YarnConfiguration.NM_AUX_SERVICES_SYSTEM_CLASSES, sName)); + + AuxServiceConfiguration serviceConf = new AuxServiceConfiguration(); + if (className != null) { + serviceConf.setProperty(CLASS_NAME, className); + } + if (systemClasses != null) { + serviceConf.setProperty(SYSTEM_CLASSES, StringUtils.join(",", + systemClasses)); + } + if (remoteClassPath != null) { + AuxServiceFile.TypeEnum type; + String lcClassPath = StringUtils.toLowerCase(remoteClassPath); + if (lcClassPath.endsWith(".jar")) { + type = AuxServiceFile.TypeEnum.STATIC; + } else if (lcClassPath.endsWith(".zip") || + lcClassPath.endsWith(".tar.gz") || lcClassPath.endsWith(".tgz") || + lcClassPath.endsWith(".tar")) { + type = AuxServiceFile.TypeEnum.ARCHIVE; + } else { + throw new YarnRuntimeException("Cannot unpack file from " + + "remote-file-path:" + remoteClassPath + "for aux-service:" + + sName + ".\n"); + } + AuxServiceFile file = new AuxServiceFile().srcFile(remoteClassPath) + .type(type); + serviceConf.getFiles().add(file); + } + return new AuxServiceRecord().name(sName).configuration(serviceConf); + } + @Override - public void serviceInit(Configuration conf) throws Exception { - final FsPermission storeDirPerms = new FsPermission((short)0700); - Path stateStoreRoot = null; - FileSystem stateStoreFs = null; + public synchronized void serviceInit(Configuration conf) throws Exception { boolean recoveryEnabled = conf.getBoolean( YarnConfiguration.NM_RECOVERY_ENABLED, YarnConfiguration.DEFAULT_NM_RECOVERY_ENABLED); @@ -138,200 +652,77 @@ public void serviceInit(Configuration conf) throws Exception { STATE_STORE_ROOT_NAME); stateStoreFs = FileSystem.getLocal(conf); } - Collection auxNames = conf.getStringCollection( - YarnConfiguration.NM_AUX_SERVICES); - for (final String sName : auxNames) { - try { - Preconditions - .checkArgument( - validateAuxServiceName(sName), - "The ServiceName: " + sName + " set in " + - YarnConfiguration.NM_AUX_SERVICES +" is invalid." + - "The valid service name should only contain a-zA-Z0-9_ " + - "and can not start with numbers"); - String classKey = String.format( - YarnConfiguration.NM_AUX_SERVICE_FMT, sName); - String className = conf.get(classKey); - final String appLocalClassPath = conf.get(String.format( - YarnConfiguration.NM_AUX_SERVICES_CLASSPATH, sName)); - final String appRemoteClassPath = conf.get(String.format( - YarnConfiguration.NM_AUX_SERVICE_REMOTE_CLASSPATH, sName)); - AuxiliaryService s = null; - boolean useCustomerClassLoader = ((appLocalClassPath != null - && !appLocalClassPath.isEmpty()) || - (appRemoteClassPath != null && !appRemoteClassPath.isEmpty())) - && className != null && !className.isEmpty(); - if (useCustomerClassLoader) { - // load AuxiliaryService from local class path - if (appRemoteClassPath == null || appRemoteClassPath.isEmpty()) { - s = AuxiliaryServiceWithCustomClassLoader.getInstance( - conf, className, appLocalClassPath); - } else { - // load AuxiliaryService from remote class path - if (appLocalClassPath != null && !appLocalClassPath.isEmpty()) { - throw new YarnRuntimeException("The aux serivce:" + sName - + " has configured local classpath:" + appLocalClassPath - + " and remote classpath:" + appRemoteClassPath - + ". Only one of them should be configured."); - } - FileContext localLFS = getLocalFileContext(conf); - // create NM aux-service dir in NM localdir if it does not exist. - Path nmAuxDir = dirsHandler.getLocalPathForWrite("." - + Path.SEPARATOR + NM_AUX_SERVICE_DIR); - if (!localLFS.util().exists(nmAuxDir)) { - try { - localLFS.mkdir(nmAuxDir, NM_AUX_SERVICE_DIR_PERM, true); - } catch (IOException ex) { - throw new YarnRuntimeException("Fail to create dir:" - + nmAuxDir.toString(), ex); - } - } - Path src = new Path(appRemoteClassPath); - FileContext remoteLFS = getRemoteFileContext(src.toUri(), conf); - FileStatus scFileStatus = remoteLFS.getFileStatus(src); - if (!scFileStatus.getOwner().equals( - this.userUGI.getShortUserName())) { - throw new YarnRuntimeException("The remote jarfile owner:" - + scFileStatus.getOwner() + " is not the same as the NM user:" - + this.userUGI.getShortUserName() + "."); - } - if ((scFileStatus.getPermission().toShort() & 0022) != 0) { - throw new YarnRuntimeException("The remote jarfile should not " - + "be writable by group or others. " - + "The current Permission is " - + scFileStatus.getPermission().toShort()); - } - Path dest = null; - Path downloadDest = new Path(nmAuxDir, - className + "_" + scFileStatus.getModificationTime()); - // check whether we need to re-download the jar - // from remote directory - Path targetDirPath = new Path(downloadDest, - scFileStatus.getPath().getName()); - FileStatus[] allSubDirs = localLFS.util().listStatus(nmAuxDir); - boolean reDownload = true; - for (FileStatus sub : allSubDirs) { - if (sub.getPath().getName().equals(downloadDest.getName())) { - reDownload = false; - dest = new Path(targetDirPath + Path.SEPARATOR + "*"); - break; - } else { - if (sub.getPath().getName().contains(className) && - !sub.getPath().getName().endsWith(DEL_SUFFIX)) { - Path delPath = new Path(sub.getPath().getParent(), - sub.getPath().getName() + DEL_SUFFIX); - localLFS.rename(sub.getPath(), delPath); - LOG.info("delete old aux service jar dir:" - + delPath.toString()); - FileDeletionTask deletionTask = new FileDeletionTask( - this.delService, null, delPath, null); - this.delService.delete(deletionTask); - } - } - } - if (reDownload) { - LocalResourceType srcType = null; - String lowerDst = StringUtils.toLowerCase(src.toString()); - if (lowerDst.endsWith(".jar")) { - srcType = LocalResourceType.FILE; - } else if (lowerDst.endsWith(".zip") || - lowerDst.endsWith(".tar.gz") || lowerDst.endsWith(".tgz") - || lowerDst.endsWith(".tar")) { - srcType = LocalResourceType.ARCHIVE; - } else { - throw new YarnRuntimeException( - "Can not unpack file from remote-file-path:" + src - + "for aux-service:" + ".\n"); - } - LocalResource scRsrc = LocalResource.newInstance( - URL.fromURI(src.toUri()), - srcType, LocalResourceVisibility.PRIVATE, - scFileStatus.getLen(), scFileStatus.getModificationTime()); - FSDownload download = new FSDownload(localLFS, null, conf, - downloadDest, scRsrc, null); - try { - Path downloaded = download.call(); - // don't need to convert downloaded path into a dir - // since its already a jar path. - dest = downloaded; - } catch (Exception ex) { - throw new YarnRuntimeException( - "Exception happend while downloading files " - + "for aux-service:" + sName + " and remote-file-path:" - + src + ".\n" + ex.getMessage()); - } - } - s = AuxiliaryServiceWithCustomClassLoader.getInstance( - new Configuration(conf), className, dest.toString()); - } - LOG.info("The aux service:" + sName - + " are using the custom classloader"); - } else { - Class sClass = conf.getClass( - classKey, null, AuxiliaryService.class); - - if (sClass == null) { - throw new RuntimeException("No class defined for " + sName); - } - s = ReflectionUtils.newInstance(sClass, new Configuration(conf)); - } - if (s == null) { - throw new RuntimeException("No object created for " + sName); - } - // TODO better use s.getName()? - if(!sName.equals(s.getName())) { - LOG.warn("The Auxiliary Service named '"+sName+"' in the " - +"configuration is for "+s.getClass()+" which has " - +"a name of '"+s.getName()+"'. Because these are " - +"not the same tools trying to send ServiceData and read " - +"Service Meta Data may have issues unless the refer to " - +"the name in the config."); - } - s.setAuxiliaryLocalPathHandler(auxiliaryLocalPathHandler); - addService(sName, s); - if (recoveryEnabled) { - Path storePath = new Path(stateStoreRoot, sName); - stateStoreFs.mkdirs(storePath, storeDirPerms); - s.setRecoveryPath(storePath); - } - s.init(new Configuration(conf)); - } catch (RuntimeException e) { - LOG.error("Failed to initialize " + sName, e); - throw e; + String manifestStr = conf.get(YarnConfiguration.NM_AUX_SERVICES_MANIFEST); + if (manifestStr == null) { + Collection auxNames = conf.getStringCollection( + YarnConfiguration.NM_AUX_SERVICES); + for (final String sName : auxNames) { + AuxServiceRecord service = createServiceRecordFromConfiguration(sName, + conf); + maybeRemoveAuxService(sName); + AuxiliaryService s = initAuxService(service, conf, true); + addService(sName, s, service); } + } else { + manifest = new Path(manifestStr); + manifestFS = FileSystem.get(new URI(manifestStr), conf); + loadManifest(conf, false); } + manifestReloadInterval = conf.getLong( + YarnConfiguration.NM_AUX_SERVICES_MANIFEST_RELOAD_MS, + YarnConfiguration.DEFAULT_NM_AUX_SERVICES_MANIFEST_RELOAD_MS); + manifestReloadTask = new ManifestReloadTask(); + super.serviceInit(conf); } + private void startAuxService(String name, AuxiliaryService service, + AuxServiceRecord serviceRecord) { + service.start(); + service.registerServiceListener(this); + ByteBuffer meta = service.getMetaData(); + if (meta != null) { + serviceMetaData.put(name, meta); + } + serviceRecord.setLaunchTime(new Date()); + } + + private void stopAuxService(Service service) { + if (service.getServiceState() == Service.STATE.STARTED) { + service.unregisterServiceListener(this); + service.stop(); + } + } + @Override - public void serviceStart() throws Exception { + public synchronized void serviceStart() throws Exception { // TODO fork(?) services running as configured user // monitor for health, shutdown/restart(?) if any should die for (Map.Entry entry : serviceMap.entrySet()) { AuxiliaryService service = entry.getValue(); String name = entry.getKey(); - service.start(); - service.registerServiceListener(this); - ByteBuffer meta = service.getMetaData(); - if(meta != null) { - serviceMetaData.put(name, meta); - } + startAuxService(name, service, serviceRecordMap.get(name)); + } + if (manifest != null && manifestReloadInterval > 0) { + manifestReloadTimer = new Timer("AuxServicesManifestRelaod-Timer", + true); + manifestReloadTimer.schedule(manifestReloadTask, + manifestReloadInterval, manifestReloadInterval); } super.serviceStart(); } @Override - public void serviceStop() throws Exception { + public synchronized void serviceStop() throws Exception { try { - synchronized (serviceMap) { - for (Service service : serviceMap.values()) { - if (service.getServiceState() == Service.STATE.STARTED) { - service.unregisterServiceListener(this); - service.stop(); - } - } - serviceMap.clear(); - serviceMetaData.clear(); + for (Service service : serviceMap.values()) { + stopAuxService(service); + } + serviceMap.clear(); + serviceRecordMap.clear(); + serviceMetaData.clear(); + if (manifestReloadTimer != null) { + manifestReloadTimer.cancel(); } } finally { super.serviceStop(); @@ -340,9 +731,9 @@ public void serviceStop() throws Exception { @Override public void stateChanged(Service service) { - LOG.error("Service " + service.getName() + " changed state: " + + // services changing state is expected on reload + LOG.info("Service " + service.getName() + " changed state: " + service.getServiceState()); - stop(); } @Override @@ -448,4 +839,38 @@ private UserGroupInformation getRemoteUgi() { } return remoteUgi; } + + protected static AuxServiceRecord newAuxService(String name, String + className) { + AuxServiceConfiguration serviceConf = new AuxServiceConfiguration(); + serviceConf.setProperty(CLASS_NAME, className); + return new AuxServiceRecord().name(name).configuration(serviceConf); + } + + protected static void setClasspath(AuxServiceRecord service, String + classpath) { + service.getConfiguration().getFiles().add(new AuxServiceFile() + .srcFile(classpath).type(AuxServiceFile.TypeEnum.STATIC)); + } + + protected static void setSystemClasses(AuxServiceRecord service, String + systemClasses) { + service.getConfiguration().setProperty(SYSTEM_CLASSES, systemClasses); + } + + /** + * Class which is used by the {@link Timer} class to periodically execute the + * manifest reload. + */ + private final class ManifestReloadTask extends TimerTask { + @Override + public void run() { + try { + reloadManifest(); + } catch (Throwable t) { + // Prevent uncaught exceptions from killing this thread + LOG.warn("Error while reloading manifest: ", t); + } + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxiliaryServiceWithCustomClassLoader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxiliaryServiceWithCustomClassLoader.java index c764fbdc06..d0b039e35f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxiliaryServiceWithCustomClassLoader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxiliaryServiceWithCustomClassLoader.java @@ -30,7 +30,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.util.ApplicationClassLoader; import org.apache.hadoop.util.ReflectionUtils; -import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext; import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext; import org.apache.hadoop.yarn.server.api.AuxiliaryService; @@ -159,11 +158,8 @@ public void setRecoveryPath(Path recoveryPath) { } public static AuxiliaryServiceWithCustomClassLoader getInstance( - Configuration conf, String className, String appClassPath) - throws IOException, ClassNotFoundException { - String[] systemClasses = conf.getTrimmedStrings(String.format( - YarnConfiguration.NM_AUX_SERVICES_SYSTEM_CLASSES, - className)); + Configuration conf, String className, String appClassPath, String[] + systemClasses) throws IOException, ClassNotFoundException { ClassLoader customClassLoader = createAuxServiceClassLoader( appClassPath, systemClasses); Class clazz = Class.forName(className, true, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 8a12c3c510..42ea59c831 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -258,6 +258,7 @@ public ContainerManagerImpl(Context context, ContainerExecutor exec, auxiliaryServices = new AuxServices(auxiliaryLocalPathHandler, this.context, this.deletionService); auxiliaryServices.registerServiceListener(this); + context.setAuxServices(auxiliaryServices); addService(auxiliaryServices); // initialize the metrics publisher if the timeline service v.2 is enabled diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/records/AuxServiceConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/records/AuxServiceConfiguration.java new file mode 100644 index 0000000000..24383ab003 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/records/AuxServiceConfiguration.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.records; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + * Set of configuration properties that can be injected into the service + * components via envs, files and custom pluggable helper docker containers. + * Files of several standard formats like xml, properties, json, yaml and + * templates will be supported. + **/ +@InterfaceAudience.Public +@InterfaceStability.Unstable +@JsonInclude(JsonInclude.Include.NON_NULL) +public class AuxServiceConfiguration { + + private Map properties = new HashMap<>(); + private List files = new ArrayList<>(); + + /** + * A blob of key-value pairs of common service properties. + **/ + public AuxServiceConfiguration properties(Map props) { + this.properties = props; + return this; + } + + @JsonProperty("properties") + public Map getProperties() { + return properties; + } + + public void setProperties(Map properties) { + this.properties = properties; + } + + /** + * Array of list of files that needs to be created and made available as + * volumes in the service component containers. + **/ + public AuxServiceConfiguration files(List fileList) { + this.files = fileList; + return this; + } + + @JsonProperty("files") + public List getFiles() { + return files; + } + + public void setFiles(List files) { + this.files = files; + } + + public String getProperty(String name, String defaultValue) { + String value = getProperty(name); + if (StringUtils.isEmpty(value)) { + return defaultValue; + } + return value; + } + + public void setProperty(String name, String value) { + properties.put(name, value); + } + + public String getProperty(String name) { + return properties.get(name.trim()); + } + + @Override + public boolean equals(java.lang.Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + AuxServiceConfiguration configuration = (AuxServiceConfiguration) o; + return Objects.equals(this.properties, configuration.properties) + && Objects.equals(this.files, configuration.files); + } + + @Override + public int hashCode() { + return Objects.hash(properties, files); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("class Configuration {\n"); + + sb.append(" properties: ").append(toIndentedString(properties)) + .append("\n"); + sb.append(" files: ").append(toIndentedString(files)).append("\n"); + sb.append("}"); + return sb.toString(); + } + + /** + * Convert the given object to string with each line indented by 4 spaces + * (except the first line). + */ + private String toIndentedString(java.lang.Object o) { + if (o == null) { + return "null"; + } + return o.toString().replace("\n", "\n "); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/records/AuxServiceFile.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/records/AuxServiceFile.java new file mode 100644 index 0000000000..6b79b9c698 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/records/AuxServiceFile.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.records; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonValue; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +import java.util.Objects; + +/** + * A config file that needs to be created and made available as a volume in an + * service component container. + **/ +@InterfaceAudience.Public +@InterfaceStability.Unstable +@JsonInclude(JsonInclude.Include.NON_NULL) +public class AuxServiceFile { + + /** + * Config Type. + **/ + public enum TypeEnum { + STATIC("STATIC"), ARCHIVE("ARCHIVE"); + + private String value; + + TypeEnum(String type) { + this.value = type; + } + + @Override + @JsonValue + public String toString() { + return value; + } + } + + private TypeEnum type = null; + private String srcFile = null; + + /** + * Config file in the standard format like xml, properties, json, yaml, + * template. + **/ + public AuxServiceFile type(TypeEnum t) { + this.type = t; + return this; + } + + @JsonProperty("type") + public TypeEnum getType() { + return type; + } + + public void setType(TypeEnum type) { + this.type = type; + } + + /** + * This provides the source location of the configuration file, the content + * of which is dumped to dest_file post property substitutions, in the format + * as specified in type. Typically the src_file would point to a source + * controlled network accessible file maintained by tools like puppet, chef, + * or hdfs etc. Currently, only hdfs is supported. + **/ + public AuxServiceFile srcFile(String file) { + this.srcFile = file; + return this; + } + + @JsonProperty("src_file") + public String getSrcFile() { + return srcFile; + } + + public void setSrcFile(String srcFile) { + this.srcFile = srcFile; + } + + @Override + public boolean equals(java.lang.Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + AuxServiceFile auxServiceFile = (AuxServiceFile) o; + return Objects.equals(this.type, auxServiceFile.type) + && Objects.equals(this.srcFile, auxServiceFile.srcFile); + } + + @Override + public int hashCode() { + return Objects.hash(type, srcFile); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("class AuxServiceFile {\n"); + + sb.append(" type: ").append(toIndentedString(type)).append("\n"); + sb.append(" srcFile: ").append(toIndentedString(srcFile)).append("\n"); + sb.append("}"); + return sb.toString(); + } + + /** + * Convert the given object to string with each line indented by 4 spaces + * (except the first line). + */ + private String toIndentedString(java.lang.Object o) { + if (o == null) { + return "null"; + } + return o.toString().replace("\n", "\n "); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/records/AuxServiceRecord.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/records/AuxServiceRecord.java new file mode 100644 index 0000000000..40882beba7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/records/AuxServiceRecord.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.records; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +import java.util.Date; +import java.util.Objects; + +/** + * An Service resource has the following attributes. + **/ +@InterfaceAudience.Public +@InterfaceStability.Unstable +@JsonInclude(JsonInclude.Include.NON_NULL) +@JsonPropertyOrder({ "name", "version", "description", "launch_time", + "configuration" }) +public class AuxServiceRecord { + + private String name = null; + private String version = null; + private String description = null; + private Date launchTime = null; + private AuxServiceConfiguration configuration = new AuxServiceConfiguration(); + + /** + * A unique service name. + **/ + public AuxServiceRecord name(String n) { + this.name = n; + return this; + } + + @JsonProperty("name") + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + @JsonProperty("version") + public String getVersion() { + return version; + } + + public void setVersion(String version) { + this.version = version; + } + + /** + * Version of the service. + */ + public AuxServiceRecord version(String v) { + this.version = v; + return this; + } + + @JsonProperty("description") + public String getDescription() { + return description; + } + + public void setDescription(String description) { + this.description = description; + } + + /** + * Description of the service. + */ + public AuxServiceRecord description(String d) { + this.description = d; + return this; + } + + /** + * The time when the service was created, e.g. 2016-03-16T01:01:49.000Z. + **/ + public AuxServiceRecord launchTime(Date time) { + this.launchTime = time == null ? null : (Date) time.clone(); + return this; + } + + @JsonProperty("launch_time") + public Date getLaunchTime() { + return launchTime == null ? null : (Date) launchTime.clone(); + } + + public void setLaunchTime(Date time) { + this.launchTime = time == null ? null : (Date) time.clone(); + } + /** + * Config properties of an service. Configurations provided at the + * service/global level are available to all the components. Specific + * properties can be overridden at the component level. + **/ + public AuxServiceRecord configuration(AuxServiceConfiguration conf) { + this.configuration = conf; + return this; + } + + @JsonProperty("configuration") + public AuxServiceConfiguration getConfiguration() { + return configuration; + } + + public void setConfiguration(AuxServiceConfiguration conf) { + this.configuration = conf; + } + + @Override + public boolean equals(java.lang.Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + AuxServiceRecord service = (AuxServiceRecord) o; + return Objects.equals(this.name, service.name) && Objects.equals(this + .version, service.version); + } + + @Override + public int hashCode() { + return Objects.hash(name, version); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("class Service {\n"); + + sb.append(" name: ").append(toIndentedString(name)).append("\n"); + sb.append(" version: ").append(toIndentedString(version)).append("\n"); + sb.append(" description: ").append(toIndentedString(description)) + .append("\n"); + sb.append(" configuration: ").append(toIndentedString(configuration)) + .append("\n"); + sb.append("}"); + return sb.toString(); + } + + /** + * Convert the given object to string with each line indented by 4 spaces + * (except the first line). + */ + private String toIndentedString(java.lang.Object o) { + if (o == null) { + return "null"; + } + return o.toString().replace("\n", "\n "); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/records/AuxServiceRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/records/AuxServiceRecords.java new file mode 100644 index 0000000000..c2454e71a1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/records/AuxServiceRecords.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.records; + +import com.fasterxml.jackson.annotation.JsonInclude; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +import java.util.ArrayList; +import java.util.List; + +/** + * A list of Services. + **/ +@InterfaceAudience.Public +@InterfaceStability.Unstable +@JsonInclude(JsonInclude.Include.NON_NULL) +public class AuxServiceRecords { + private List services = new ArrayList<>(); + + public AuxServiceRecords serviceList(AuxServiceRecord... serviceList) { + for (AuxServiceRecord service : serviceList) { + this.services.add(service); + } + return this; + } + + public List getServices() { + return services; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/records/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/records/package-info.java new file mode 100644 index 0000000000..bfdc33f166 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/records/package-info.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Package used for auxiliary services manifest records. To the extent + * possible, the format matches the YARN service framework service + * specification. + */ +@InterfaceAudience.Public +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.nodemanager.containermanager.records; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMWebServices.java index ca08897eee..4b82f37a42 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMWebServices.java @@ -24,13 +24,16 @@ import java.nio.charset.Charset; import java.security.Principal; import java.util.ArrayList; +import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Map.Entry; import java.util.Set; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.records.AuxServiceRecord; import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePlugin; import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager; +import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.AuxiliaryServicesInfo; import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.NMResourceInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -556,6 +559,25 @@ public Object getNMResourceInfo( return new NMResourceInfo(); } + @GET + @Path("/auxiliaryservices") + @Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8, + MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 }) + public AuxiliaryServicesInfo getAuxiliaryServices(@javax.ws.rs.core.Context + HttpServletRequest hsr) { + init(); + AuxiliaryServicesInfo auxiliaryServices = new AuxiliaryServicesInfo(); + if (!hasAdminAccess(hsr)) { + return auxiliaryServices; + } + Collection loadedServices = nmContext.getAuxServices() + .getServiceRecords(); + if (loadedServices != null) { + auxiliaryServices.addAll(loadedServices); + } + return auxiliaryServices; + } + @PUT @Path("/yarn/sysfs/{user}/{appId}") @Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8, @@ -625,6 +647,21 @@ protected Boolean hasAccess(String user, ApplicationId appId, return true; } + protected Boolean hasAdminAccess(HttpServletRequest hsr) { + // Check for the authorization. + UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true); + + if (callerUGI == null) { + return false; + } + + if (!this.nmContext.getApplicationACLsManager().isAdmin(callerUGI)) { + return false; + } + + return true; + } + private UserGroupInformation getCallerUserGroupInformation( HttpServletRequest hsr, boolean usePrincipal) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/dao/AuxiliaryServiceInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/dao/AuxiliaryServiceInfo.java new file mode 100644 index 0000000000..873314d28e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/dao/AuxiliaryServiceInfo.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.webapp.dao; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.Date; + +/** + * Information about a loaded auxiliary service. + */ +@XmlRootElement(name = "service") +@XmlAccessorType(XmlAccessType.FIELD) +public class AuxiliaryServiceInfo { + private String name; + private String version; + private String startTime; + + public AuxiliaryServiceInfo() { + // JAXB needs this + } + + public AuxiliaryServiceInfo(String name, String version, Date startTime) { + DateFormat dateFormat = + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + this.name = name; + this.version = version; + this.startTime = dateFormat.format(startTime); + } + + public String getName() { + return name; + } + + public String getVersion() { + return version; + } + + public String getStartTime() { + return startTime; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/dao/AuxiliaryServicesInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/dao/AuxiliaryServicesInfo.java new file mode 100644 index 0000000000..e11b9c296f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/dao/AuxiliaryServicesInfo.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.webapp.dao; + +import org.apache.hadoop.yarn.server.nodemanager.containermanager.records.AuxServiceRecord; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; +import java.util.ArrayList; +import java.util.Collection; + +/** + * A list of loaded auxiliary services. + */ +@XmlRootElement(name = "services") +@XmlAccessorType(XmlAccessType.FIELD) +public class AuxiliaryServicesInfo { + private ArrayList services = new + ArrayList<>(); + + public AuxiliaryServicesInfo() { + // JAXB needs this + } + + public void add(AuxServiceRecord s) { + services.add(new AuxiliaryServiceInfo(s.getName(), s.getVersion(), s + .getLaunchTime())); + } + + public void addAll(Collection serviceList) { + for (AuxServiceRecord service : serviceList) { + add(service); + } + } + + public ArrayList getServices() { + return services; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java index 2794857c6e..2ecbf316eb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java @@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; import org.apache.hadoop.yarn.server.nodemanager.NodeResourceMonitor; import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; @@ -827,5 +828,14 @@ public DeletionService getDeletionService() { public NMLogAggregationStatusTracker getNMLogAggregationStatusTracker() { return null; } + + @Override + public void setAuxServices(AuxServices auxServices) { + } + + @Override + public AuxServices getAuxServices() { + return null; + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java index ca0b32a752..ece8d9bfb0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java @@ -31,6 +31,15 @@ import static org.mockito.Matchers.any; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.records.AuxServiceRecord; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.records.AuxServiceRecords; +import org.junit.After; +import org.junit.Before; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -87,6 +96,11 @@ import org.junit.Assert; import org.junit.Test; +/** + * Test for auxiliary services. Parameter 0 tests the Configuration-based aux + * services and parameter 1 tests manifest-based aux services. + */ +@RunWith(value = Parameterized.class) public class TestAuxServices { private static final Logger LOG = LoggerFactory.getLogger(TestAuxServices.class); @@ -99,6 +113,36 @@ public class TestAuxServices { private final static Context MOCK_CONTEXT = mock(Context.class); private final static DeletionService MOCK_DEL_SERVICE = mock( DeletionService.class); + private final Boolean useManifest; + private File rootDir = GenericTestUtils.getTestDir(getClass() + .getSimpleName()); + private File manifest = new File(rootDir, "manifest.txt"); + private ObjectMapper mapper = new ObjectMapper(); + + @Parameterized.Parameters + public static Collection getParams() { + return Arrays.asList(false, true); + } + + @Before + public void setup() { + if (!rootDir.exists()) { + rootDir.mkdirs(); + } + } + + @After + public void cleanup() { + if (useManifest) { + manifest.delete(); + } + rootDir.delete(); + } + + public TestAuxServices(Boolean useManifest) { + this.useManifest = useManifest; + mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); + } static class LightService extends AuxiliaryService implements Service { @@ -204,15 +248,27 @@ public ByteBuffer getMetaData() { } } + private void writeManifestFile(AuxServiceRecords services, Configuration + conf) throws IOException { + conf.set(YarnConfiguration.NM_AUX_SERVICES_MANIFEST, manifest + .getAbsolutePath()); + mapper.writeValue(manifest, services); + } + @SuppressWarnings("resource") @Test public void testRemoteAuxServiceClassPath() throws Exception { Configuration conf = new YarnConfiguration(); FileSystem fs = FileSystem.get(conf); - conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, - new String[] {"ServiceC"}); - conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, - "ServiceC"), ServiceC.class, Service.class); + AuxServiceRecord serviceC = + AuxServices.newAuxService("ServiceC", ServiceC.class.getName()); + AuxServiceRecords services = new AuxServiceRecords().serviceList(serviceC); + if (!useManifest) { + conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, + new String[]{"ServiceC"}); + conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, + "ServiceC"), ServiceC.class, Service.class); + } Context mockContext2 = mock(Context.class); LocalDirsHandlerService mockDirsHandler = mock( @@ -223,11 +279,8 @@ public void testRemoteAuxServiceClassPath() throws Exception { rootAuxServiceDirPath); when(mockContext2.getLocalDirsHandler()).thenReturn(mockDirsHandler); - File rootDir = GenericTestUtils.getTestDir(getClass() - .getSimpleName()); - if (!rootDir.exists()) { - rootDir.mkdirs(); - } + DeletionService mockDelService2 = mock(DeletionService.class); + AuxServices aux = null; File testJar = null; try { @@ -243,11 +296,16 @@ public void testRemoteAuxServiceClassPath() throws Exception { perms.add(PosixFilePermission.GROUP_WRITE); Files.setPosixFilePermissions(Paths.get(testJar.getAbsolutePath()), perms); - conf.set(String.format( - YarnConfiguration.NM_AUX_SERVICE_REMOTE_CLASSPATH, "ServiceC"), - testJar.getAbsolutePath()); + if (useManifest) { + AuxServices.setClasspath(serviceC, testJar.getAbsolutePath()); + writeManifestFile(services, conf); + } else { + conf.set(String.format( + YarnConfiguration.NM_AUX_SERVICE_REMOTE_CLASSPATH, "ServiceC"), + testJar.getAbsolutePath()); + } aux = new AuxServices(MOCK_AUX_PATH_HANDLER, - mockContext2, MOCK_DEL_SERVICE); + mockContext2, mockDelService2); aux.init(conf); Assert.fail("The permission of the jar is wrong." + "Should throw out exception."); @@ -260,11 +318,16 @@ public void testRemoteAuxServiceClassPath() throws Exception { testJar = JarFinder.makeClassLoaderTestJar(this.getClass(), rootDir, "test-runjar.jar", 2048, ServiceC.class.getName()); - conf.set(String.format( - YarnConfiguration.NM_AUX_SERVICE_REMOTE_CLASSPATH, "ServiceC"), - testJar.getAbsolutePath()); + if (useManifest) { + AuxServices.setClasspath(serviceC, testJar.getAbsolutePath()); + writeManifestFile(services, conf); + } else { + conf.set(String.format( + YarnConfiguration.NM_AUX_SERVICE_REMOTE_CLASSPATH, "ServiceC"), + testJar.getAbsolutePath()); + } aux = new AuxServices(MOCK_AUX_PATH_HANDLER, - mockContext2, MOCK_DEL_SERVICE); + mockContext2, mockDelService2); aux.init(conf); aux.start(); Map meta = aux.getMetaData(); @@ -281,7 +344,7 @@ public void testRemoteAuxServiceClassPath() throws Exception { // initialize the same auxservice again, and make sure that we did not // re-download the jar from remote directory. aux = new AuxServices(MOCK_AUX_PATH_HANDLER, - mockContext2, MOCK_DEL_SERVICE); + mockContext2, mockDelService2); aux.init(conf); aux.start(); meta = aux.getMetaData(); @@ -290,7 +353,7 @@ public void testRemoteAuxServiceClassPath() throws Exception { auxName = i.getKey(); } Assert.assertEquals("ServiceC", auxName); - verify(MOCK_DEL_SERVICE, times(0)).delete(any(FileDeletionTask.class)); + verify(mockDelService2, times(0)).delete(any(FileDeletionTask.class)); status = fs.listStatus(rootAuxServiceDirPath); Assert.assertTrue(status.length == 1); aux.serviceStop(); @@ -301,22 +364,17 @@ public void testRemoteAuxServiceClassPath() throws Exception { FileTime fileTime = FileTime.fromMillis(time); Files.setLastModifiedTime(Paths.get(testJar.getAbsolutePath()), fileTime); - conf.set( - String.format(YarnConfiguration.NM_AUX_SERVICE_REMOTE_CLASSPATH, - "ServiceC"), - testJar.getAbsolutePath()); aux = new AuxServices(MOCK_AUX_PATH_HANDLER, - mockContext2, MOCK_DEL_SERVICE); + mockContext2, mockDelService2); aux.init(conf); aux.start(); - verify(MOCK_DEL_SERVICE, times(1)).delete(any(FileDeletionTask.class)); + verify(mockDelService2, times(1)).delete(any(FileDeletionTask.class)); status = fs.listStatus(rootAuxServiceDirPath); Assert.assertTrue(status.length == 2); aux.serviceStop(); } finally { if (testJar != null) { testJar.delete(); - rootDir.delete(); } if (fs.exists(new Path(root))) { fs.delete(new Path(root), true); @@ -333,10 +391,17 @@ public void testRemoteAuxServiceClassPath() throws Exception { public void testCustomizedAuxServiceClassPath() throws Exception { // verify that we can load AuxService Class from default Class path Configuration conf = new YarnConfiguration(); - conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, - new String[] {"ServiceC"}); - conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, - "ServiceC"), ServiceC.class, Service.class); + AuxServiceRecord serviceC = + AuxServices.newAuxService("ServiceC", ServiceC.class.getName()); + AuxServiceRecords services = new AuxServiceRecords().serviceList(serviceC); + if (useManifest) { + writeManifestFile(services, conf); + } else { + conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, + new String[]{"ServiceC"}); + conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, + "ServiceC"), ServiceC.class, Service.class); + } @SuppressWarnings("resource") AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER, MOCK_CONTEXT, MOCK_DEL_SERVICE); @@ -358,31 +423,41 @@ public void testCustomizedAuxServiceClassPath() throws Exception { // create a new jar file, and configure it as customized class path // for this AuxService, and make sure that we could load the class // from this configured customized class path - File rootDir = GenericTestUtils.getTestDir(getClass() - .getSimpleName()); - if (!rootDir.exists()) { - rootDir.mkdirs(); - } File testJar = null; try { testJar = JarFinder.makeClassLoaderTestJar(this.getClass(), rootDir, - "test-runjar.jar", 2048, ServiceC.class.getName()); + "test-runjar.jar", 2048, ServiceC.class.getName(), LightService + .class.getName()); conf = new YarnConfiguration(); - conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, - new String[] {"ServiceC"}); - conf.set(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "ServiceC"), - ServiceC.class.getName()); - conf.set(String.format( - YarnConfiguration.NM_AUX_SERVICES_CLASSPATH, "ServiceC"), - testJar.getAbsolutePath()); // remove "-org.apache.hadoop." from system classes String systemClasses = "-org.apache.hadoop." + "," + - ApplicationClassLoader.SYSTEM_CLASSES_DEFAULT; - conf.set(String.format( - YarnConfiguration.NM_AUX_SERVICES_SYSTEM_CLASSES, - "ServiceC"), systemClasses); + ApplicationClassLoader.SYSTEM_CLASSES_DEFAULT; + if (useManifest) { + AuxServices.setClasspath(serviceC, testJar.getAbsolutePath()); + AuxServices.setSystemClasses(serviceC, systemClasses); + writeManifestFile(services, conf); + } else { + conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, + new String[]{"ServiceC"}); + conf.set(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, + "ServiceC"), ServiceC.class.getName()); + conf.set(String.format( + YarnConfiguration.NM_AUX_SERVICES_CLASSPATH, "ServiceC"), + testJar.getAbsolutePath()); + conf.set(String.format( + YarnConfiguration.NM_AUX_SERVICES_SYSTEM_CLASSES, + "ServiceC"), systemClasses); + } + Context mockContext2 = mock(Context.class); + LocalDirsHandlerService mockDirsHandler = mock( + LocalDirsHandlerService.class); + String root = "target/LocalDir"; + Path rootAuxServiceDirPath = new Path(root, "nmAuxService"); + when(mockDirsHandler.getLocalPathForWrite(anyString())).thenReturn( + rootAuxServiceDirPath); + when(mockContext2.getLocalDirsHandler()).thenReturn(mockDirsHandler); aux = new AuxServices(MOCK_AUX_PATH_HANDLER, - MOCK_CONTEXT, MOCK_DEL_SERVICE); + mockContext2, MOCK_DEL_SERVICE); aux.init(conf); aux.start(); meta = aux.getMetaData(); @@ -405,21 +480,32 @@ public void testCustomizedAuxServiceClassPath() throws Exception { } finally { if (testJar != null) { testJar.delete(); - rootDir.delete(); } } } @Test - public void testAuxEventDispatch() { + public void testAuxEventDispatch() throws IOException { Configuration conf = new Configuration(); - conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, new String[] { "Asrv", "Bsrv" }); - conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Asrv"), - ServiceA.class, Service.class); - conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv"), - ServiceB.class, Service.class); - conf.setInt("A.expected.init", 1); - conf.setInt("B.expected.stop", 1); + if (useManifest) { + AuxServiceRecord serviceA = + AuxServices.newAuxService("Asrv", ServiceA.class.getName()); + serviceA.getConfiguration().setProperty("A.expected.init", "1"); + AuxServiceRecord serviceB = + AuxServices.newAuxService("Bsrv", ServiceB.class.getName()); + serviceB.getConfiguration().setProperty("B.expected.stop", "1"); + writeManifestFile(new AuxServiceRecords().serviceList(serviceA, + serviceB), conf); + } else { + conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, new String[]{"Asrv", + "Bsrv"}); + conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Asrv"), + ServiceA.class, Service.class); + conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv"), + ServiceB.class, Service.class); + conf.setInt("A.expected.init", 1); + conf.setInt("B.expected.stop", 1); + } final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER, MOCK_CONTEXT, MOCK_DEL_SERVICE); aux.init(conf); @@ -477,14 +563,36 @@ public void testAuxEventDispatch() { } } - @Test - public void testAuxServices() { + private Configuration getABConf() throws + IOException { + return getABConf("Asrv", "Bsrv", ServiceA.class, ServiceB.class); + } + + private Configuration getABConf(String aName, String bName, + Class aClass, Class bClass) throws + IOException { Configuration conf = new Configuration(); - conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, new String[] { "Asrv", "Bsrv" }); - conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Asrv"), - ServiceA.class, Service.class); - conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv"), - ServiceB.class, Service.class); + if (useManifest) { + AuxServiceRecord serviceA = + AuxServices.newAuxService(aName, aClass.getName()); + AuxServiceRecord serviceB = + AuxServices.newAuxService(bName, bClass.getName()); + writeManifestFile(new AuxServiceRecords().serviceList(serviceA, serviceB), + conf); + } else { + conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, new String[]{aName, + bName}); + conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, aName), + aClass, Service.class); + conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, bName), + bClass, Service.class); + } + return conf; + } + + @Test + public void testAuxServices() throws IOException { + Configuration conf = getABConf(); final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER, MOCK_CONTEXT, MOCK_DEL_SERVICE); aux.init(conf); @@ -510,15 +618,9 @@ public void testAuxServices() { } } - @Test - public void testAuxServicesMeta() { - Configuration conf = new Configuration(); - conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, new String[] { "Asrv", "Bsrv" }); - conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Asrv"), - ServiceA.class, Service.class); - conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv"), - ServiceB.class, Service.class); + public void testAuxServicesMeta() throws IOException { + Configuration conf = getABConf(); final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER, MOCK_CONTEXT, MOCK_DEL_SERVICE); aux.init(conf); @@ -547,16 +649,10 @@ public void testAuxServicesMeta() { } } - - @Test - public void testAuxUnexpectedStop() { - Configuration conf = new Configuration(); - conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, new String[] { "Asrv", "Bsrv" }); - conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Asrv"), - ServiceA.class, Service.class); - conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv"), - ServiceB.class, Service.class); + public void testAuxUnexpectedStop() throws IOException { + // AuxServices no longer expected to stop when services stop + Configuration conf = getABConf(); final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER, MOCK_CONTEXT, MOCK_DEL_SERVICE); aux.init(conf); @@ -564,21 +660,17 @@ public void testAuxUnexpectedStop() { Service s = aux.getServices().iterator().next(); s.stop(); - assertEquals("Auxiliary service stopped, but AuxService unaffected.", - STOPPED, aux.getServiceState()); - assertTrue(aux.getServices().isEmpty()); + assertEquals("Auxiliary service stop caused AuxServices stop", + STARTED, aux.getServiceState()); + assertEquals(2, aux.getServices().size()); } @Test - public void testValidAuxServiceName() { + public void testValidAuxServiceName() throws IOException { + Configuration conf = getABConf("Asrv1", "Bsrv_2", ServiceA.class, + ServiceB.class); final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER, MOCK_CONTEXT, MOCK_DEL_SERVICE); - Configuration conf = new Configuration(); - conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, new String[] {"Asrv1", "Bsrv_2"}); - conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Asrv1"), - ServiceA.class, Service.class); - conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv_2"), - ServiceB.class, Service.class); try { aux.init(conf); } catch (Exception ex) { @@ -588,30 +680,33 @@ public void testValidAuxServiceName() { //Test bad auxService Name final AuxServices aux1 = new AuxServices(MOCK_AUX_PATH_HANDLER, MOCK_CONTEXT, MOCK_DEL_SERVICE); - conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, new String[] {"1Asrv1"}); - conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "1Asrv1"), - ServiceA.class, Service.class); + if (useManifest) { + AuxServiceRecord serviceA = + AuxServices.newAuxService("1Asrv1", ServiceA.class.getName()); + writeManifestFile(new AuxServiceRecords().serviceList(serviceA), conf); + } else { + conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, new String[] + {"1Asrv1"}); + conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, + "1Asrv1"), ServiceA.class, Service.class); + } try { aux1.init(conf); Assert.fail("Should receive the exception."); } catch (Exception ex) { - assertTrue(ex.getMessage().contains("The ServiceName: 1Asrv1 set in " + - "yarn.nodemanager.aux-services is invalid.The valid service name " + - "should only contain a-zA-Z0-9_ and can not start with numbers")); + assertTrue("Wrong message: " + ex.getMessage(), + ex.getMessage().contains("The auxiliary service name: 1Asrv1 is " + + "invalid. The valid service name should only contain a-zA-Z0-9_" + + " and cannot start with numbers.")); } } @Test public void testAuxServiceRecoverySetup() throws IOException { - Configuration conf = new YarnConfiguration(); + Configuration conf = getABConf("Asrv", "Bsrv", RecoverableServiceA.class, + RecoverableServiceB.class); conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true); conf.set(YarnConfiguration.NM_RECOVERY_DIR, TEST_DIR.toString()); - conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, - new String[] { "Asrv", "Bsrv" }); - conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Asrv"), - RecoverableServiceA.class, Service.class); - conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv"), - RecoverableServiceB.class, Service.class); try { final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER, MOCK_CONTEXT, MOCK_DEL_SERVICE); @@ -708,20 +803,33 @@ public ByteBuffer getMetaData() { } @Test - public void testAuxServicesConfChange() { + public void testAuxServicesConfChange() throws IOException { Configuration conf = new Configuration(); - conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, - new String[]{"ConfChangeAuxService"}); - conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, - "ConfChangeAuxService"), ConfChangeAuxService.class, Service.class); + if (useManifest) { + AuxServiceRecord service = + AuxServices.newAuxService("ConfChangeAuxService", + ConfChangeAuxService.class.getName()); + service.getConfiguration().setProperty("dummyConfig", "testValue"); + writeManifestFile(new AuxServiceRecords().serviceList(service), conf); + } else { + conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, + new String[]{"ConfChangeAuxService"}); + conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, + "ConfChangeAuxService"), ConfChangeAuxService.class, Service.class); + conf.set("dummyConfig", "testValue"); + } AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER, MOCK_CONTEXT, MOCK_DEL_SERVICE); - conf.set("dummyConfig", "testValue"); aux.init(conf); aux.start(); for (AuxiliaryService s : aux.getServices()) { assertEquals(STARTED, s.getServiceState()); - assertEquals(conf.get("dummyConfig"), "testValue"); + if (useManifest) { + assertNull(conf.get("dummyConfig")); + } else { + assertEquals("testValue", conf.get("dummyConfig")); + } + assertEquals("changedTestValue", s.getConfig().get("dummyConfig")); } aux.stop();