SUBMARINE-40. Add TonY runtime to Submarine. Contributed by Keqiu Hu.

This commit is contained in:
Zhankun Tang 2019-04-23 15:45:42 +08:00
parent 1c8046d67e
commit e79a9c12c1
19 changed files with 955 additions and 6 deletions

View File

@ -38,6 +38,7 @@ public class CliConstants {
public static final String TENSORBOARD_RESOURCES = "tensorboard_resources"; public static final String TENSORBOARD_RESOURCES = "tensorboard_resources";
public static final String TENSORBOARD_DEFAULT_RESOURCES = public static final String TENSORBOARD_DEFAULT_RESOURCES =
"memory=4G,vcores=1"; "memory=4G,vcores=1";
public static final String ARG_CONF = "conf";
public static final String WORKER_LAUNCH_CMD = "worker_launch_cmd"; public static final String WORKER_LAUNCH_CMD = "worker_launch_cmd";
public static final String SERVING_LAUNCH_CMD = "serving_launch_cmd"; public static final String SERVING_LAUNCH_CMD = "serving_launch_cmd";
@ -57,4 +58,5 @@ public class CliConstants {
public static final String PRINCIPAL = "principal"; public static final String PRINCIPAL = "principal";
public static final String DISTRIBUTE_KEYTAB = "distribute_keytab"; public static final String DISTRIBUTE_KEYTAB = "distribute_keytab";
public static final String YAML_CONFIG = "f"; public static final String YAML_CONFIG = "f";
public static final String INSECURE_CLUSTER = "insecure";
} }

View File

@ -166,6 +166,9 @@ private Options generateOptions() {
" parameter" + CliConstants.KEYTAB + " on cluster machines will be " + " parameter" + CliConstants.KEYTAB + " on cluster machines will be " +
"used"); "used");
options.addOption("h", "help", false, "Print help"); options.addOption("h", "help", false, "Print help");
options.addOption("insecure", false, "Cluster is not Kerberos enabled.");
options.addOption("conf", true,
"User specified configuration, as key=val pairs.");
return options; return options;
} }

View File

