From 8a95ea61e12384389f2103df0fcba594469cc024 Mon Sep 17 00:00:00 2001
From: Zhankun Tang
Date: Tue, 23 Apr 2019 17:33:58 +0800
Subject: [PATCH] YARN-9475. [YARN-9473] Create basic VE plugin. Contributed by Peter Bacsko.

---
 .../resourceplugin/com/nec/NECVEPlugin.java   | 399 ++++++++++++++++++
 .../resourceplugin/com/nec/package-info.java  |  19 ++
 2 files changed, 418 insertions(+)
 create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/com/nec/NECVEPlugin.java
 create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/com/nec/package-info.java

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/com/nec/NECVEPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/com/nec/NECVEPlugin.java
new file mode 100644
index 0000000000..d22623766a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/com/nec/NECVEPlugin.java
@@ -0,0 +1,399 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.com.nec;
+
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.Shell.CommandExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.Device;
+import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DevicePlugin;
+import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DevicePluginScheduler;
+import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DeviceRegisterRequest;
+import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DeviceRuntimeSpec;
+import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.YarnRuntimeType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * A device framework plugin which supports NEC Vector Engine.
+ *
+ * Devices are discovered by running an external script and parsing its
+ * output. The script is looked up, in this order: at the path given by the
+ * NEC_VE_GET_SCRIPT_PATH environment variable, under
+ * $HADOOP_COMMON_HOME/sbin/DevicePluginScript/, and finally in a list of
+ * search directories.
+ */
+public class NECVEPlugin implements DevicePlugin, DevicePluginScheduler {
+  private static final String HADOOP_COMMON_HOME = "HADOOP_COMMON_HOME";
+  private static final String ENV_SCRIPT_PATH = "NEC_VE_GET_SCRIPT_PATH";
+  private static final String ENV_SCRIPT_NAME = "NEC_VE_GET_SCRIPT_NAME";
+  private static final String DEFAULT_SCRIPT_NAME = "nec-ve-get.py";
+  private static final Logger LOG = LoggerFactory.getLogger(NECVEPlugin.class);
+  private static final String[] DEFAULT_BINARY_SEARCH_DIRS = new String[]{
+      "/usr/bin", "/bin", "/opt/nec/ve/bin"};
+
+  // Path of the discovery script; resolved once in the constructor.
+  private final String binaryPath;
+
+  // Creates the executor for the script; replaceable in tests.
+  private Function<String[], CommandExecutor>
+      commandExecutorProvider = this::createCommandExecutor;
+
+  /**
+   * Creates the plugin using the process environment and the default
+   * search directories.
+   *
+   * @throws ResourceHandlerException if the discovery script is not found
+   */
+  public NECVEPlugin() throws ResourceHandlerException {
+    this(System::getenv, DEFAULT_BINARY_SEARCH_DIRS);
+  }
+
+  /**
+   * Creates the plugin and locates the discovery script.
+   *
+   * @param envProvider maps an environment variable name to its value, or
+   *        to null if the variable is not set
+   * @param scriptPaths directories searched as the last resort
+   * @throws ResourceHandlerException if the script is not found at any of
+   *         the candidate locations
+   */
+  @VisibleForTesting
+  NECVEPlugin(Function<String, String> envProvider, String[] scriptPaths)
+      throws ResourceHandlerException {
+    String binaryName = DEFAULT_SCRIPT_NAME;
+
+    String envScriptName = envProvider.apply(ENV_SCRIPT_NAME);
+    if (envScriptName != null) {
+      binaryName = envScriptName;
+    }
+    // Log the effective name; the environment value is null when unset.
+    LOG.info("Use {} as script name.", binaryName);
+
+    // Try to find the script based on an environment variable, if set
+    String path = null;
+    String envBinaryPath = envProvider.apply(ENV_SCRIPT_PATH);
+    if (envBinaryPath != null) {
+      path = getScriptFromEnvSetting(envBinaryPath);
+    }
+
+    // Try $HADOOP_COMMON_HOME
+    if (path == null) {
+      // print a warning only if the env variable was defined
+      if (envBinaryPath != null) {
+        LOG.warn("Script {} cannot be used, falling back "
+            + "to $HADOOP_COMMON_HOME/sbin/DevicePluginScript/", envBinaryPath);
+      }
+
+      path = getScriptFromHadoopCommon(envProvider, binaryName);
+    }
+
+    // Try the default search directories
+    if (path == null) {
+      LOG.info("Script not found under"
+          + " $HADOOP_COMMON_HOME/sbin/DevicePluginScript/,"
+          + " falling back to default search directories");
+
+      path = getScriptFromSearchDirs(binaryName, scriptPaths);
+    }
+
+    // Script not found
+    if (path == null) {
+      LOG.error("Script not found in {}", Arrays.toString(scriptPaths));
+      throw new ResourceHandlerException(
+          "No binary found for " + NECVEPlugin.class.getName());
+    }
+
+    this.binaryPath = path;
+  }
+
+  /**
+   * Returns the registration request for the "nec.com/ve" resource.
+   */
+  @Override
+  public DeviceRegisterRequest getRegisterRequestInfo() {
+    return DeviceRegisterRequest.Builder.newInstance()
+        .setResourceName("nec.com/ve").build();
+  }
+
+  /**
+   * Runs the discovery script and parses its output.
+   *
+   * @return the healthy devices reported by the script, or null if running
+   *         the script failed with an IOException
+   */
+  @Override
+  public Set<Device> getDevices() {
+    Set<Device> devices = null;
+
+    CommandExecutor executor =
+        commandExecutorProvider.apply(new String[]{this.binaryPath});
+    try {
+      executor.execute();
+      String output = executor.getOutput();
+      devices = parseOutput(output);
+    } catch (IOException e) {
+      // Pass the exception itself so that the stack trace is logged.
+      LOG.warn("Failed to execute script {}", binaryPath, e);
+    }
+    return devices;
+  }
+
+  /**
+   * Produces no runtime specification; always returns null.
+   */
+  @Override
+  public DeviceRuntimeSpec onDevicesAllocated(Set<Device> set,
+      YarnRuntimeType yarnRuntimeType) {
+    return null;
+  }
+
+  /**
+   * Parses the output of the external Python script.
+   *
+   * Sample line:
+   * id=0, dev=/dev/ve0, state=ONLINE, busId=0000:65:00.0, major=243, minor=0
+   *
+   * Blank lines are ignored and malformed lines are skipped as a whole.
+   * Devices that are not healthy are left out of the result.
+   */
+  private Set<Device> parseOutput(String output) {
+    Set<Device> devices = new HashSet<>();
+
+    LOG.info("Parsing output: {}", output);
+    String[] lines = output.split("\n");
+    for (String line : lines) {
+      if (line.trim().isEmpty()) {
+        continue;
+      }
+
+      Device device = parseLine(line);
+      if (device == null) {
+        continue;
+      }
+
+      if (device.isHealthy()) {
+        devices.add(device);
+      } else {
+        LOG.warn("Skipping device {} because it's not healthy", device);
+      }
+    }
+
+    return devices;
+  }
+
+  /**
+   * Builds a device from one line of script output.
+   *
+   * @return the device, or null if the line contains a malformed
+   *         key=value pair or a non-numeric id, major or minor value
+   */
+  private Device parseLine(String line) {
+    Device.Builder builder = Device.Builder.newInstance();
+
+    // map key --> builder calls
+    Map<String, Consumer<String>> builderInvocations =
+        getBuilderInvocationsMap(builder);
+
+    String[] keyValues = line.trim().split(",");
+    for (String keyValue : keyValues) {
+      String[] tokens = keyValue.trim().split("=");
+      if (tokens.length != 2) {
+        LOG.error("Unknown format of script output! Skipping this line");
+        return null;
+      }
+
+      final String key = tokens[0];
+      final String value = tokens[1];
+
+      Consumer<String> builderInvocation = builderInvocations.get(key);
+      if (builderInvocation == null) {
+        LOG.warn("Unknown key {}, ignored", key);
+        continue;
+      }
+
+      try {
+        builderInvocation.accept(value);
+      } catch (NumberFormatException e) {
+        LOG.error("Invalid value {} for key {}! Skipping this line",
+            value, key);
+        return null;
+      }
+    }
+
+    return builder.build();
+  }
+
+  @Override
+  public void onDevicesReleased(Set<Device> releasedDevices) {
+    // nop
+  }
+
+  /**
+   * Picks up to count devices from the available ones, in the iteration
+   * order of the given set.
+   *
+   * @return the chosen devices; empty if count is not positive
+   */
+  @Override
+  public Set<Device> allocateDevices(Set<Device> availableDevices, int count,
+      Map<String, String> env) {
+    // Can consider topology, utilization.etc
+    Set<Device> allocated = new HashSet<>();
+    for (Device d : availableDevices) {
+      // Check before adding so that a non-positive count allocates nothing.
+      if (allocated.size() >= count) {
+        break;
+      }
+      allocated.add(d);
+    }
+    return allocated;
+  }
+
+  private CommandExecutor createCommandExecutor(String[] command) {
+    return new Shell.ShellCommandExecutor(
+        command);
+  }
+
+  /**
+   * Validates the script path taken from the environment.
+   *
+   * @return the path, or null if it does not exist, is a directory or is
+   *         not executable
+   */
+  private String getScriptFromEnvSetting(String envBinaryPath) {
+    LOG.info("Checking script path: {}", envBinaryPath);
+    File f = new File(envBinaryPath);
+
+    if (!f.exists()) {
+      LOG.warn("Script {} does not exist", envBinaryPath);
+      return null;
+    }
+
+    if (f.isDirectory()) {
+      LOG.warn("Specified path {} is a directory", envBinaryPath);
+      return null;
+    }
+
+    if (!FileUtil.canExecute(f)) {
+      LOG.warn("Script {} is not executable", envBinaryPath);
+      return null;
+    }
+
+    LOG.info("Found script: {}", envBinaryPath);
+
+    return envBinaryPath;
+  }
+
+  /**
+   * Looks for the script under $HADOOP_COMMON_HOME/sbin/DevicePluginScript/.
+   *
+   * @return the script path, or null if the variable is not set or the
+   *         file does not exist
+   */
+  private String getScriptFromHadoopCommon(
+      Function<String, String> envProvider, String binaryName) {
+    String scriptPath = null;
+    String hadoopCommon = envProvider.apply(HADOOP_COMMON_HOME);
+
+    if (hadoopCommon != null) {
+      String targetPath = hadoopCommon
+          + "/sbin/DevicePluginScript/" + binaryName;
+      LOG.info("Checking script: {}", targetPath);
+      if (new File(targetPath).exists()) {
+        LOG.info("Found script: {}", targetPath);
+        scriptPath = targetPath;
+      }
+    } else {
+      LOG.info("$HADOOP_COMMON_HOME is not set");
+    }
+
+    return scriptPath;
+  }
+
+  /**
+   * Looks for the script in the given directories, in order.
+   *
+   * @return the absolute path of the first match, or null if there is none
+   */
+  private String getScriptFromSearchDirs(String binaryName,
+      String[] scriptPaths) {
+    String scriptPath = null;
+
+    for (String dir : scriptPaths) {
+      File f = new File(dir, binaryName);
+      if (f.exists()) {
+        scriptPath = f.getAbsolutePath();
+        // Log the script itself, not just the directory it was found in.
+        LOG.info("Found script: {}", scriptPath);
+        break;
+      }
+    }
+
+    return scriptPath;
+  }
+
+  /**
+   * Maps each key of the script output to the builder call storing its
+   * value. The device is explicitly marked healthy when its state is ONLINE.
+   */
+  private Map<String, Consumer<String>> getBuilderInvocationsMap(
+      Device.Builder builder) {
+    Map<String, Consumer<String>> builderInvocations = new HashMap<>();
+    builderInvocations.put("id", v -> builder.setId(Integer.parseInt(v)));
+    builderInvocations.put("dev", v -> builder.setDevPath(v));
+    builderInvocations.put("state", v -> {
+      if (v.equals("ONLINE")) {
+        builder.setHealthy(true);
+      }
+      builder.setStatus(v);
+    });
+    builderInvocations.put("busId", v -> builder.setBusID(v));
+    builderInvocations.put("major",
+        v -> builder.setMajorNumber(Integer.parseInt(v)));
+    builderInvocations.put("minor",
+        v -> builder.setMinorNumber(Integer.parseInt(v)));
+
+    return builderInvocations;
+  }
+
+  @VisibleForTesting
+  void setCommandExecutorProvider(
+      Function<String[], CommandExecutor> provider) {
+    this.commandExecutorProvider = provider;
+  }
+
+  @VisibleForTesting
+  String getBinaryPath() {
+    return binaryPath;
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/com/nec/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/com/nec/package-info.java
new file mode 100644
index 0000000000..8f7fd676a5
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/com/nec/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.com.nec;
\ No newline at end of file