@ -56,9 +56,11 @@ public class RunJobParameters extends RunParameters {
private boolean waitJobFinish = false; private boolean waitJobFinish = false;
private boolean distributed = false; private boolean distributed = false;
private boolean securityDisabled = false;
private String keytab; private String keytab;
private String principal; private String principal;
private boolean distributeKeytab = false; private boolean distributeKeytab = false;
private List<String> confPairs = new ArrayList<>();
@Override @Override
public void updateParameters(ParametersHolder parametersHolder, public void updateParameters(ParametersHolder parametersHolder,
@ -97,6 +99,10 @@ public void updateParameters(ParametersHolder parametersHolder,
+ "please double check."); + "please double check.");
} }
if (parametersHolder.hasOption(CliConstants.INSECURE_CLUSTER)) {
setSecurityDisabled(true);
}
String kerberosKeytab = parametersHolder.getOptionValue( String kerberosKeytab = parametersHolder.getOptionValue(
CliConstants.KEYTAB); CliConstants.KEYTAB);
String kerberosPrincipal = parametersHolder.getOptionValue( String kerberosPrincipal = parametersHolder.getOptionValue(
@ -181,6 +187,9 @@ public void updateParameters(ParametersHolder parametersHolder,
boolean distributeKerberosKeytab = parametersHolder.hasOption(CliConstants boolean distributeKerberosKeytab = parametersHolder.hasOption(CliConstants
.DISTRIBUTE_KEYTAB); .DISTRIBUTE_KEYTAB);
List<String> configPairs = parametersHolder
.getOptionValues(CliConstants.ARG_CONF);
this.setInputPath(input).setCheckpointPath(jobDir) this.setInputPath(input).setCheckpointPath(jobDir)
.setNumPS(nPS).setNumWorkers(nWorkers) .setNumPS(nPS).setNumWorkers(nWorkers)
.setPSLaunchCmd(psLaunchCommand).setWorkerLaunchCmd(workerLaunchCmd) .setPSLaunchCmd(psLaunchCommand).setWorkerLaunchCmd(workerLaunchCmd)
@ -188,7 +197,8 @@ public void updateParameters(ParametersHolder parametersHolder,
.setTensorboardEnabled(tensorboard) .setTensorboardEnabled(tensorboard)
.setKeytab(kerberosKeytab) .setKeytab(kerberosKeytab)
.setPrincipal(kerberosPrincipal) .setPrincipal(kerberosPrincipal)
.setDistributeKeytab(distributeKerberosKeytab); .setDistributeKeytab(distributeKerberosKeytab)
.setConfPairs(configPairs);
super.updateParameters(parametersHolder, clientContext); super.updateParameters(parametersHolder, clientContext);
} }
@ -329,6 +339,14 @@ public RunJobParameters setPrincipal(String kerberosPrincipal) {
return this; return this;
} }
public boolean isSecurityDisabled() {
return securityDisabled;
}
public void setSecurityDisabled(boolean securityDisabled) {
this.securityDisabled = securityDisabled;
}
public boolean isDistributeKeytab() { public boolean isDistributeKeytab() {
return distributeKeytab; return distributeKeytab;
} }
@ -339,6 +357,15 @@ public RunJobParameters setDistributeKeytab(
return this; return this;
} }
public List<String> getConfPairs() {
return confPairs;
}
public RunJobParameters setConfPairs(List<String> confPairs) {
this.confPairs = confPairs;
return this;
}
@VisibleForTesting @VisibleForTesting
public static class UnderscoreConverterPropertyUtils extends PropertyUtils { public static class UnderscoreConverterPropertyUtils extends PropertyUtils {
@Override @Override

View File

@ -91,7 +91,7 @@ usage: job run
#### Notes: #### Notes:
When using `localization` option to make a collection of dependency Python When using `localization` option to make a collection of dependency Python
scripts available to entry python script in the container, you may also need to scripts available to entry python script in the container, you may also need to
set `PYTHONPATH` environment variable as below to avoid module import error set `PYTHONPATH` environment variable as below to avoid module import error
reported from `entry_script.py`. reported from `entry_script.py`.
``` ```

View File

@ -0,0 +1,25 @@
<!---
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
# Overview
[TonY](https://github.com/linkedin/TonY/) is an open source project that TonY
is a framework to natively run deep learning frameworks on Apache Hadoop.
As Submarine supports multiple runtimes, this module serves as an adaptor so
that Submarine could leverage TonY as a Runtime implementation to run
TensorFlow with 2.x version of Hadoop and Hadoop installations w/o docker
support or native service.
Please jump to [QuickStart](src/site/markdown/QuickStart.md) to understand how
to run Submarine with TonY runtime.

View File

@ -0,0 +1,66 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>hadoop-submarine</artifactId>
<groupId>org.apache.hadoop</groupId>
<version>0.2.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>hadoop-submarine-tony-runtime</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-submarine-core</artifactId>
<version>0.2.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.linkedin.tony</groupId>
<artifactId>tony-core</artifactId>
<version>0.3.3</version>
<exclusions>
<exclusion>
<groupId>com.linkedin.tony</groupId>
<artifactId>tony-mini</artifactId>
</exclusion>
<exclusion>
<groupId>com.linkedin.azkaban</groupId>
<artifactId>az-hadoop-jobtype-plugin</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-submarine-core</artifactId>
<type>test-jar</type>
<scope>test</scope>
<version>0.2.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,52 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License. See accompanying LICENSE file.
*/
package org.apache.hadoop.yarn.submarine.runtimes.tony;
import com.linkedin.tony.TonyClient;
import com.linkedin.tony.client.TaskUpdateListener;
import com.linkedin.tony.rpc.TaskInfo;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.submarine.common.ClientContext;
import org.apache.hadoop.yarn.submarine.common.api.JobStatus;
import org.apache.hadoop.yarn.submarine.runtimes.common.JobMonitor;
import org.apache.hadoop.yarn.submarine.runtimes.tony.buider.JobStatusBuilder;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
/**
* An implementation of JobMonitor with TonY library.
*/
public class TonyJobMonitor extends JobMonitor implements TaskUpdateListener {
private Set<TaskInfo> taskInfos = new HashSet<>();
public TonyJobMonitor(ClientContext clientContext, TonyClient client) {
super(clientContext);
client.addListener(this);
}
@Override
public JobStatus getTrainingJobStatus(String jobName)
throws IOException, YarnException {
JobStatus jobStatus = JobStatusBuilder.fromTaskInfoSet(taskInfos);
jobStatus.setJobName(jobName);
return jobStatus;
}
@Override
public void onTaskInfosUpdated(Set<TaskInfo> taskInfoSet) {
this.taskInfos = taskInfoSet;
}
}

View File

@ -0,0 +1,97 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License. See accompanying LICENSE file.
*/
package org.apache.hadoop.yarn.submarine.runtimes.tony;
import com.linkedin.tony.Constants;
import com.linkedin.tony.TonyClient;
import com.linkedin.tony.client.CallbackHandler;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.submarine.client.cli.param.RunJobParameters;
import org.apache.hadoop.yarn.submarine.runtimes.common.JobSubmitter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
/**
* Implementation of JobSumitter with TonY runtime.
*/
public class TonyJobSubmitter implements JobSubmitter, CallbackHandler {
private static final Log LOG = LogFactory.getLog(TonyJobSubmitter.class);
private ApplicationId applicationId;
private TonyClient tonyClient;
public TonyJobSubmitter() { }
public void setTonyClient(TonyClient client) {
this.tonyClient = client;
}
@Override
public ApplicationId submitJob(RunJobParameters parameters)
throws IOException, YarnException {
LOG.info("Starting Tony runtime..");
File tonyFinalConfPath = File.createTempFile("temp",
Constants.TONY_FINAL_XML);
// Write user's overridden conf to an xml to be localized.
Configuration tonyConf = TonyUtils.tonyConfFromClientContext(parameters);
try (OutputStream os = new FileOutputStream(tonyFinalConfPath)) {
tonyConf.writeXml(os);
} catch (IOException e) {
throw new RuntimeException("Failed to create " + tonyFinalConfPath
+ " conf file. Exiting.", e);
}
try {
tonyClient.init(new String[]{
"--conf_file", tonyFinalConfPath.getAbsolutePath()
});
} catch (Exception e) {
LOG.error("Failed to init TonyClient: ", e);
}
Thread clientThread = new Thread(tonyClient::start);
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
tonyClient.forceKillApplication();
} catch (YarnException | IOException e) {
LOG.error("Failed to kill application during shutdown.", e);
}
}));
clientThread.start();
while (clientThread.isAlive()) {
if (applicationId != null) {
LOG.info("TonyClient returned applicationId: " + applicationId);
return applicationId;
}
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
LOG.error(e);
}
}
return null;
}
@Override
public void onApplicationIdReceived(ApplicationId appId) {
applicationId = appId;
}
}

View File

@ -0,0 +1,55 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License. See accompanying LICENSE file.
*/
package org.apache.hadoop.yarn.submarine.runtimes.tony;
import com.linkedin.tony.TonyClient;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.submarine.common.ClientContext;
import org.apache.hadoop.yarn.submarine.runtimes.RuntimeFactory;
import org.apache.hadoop.yarn.submarine.runtimes.common.FSBasedSubmarineStorageImpl;
import org.apache.hadoop.yarn.submarine.runtimes.common.JobMonitor;
import org.apache.hadoop.yarn.submarine.runtimes.common.JobSubmitter;
import org.apache.hadoop.yarn.submarine.runtimes.common.SubmarineStorage;
/**
* Implementation of RuntimeFactory with Tony Runtime
*/
public class TonyRuntimeFactory extends RuntimeFactory {
private TonyClient tonyClient;
private TonyJobSubmitter submitter;
private TonyJobMonitor monitor;
public TonyRuntimeFactory(ClientContext clientContext) {
super(clientContext);
submitter = new TonyJobSubmitter();
tonyClient = new TonyClient(submitter, new Configuration());
monitor = new TonyJobMonitor(clientContext, tonyClient);
submitter.setTonyClient(tonyClient);
}
@Override
protected JobSubmitter internalCreateJobSubmitter() {
return submitter;
}
@Override
protected JobMonitor internalCreateJobMonitor() {
return monitor;
}
@Override
protected SubmarineStorage internalCreateSubmarineStorage() {
return new FSBasedSubmarineStorageImpl(super.clientContext);
}
}

View File

@ -0,0 +1,164 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License. See accompanying LICENSE file.
*/
package org.apache.hadoop.yarn.submarine.runtimes.tony;
import com.linkedin.tony.Constants;
import com.linkedin.tony.TonyConfigurationKeys;
import com.linkedin.tony.util.Utils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException;
import org.apache.hadoop.yarn.submarine.client.cli.param.RunJobParameters;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
/**
* Utilities for Tony Runtime.
*/
public final class TonyUtils {
private static final Log LOG = LogFactory.getLog(TonyUtils.class);
public static Configuration tonyConfFromClientContext(
RunJobParameters parameters) {
Configuration tonyConf = new Configuration();
tonyConf.setInt(
TonyConfigurationKeys.getInstancesKey(Constants.WORKER_JOB_NAME),
parameters.getNumWorkers());
tonyConf.setInt(
TonyConfigurationKeys.getInstancesKey(Constants.PS_JOB_NAME),
parameters.getNumPS());
// Resources for PS & Worker
if (parameters.getPsResource() != null) {
tonyConf.setInt(
TonyConfigurationKeys.getResourceKey(Constants.PS_JOB_NAME,
Constants.VCORES),
parameters.getPsResource().getVirtualCores());
tonyConf.setLong(
TonyConfigurationKeys.getResourceKey(Constants.PS_JOB_NAME,
Constants.MEMORY),
parameters.getPsResource().getMemorySize());
}
if (parameters.getWorkerResource() != null) {
tonyConf.setInt(
TonyConfigurationKeys.getResourceKey(Constants.WORKER_JOB_NAME,
Constants.VCORES),
parameters.getWorkerResource().getVirtualCores());
tonyConf.setLong(
TonyConfigurationKeys.getResourceKey(Constants.WORKER_JOB_NAME,
Constants.MEMORY),
parameters.getWorkerResource().getMemorySize());
try {
tonyConf.setLong(
TonyConfigurationKeys.getResourceKey(Constants.WORKER_JOB_NAME,
Constants.GPUS),
parameters.getWorkerResource()
.getResourceValue(ResourceInformation.GPU_URI));
} catch (ResourceNotFoundException rnfe) {
LOG.error("GPU resources not enabled.");
}
}
if (parameters.getQueue() != null) {
tonyConf.set(
TonyConfigurationKeys.YARN_QUEUE_NAME,
parameters.getQueue());
}
// Set up Docker for PS & Worker
if (parameters.getDockerImageName() != null) {
tonyConf.set(TonyConfigurationKeys.getContainerDockerKey(),
parameters.getDockerImageName());
tonyConf.setBoolean(TonyConfigurationKeys.DOCKER_ENABLED, true);
}
if (parameters.getWorkerDockerImage() != null) {
tonyConf.set(
TonyConfigurationKeys.getDockerImageKey(Constants.WORKER_JOB_NAME),
parameters.getWorkerDockerImage());
tonyConf.setBoolean(TonyConfigurationKeys.DOCKER_ENABLED, true);
}
if (parameters.getPsDockerImage() != null) {
tonyConf.set(
TonyConfigurationKeys.getDockerImageKey(Constants.PS_JOB_NAME),
parameters.getPsDockerImage());
tonyConf.setBoolean(TonyConfigurationKeys.DOCKER_ENABLED, true);
}
// Set up container environment
List<String> envs = parameters.getEnvars();
tonyConf.setStrings(
TonyConfigurationKeys.CONTAINER_LAUNCH_ENV,
envs.toArray(new String[0]));
tonyConf.setStrings(TonyConfigurationKeys.EXECUTION_ENV,
envs.stream()
.map(env -> env.replaceAll("DOCKER_", ""))
.toArray(String[]::new));
tonyConf.setStrings(TonyConfigurationKeys.CONTAINER_LAUNCH_ENV,
envs.stream().map(env -> env.replaceAll("DOCKER_", ""))
.toArray(String[]::new));
// Set up running command
if (parameters.getWorkerLaunchCmd() != null) {
tonyConf.set(
TonyConfigurationKeys.getExecuteCommandKey(Constants.WORKER_JOB_NAME),
parameters.getWorkerLaunchCmd());
}
if (parameters.getPSLaunchCmd() != null) {
tonyConf.set(
TonyConfigurationKeys.getExecuteCommandKey(Constants.PS_JOB_NAME),
parameters.getPSLaunchCmd());
}
tonyConf.setBoolean(TonyConfigurationKeys.SECURITY_ENABLED,
!parameters.isSecurityDisabled());
// Set up container resources
if (parameters.getLocalizations() != null) {
tonyConf.setStrings(TonyConfigurationKeys.getContainerResourcesKey(),
parameters.getLocalizations().stream()
.map(lo -> lo.getRemoteUri() + Constants.RESOURCE_DIVIDER
+ lo.getLocalPath())
.toArray(String[]::new));
}
if (parameters.getConfPairs() != null) {
String[] confArray = parameters.getConfPairs().toArray(new String[0]);
for (Map.Entry<String, String> cliConf : Utils
.parseKeyValue(confArray).entrySet()) {
String[] existingValue = tonyConf.getStrings(cliConf.getKey());
if (existingValue != null
&& TonyConfigurationKeys
.MULTI_VALUE_CONF.contains(cliConf.getKey())) {
ArrayList<String> newValues = new ArrayList<>(Arrays
.asList(existingValue));
newValues.add(cliConf.getValue());
tonyConf.setStrings(cliConf.getKey(),
newValues.toArray(new String[0]));
} else {
tonyConf.set(cliConf.getKey(), cliConf.getValue());
}
}
}
LOG.info("Resources: " + tonyConf.get(
TonyConfigurationKeys.getContainerResourcesKey()));
return tonyConf;
}
private TonyUtils() {
}
}

View File

@ -0,0 +1,61 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License. See accompanying LICENSE file.
*/
package org.apache.hadoop.yarn.submarine.runtimes.tony.buider;
import com.linkedin.tony.rpc.TaskInfo;
import org.apache.hadoop.yarn.submarine.common.api.JobComponentStatus;
import org.apache.hadoop.yarn.submarine.common.api.JobStatus;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
/**
* JobStatusBuilder builds the job status from a set of TaskInfos.
*/
public final class JobStatusBuilder {
public static JobStatus fromTaskInfoSet(final Set<TaskInfo> taskInfos) {
JobStatus status = new JobStatus();
Set<String> jobNames =
taskInfos.stream().map(TaskInfo::getName).collect(Collectors.toSet());
List<JobComponentStatus> jobComponentStatusList = new ArrayList<>();
for (String jobName : jobNames) {
Set<TaskInfo> filterTasks = taskInfos.stream()
.filter(taskInfo -> taskInfo.getName().equals(jobName))
.collect(Collectors.toSet());
long numReadyContainers = 0;
long numRunningContainers = 0;
long totalAskedContainers = 0;
for (TaskInfo taskInfo : filterTasks) {
totalAskedContainers += 1;
switch (taskInfo.getStatus()) {
case READY:
numReadyContainers += 1;
break;
case RUNNING:
numRunningContainers += 1;
break;
default:
}
}
jobComponentStatusList.add(new JobComponentStatus(jobName,
numReadyContainers, numRunningContainers, totalAskedContainers));
}
status.setComponentStatus(jobComponentStatusList);
return status;
}
private JobStatusBuilder() { }
}

View File

@ -0,0 +1,14 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License. See accompanying LICENSE file.
*/
package org.apache.hadoop.yarn.submarine.runtimes.tony.buider;

View File

@ -0,0 +1,14 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License. See accompanying LICENSE file.
*/
package org.apache.hadoop.yarn.submarine.runtimes.tony;

View File

@ -0,0 +1,198 @@
<!---
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
# Quick Start Guide
## Prerequisite
Must:
- Apache Hadoop 2.7 or above.
Optional:
- Enable GPU on YARN support (when GPU-based training is required, Hadoop 3.1 and above).
- Enable Docker support on Hadoop (Hadoop 2.9 and above).
## Run jobs
### Commandline options
```$xslt
usage:
-docker_image <arg> Docker image name/tag
-env <arg> Common environment variable of worker/ps
-name <arg> Name of the job
-num_ps <arg> Number of PS tasks of the job, by default
it's 0
-num_workers <arg> Numnber of worker tasks of the job, by
default it's 1
-ps_docker_image <arg> Specify docker image for PS, when this is
not specified, PS uses --docker_image as
default.
-ps_launch_cmd <arg> Commandline of worker, arguments will be
directly used to launch the PS
-ps_resources <arg> Resource of each PS, for example
memory-mb=2048,vcores=2,yarn.io/gpu=2
-queue <arg> Name of queue to run the job, by default it
uses default queue
-saved_model_path <arg> Model exported path (savedmodel) of the job,
which is needed when exported model is not
placed under ${checkpoint_path}could be
local or other FS directory. This will be
used to serve.
-tensorboard <arg> Should we run TensorBoard for this job? By
default it's true
-verbose Print verbose log for troubleshooting
-wait_job_finish Specified when user want to wait the job
finish
-worker_docker_image <arg> Specify docker image for WORKER, when this
is not specified, WORKER uses --docker_image
as default.
-worker_launch_cmd <arg> Commandline of worker, arguments will be
directly used to launch the worker
-worker_resources <arg> Resource of each worker, for example
memory-mb=2048,vcores=2,yarn.io/gpu=2
-localization <arg> Specify localization to remote/local
file/directory available to all container(Docker).
Argument format is "RemoteUri:LocalFilePath[:rw]"
(ro permission is not supported yet).
The RemoteUri can be a file or directory in local
or HDFS or s3 or abfs or http .etc.
The LocalFilePath can be absolute or relative.
If relative, it'll be under container's implied
working directory.
This option can be set mutiple times.
Examples are
-localization "hdfs:///user/yarn/mydir2:/opt/data"
-localization "s3a:///a/b/myfile1:./"
-localization "https:///a/b/myfile2:./myfile"
-localization "/user/yarn/mydir3:/opt/mydir3"
-localization "./mydir1:."
-insecure Whether running in an insecure cluster
-conf Override configurations via commandline
```
### Submarine Configuration
For submarine internal configuration, please create a `submarine.xml` which should be placed under `$HADOOP_CONF_DIR`.
Make sure you set `submarine.runtime.class` to `org.apache.hadoop.yarn.submarine.runtimes.tony.TonyRuntimeFactory`
|Configuration Name | Description |
|:---- |:---- |
| `submarine.runtime.class` | org.apache.hadoop.yarn.submarine.runtimes.tony.TonyRuntimeFactory
| `submarine.localization.max-allowed-file-size-mb` | Optional. This sets a size limit to the file/directory to be localized in "-localization" CLI option. 2GB by default. |
### Launch TensorFlow Application:
#### Commandline
### Without Docker
You need:
* Build a Python virtual environment with TensorFlow 1.13.1 installed
* A cluster with Hadoop 2.7 or above.
### Building a Python virtual environment with TensorFlow
TonY requires a Python virtual environment zip with TensorFlow and any needed Python libraries already installed.
```
wget https://files.pythonhosted.org/packages/33/bc/fa0b5347139cd9564f0d44ebd2b147ac97c36b2403943dbee8a25fd74012/virtualenv-16.0.0.tar.gz
tar xf virtualenv-16.0.0.tar.gz
# Make sure to install using Python 3, as TensorFlow only provides Python 3 artifacts
python virtualenv-16.0.0/virtualenv.py venv
. venv/bin/activate
pip install tensorflow==1.13.1
zip -r venv.zip venv
```
### TensorFlow version
- Version 1.13.1
**Note:** If you require a past version of TensorFlow and TensorBoard, take a look at [this](https://github.com/linkedin/TonY/issues/42) issue.
### Installing Hadoop
TonY only requires YARN, not HDFS. Please see the [open-source documentation](https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/SingleCluster.html) on how to set YARN up.
### Get the training examples
Get mnist_distributed.py from https://github.com/linkedin/TonY/tree/master/tony-examples/mnist-tensorflow
```
CLASSPATH=$(hadoop classpath --glob): \
./hadoop-submarine-core/target/hadoop-submarine-core-0.2.0-SNAPSHOT.jar: \
./hadoop-submarine-yarnservice-runtime/target/hadoop-submarine-score-yarnservice-runtime-0.2.0-SNAPSHOT.jar: \
./hadoop-submarine-tony-runtime/target/hadoop-submarine-tony-runtime-0.2.0-SNAPSHOT.jar: \
/home/pi/hadoop/TonY/tony-cli/build/libs/tony-cli-0.3.2-all.jar \
java org.apache.hadoop.yarn.submarine.client.cli.Cli job run --name tf-job-001 \
--num_workers 2 \
--worker_resources memory=3G,vcores=2 \
--num_ps 2 \
--ps_resources memory=3G,vcores=2 \
--worker_launch_cmd "venv.zip/venv/bin/python --steps 1000 --data_dir /tmp/data --working_dir /tmp/mode" \
--ps_launch_cmd "venv.zip/venv/bin/python --steps 1000 --data_dir /tmp/data --working_dir /tmp/mode" \
--container_resources /home/pi/hadoop/TonY/tony-cli/build/libs/tony-cli-0.3.2-all.jar
--insecure
--conf tony.containers.resources=PATH_TO_VENV_YOU_CREATED/venv.zip#archive,PATH_TO_MNIST_EXAMPLE/mnist_distributed.py, \
PATH_TO_TONY_CLI_JAR/tony-cli-0.3.2-all.jar
```
You should then be able to see links and status of the jobs from command line:
```
2019-04-22 20:30:42,611 INFO tony.TonyClient: Tasks Status Updated: [TaskInfo] name: worker index: 0 url: http://pi-aw:8042/node/containerlogs/container_1555916523933_0030_01_000003/pi status: RUNNING
2019-04-22 20:30:42,612 INFO tony.TonyClient: Tasks Status Updated: [TaskInfo] name: worker index: 1 url: http://pi-aw:8042/node/containerlogs/container_1555916523933_0030_01_000004/pi status: RUNNING
2019-04-22 20:30:42,612 INFO tony.TonyClient: Tasks Status Updated: [TaskInfo] name: ps index: 0 url: http://pi-aw:8042/node/containerlogs/container_1555916523933_0030_01_000002/pi status: RUNNING
2019-04-22 20:30:42,612 INFO tony.TonyClient: Logs for ps 0 at: http://pi-aw:8042/node/containerlogs/container_1555916523933_0030_01_000002/pi
2019-04-22 20:30:42,612 INFO tony.TonyClient: Logs for worker 0 at: http://pi-aw:8042/node/containerlogs/container_1555916523933_0030_01_000003/pi
2019-04-22 20:30:42,612 INFO tony.TonyClient: Logs for worker 1 at: http://pi-aw:8042/node/containerlogs/container_1555916523933_0030_01_000004/pi
2019-04-22 20:30:44,625 INFO tony.TonyClient: Tasks Status Updated: [TaskInfo] name: ps index: 0 url: http://pi-aw:8042/node/containerlogs/container_1555916523933_0030_01_000002/pi status: FINISHED
2019-04-22 20:30:44,625 INFO tony.TonyClient: Tasks Status Updated: [TaskInfo] name: worker index: 0 url: http://pi-aw:8042/node/containerlogs/container_1555916523933_0030_01_000003/pi status: FINISHED
2019-04-22 20:30:44,626 INFO tony.TonyClient: Tasks Status Updated: [TaskInfo] name: worker index: 1 url: http://pi-aw:8042/node/containerlogs/container_1555916523933_0030_01_000004/pi status: FINISHED
```
### With Docker
```
CLASSPATH=$(hadoop classpath --glob): \
./hadoop-submarine-core/target/hadoop-submarine-core-0.2.0-SNAPSHOT.jar: \
./hadoop-submarine-yarnservice-runtime/target/hadoop-submarine-score-yarnservice-runtime-0.2.0-SNAPSHOT.jar: \
./hadoop-submarine-tony-runtime/target/hadoop-submarine-tony-runtime-0.2.0-SNAPSHOT.jar: \
/home/pi/hadoop/TonY/tony-cli/build/libs/tony-cli-0.3.2-all.jar \
java org.apache.hadoop.yarn.submarine.client.cli.Cli job run --name tf-job-001 \
--docker_image hadoopsubmarine/tf-1.8.0-cpu:0.0.3 \
--input_path hdfs://pi-aw:9000/dataset/cifar-10-data \
--worker_resources memory=3G,vcores=2 \
--worker_launch_cmd "export CLASSPATH=\$(/hadoop-3.1.0/bin/hadoop classpath --glob) && cd /test/models/tutorials/image/cifar10_estimator && python cifar10_main.py --data-dir=%input_path% --job-dir=%checkpoint_path% --train-steps=10000 --eval-batch-size=16 --train-batch-size=16 --variable-strategy=CPU --num-gpus=0 --sync" \
--env JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64 \
--env DOCKER_HADOOP_HDFS_HOME=/hadoop-3.1.0 \
--env DOCKER_JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64 \
--env HADOOP_HOME=/hadoop-3.1.0 \
--env HADOOP_YARN_HOME=/hadoop-3.1.0 \
--env HADOOP_COMMON_HOME=/hadoop-3.1.0 \
--env HADOOP_HDFS_HOME=/hadoop-3.1.0 \
--env HADOOP_CONF_DIR=/hadoop-3.1.0/etc/hadoop \
--conf tony.containers.resources=--conf tony.containers.resources=/home/pi/hadoop/TonY/tony-cli/build/libs/tony-cli-0.3.2-all.jar
```

View File

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#banner {
height: 93px;
background: none;
}
#bannerLeft img {
margin-left: 30px;
margin-top: 10px;
}
#bannerRight img {
margin: 17px;
}

View File

@ -0,0 +1,28 @@
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<project name="Apache Hadoop ${project.version}">
<skin>
<groupId>org.apache.maven.skins</groupId>
<artifactId>maven-stylus-skin</artifactId>
<version>${maven-stylus-skin.version}</version>
</skin>
<body>
<links>
<item name="Apache Hadoop" href="http://hadoop.apache.org/"/>
</links>
</body>
</project>

View File

@ -0,0 +1,113 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License. See accompanying LICENSE file.
*/
import com.linkedin.tony.Constants;
import com.linkedin.tony.TonyConfigurationKeys;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.submarine.client.cli.RunJobCli;
import org.apache.hadoop.yarn.submarine.client.cli.param.RunJobParameters;
import org.apache.hadoop.yarn.submarine.common.MockClientContext;
import org.apache.hadoop.yarn.submarine.common.conf.SubmarineLogs;
import org.apache.hadoop.yarn.submarine.runtimes.RuntimeFactory;
import org.apache.hadoop.yarn.submarine.runtimes.common.JobMonitor;
import org.apache.hadoop.yarn.submarine.runtimes.common.JobSubmitter;
import org.apache.hadoop.yarn.submarine.runtimes.common.SubmarineStorage;
import org.apache.hadoop.yarn.submarine.runtimes.tony.TonyUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
public class TestTonyUtils {
private MockClientContext getMockClientContext()
throws IOException, YarnException {
MockClientContext mockClientContext = new MockClientContext();
JobSubmitter mockJobSubmitter = mock(JobSubmitter.class);
when(mockJobSubmitter.submitJob(any(RunJobParameters.class))).thenReturn(
ApplicationId.newInstance(1234L, 1));
JobMonitor mockJobMonitor = mock(JobMonitor.class);
SubmarineStorage storage = mock(SubmarineStorage.class);
RuntimeFactory rtFactory = mock(RuntimeFactory.class);
when(rtFactory.getJobSubmitterInstance()).thenReturn(mockJobSubmitter);
when(rtFactory.getJobMonitorInstance()).thenReturn(mockJobMonitor);
when(rtFactory.getSubmarineStorage()).thenReturn(storage);
mockClientContext.setRuntimeFactory(rtFactory);
return mockClientContext;
}
@Before
public void before() {
SubmarineLogs.verboseOff();
}
@Test
public void testTonyConfFromClientContext() throws Exception {
RunJobCli runJobCli = new RunJobCli(getMockClientContext());
runJobCli.run(
new String[] {"--name", "my-job", "--docker_image", "tf-docker:1.1.0",
"--input_path", "hdfs://input",
"--num_workers", "3", "--num_ps", "2", "--worker_launch_cmd",
"python run-job.py", "--worker_resources", "memory=2048M,vcores=2",
"--ps_resources", "memory=4G,vcores=4", "--ps_launch_cmd",
"python run-ps.py"});
RunJobParameters jobRunParameters = runJobCli.getRunJobParameters();
Configuration tonyConf = TonyUtils
.tonyConfFromClientContext(jobRunParameters);
Assert.assertEquals(jobRunParameters.getDockerImageName(),
tonyConf.get(TonyConfigurationKeys.getContainerDockerKey()));
Assert.assertEquals("3", tonyConf.get(TonyConfigurationKeys
.getInstancesKey("worker")));
Assert.assertEquals(jobRunParameters.getWorkerLaunchCmd(),
tonyConf.get(TonyConfigurationKeys
.getExecuteCommandKey("worker")));
Assert.assertEquals("2048", tonyConf.get(TonyConfigurationKeys
.getResourceKey(Constants.WORKER_JOB_NAME, Constants.MEMORY)));
Assert.assertEquals("2", tonyConf.get(TonyConfigurationKeys
.getResourceKey(Constants.WORKER_JOB_NAME, Constants.VCORES)));
Assert.assertEquals("4096", tonyConf.get(TonyConfigurationKeys
.getResourceKey(Constants.PS_JOB_NAME, Constants.MEMORY)));
Assert.assertEquals("4", tonyConf.get(TonyConfigurationKeys
.getResourceKey(Constants.PS_JOB_NAME,
Constants.VCORES)));
Assert.assertEquals(jobRunParameters.getPSLaunchCmd(),
tonyConf.get(TonyConfigurationKeys.getExecuteCommandKey("ps")));
}
}

View File

@ -110,7 +110,7 @@ private Resource getServiceResourceFromYarnResource(
return serviceResource; return serviceResource;
} }
private String getValueOfEnvionment(String envar) { private String getValueOfEnvironment(String envar) {
// extract value from "key=value" form // extract value from "key=value" form
if (envar == null || !envar.contains("=")) { if (envar == null || !envar.contains("=")) {
return ""; return "";
@ -133,10 +133,10 @@ private void addHdfsClassPathIfNeeded(RunJobParameters parameters,
for (String envar : parameters.getEnvars()) { for (String envar : parameters.getEnvars()) {
if (envar.startsWith("DOCKER_HADOOP_HDFS_HOME=")) { if (envar.startsWith("DOCKER_HADOOP_HDFS_HOME=")) {
hdfsHome = getValueOfEnvionment(envar); hdfsHome = getValueOfEnvironment(envar);
hadoopEnv = true; hadoopEnv = true;
} else if (envar.startsWith("DOCKER_JAVA_HOME=")) { } else if (envar.startsWith("DOCKER_JAVA_HOME=")) {
javaHome = getValueOfEnvionment(envar); javaHome = getValueOfEnvironment(envar);
} }
} }

View File

@ -37,6 +37,7 @@
<modules> <modules>
<module>hadoop-submarine-core</module> <module>hadoop-submarine-core</module>
<module>hadoop-submarine-yarnservice-runtime</module> <module>hadoop-submarine-yarnservice-runtime</module>
<module>hadoop-submarine-tony-runtime</module>
</modules> </modules>
<profiles> <profiles>