YARN-8551. Project setup for MaWo application.

Contributed by Yesha Vora
This commit is contained in:
Eric Yang 2019-04-22 16:56:26 -04:00
parent fb1c549139
commit a54c1e3ace
28 changed files with 2455 additions and 0 deletions

View File

@@ -1100,6 +1100,11 @@
<artifactId>mockito-core</artifactId>
<version>2.23.4</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.8.5</version>
</dependency>
<dependency>
<groupId>org.objenesis</groupId>
<artifactId>objenesis</artifactId>

View File

@@ -0,0 +1,17 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<FindBugsFilter>
  <!-- No FindBugs exclusions defined yet; add <Match> rules here as needed. -->
</FindBugsFilter>

View File

@@ -0,0 +1,17 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<FindBugsFilter>
  <!-- No FindBugs exclusions defined yet; add <Match> rules here as needed. -->
</FindBugsFilter>

View File

@@ -0,0 +1,173 @@
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <parent>
    <artifactId>hadoop-applications-mawo</artifactId>
    <groupId>org.apache.hadoop.applications.mawo</groupId>
    <version>3.3.0-SNAPSHOT</version>
  </parent>
  <modelVersion>4.0.0</modelVersion>
  <!-- groupId is intentionally omitted: it is inherited from the parent POM. -->
  <artifactId>hadoop-applications-mawo-core</artifactId>
  <packaging>jar</packaging>
  <name>Apache Hadoop YARN Application MaWo Core</name>
  <url>http://maven.apache.org</url>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <type>test-jar</type>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>com.google.inject</groupId>
      <artifactId>guice</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-framework</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-client</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-test</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>commons-io</groupId>
      <artifactId>commons-io</artifactId>
    </dependency>
    <dependency>
      <groupId>commons-cli</groupId>
      <artifactId>commons-cli</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.zookeeper</groupId>
      <artifactId>zookeeper</artifactId>
      <exclusions>
        <exclusion>
          <!-- otherwise seems to drag in junit 3.8.1 via jline -->
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
        </exclusion>
        <exclusion>
          <groupId>com.sun.jdmk</groupId>
          <artifactId>jmxtools</artifactId>
        </exclusion>
        <exclusion>
          <groupId>com.sun.jmx</groupId>
          <artifactId>jmxri</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.jboss.netty</groupId>
          <artifactId>netty</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
    </dependency>
    <dependency>
      <groupId>com.googlecode.json-simple</groupId>
      <artifactId>json-simple</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.httpcomponents</groupId>
      <artifactId>httpclient</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.httpcomponents</groupId>
      <artifactId>httpcore</artifactId>
    </dependency>
    <dependency>
      <groupId>org.mockito</groupId>
      <artifactId>mockito-all</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>
  <build>
    <pluginManagement>
      <plugins>
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-assembly-plugin</artifactId>
        </plugin>
      </plugins>
    </pluginManagement>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <descriptors>
            <descriptor>src/assembly/bin.xml</descriptor>
          </descriptors>
          <finalName>hadoop-applications-mawo-core-${project.version}</finalName>
        </configuration>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.rat</groupId>
        <artifactId>apache-rat-plugin</artifactId>
        <configuration>
          <excludes>
            <exclude>target/**/*</exclude>
          </excludes>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>

View File

@@ -0,0 +1,41 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<assembly>
  <id>bin</id>
  <formats>
    <format>tar.gz</format>
  </formats>
  <fileSets>
    <fileSet>
      <includes>
        <include>README*</include>
      </includes>
    </fileSet>
    <!-- Packaging the bin scripts -->
    <fileSet>
      <directory>src/main/bin</directory>
      <outputDirectory>bin</outputDirectory>
    </fileSet>
    <!-- Package the main artifacts (the built jars) -->
    <fileSet>
      <directory>target</directory>
      <outputDirectory>/</outputDirectory>
      <includes>
        <include>*.jar</include>
      </includes>
    </fileSet>
  </fileSets>
</assembly>

View File

@@ -0,0 +1,246 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.applications.mawo.server.common;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Abstract base class for MaWo Tasks.
 *
 * <p>Holds the state shared by all task types (id, environment, command,
 * type and timeout) and implements {@code Writable} serialization so a
 * task can be shipped over Hadoop RPC.</p>
 */
public abstract class AbstractTask implements Task {

  /**
   * Task identifier.
   */
  private TaskId taskID = new TaskId();

  /**
   * Environment variables to set when the task command is executed.
   */
  private Map<String, String> environment = new HashMap<String, String>();

  /**
   * Command which need to be executed as Task.
   */
  private String taskCmd;

  /**
   * Type of task (simple, composite, null, die).
   */
  private TaskType taskType;

  /**
   * Task timeout in seconds.
   */
  private long timeout;

  /**
   * Logger for AbstractTask.
   */
  static final Logger LOG = LoggerFactory.getLogger(AbstractTask.class);

  /**
   * Default constructor; required so the class can be instantiated for
   * Writable deserialization via {@link #readFields(DataInput)}.
   */
  public AbstractTask() {
  }

  /**
   * AbstractTask constructor from task details.
   * @param taskId : Task identifier
   * @param localenvironment : Task environment vars
   * @param taskCMD : Cmd to run
   * @param localtimeout : Task timeout in seconds
   */
  public AbstractTask(final TaskId taskId,
      final Map<String, String> localenvironment,
      final String taskCMD, final long localtimeout) {
    this();
    setTaskId(taskId);
    setEnvironment(localenvironment);
    setTaskCmd(taskCMD);
    setTimeout(localtimeout);
    // Parameterized logging avoids building the message when INFO is off.
    LOG.info("Created Task - type: {}, TaskId: {}, cmd: '{}' Timeout: {}",
        this.taskType, this.taskID, taskCMD, timeout);
  }

  /**
   * Get environment for a Task.
   * @return environment of a Task
   */
  @Override
  public final Map<String, String> getEnvironment() {
    return environment;
  }

  /**
   * Set environment for a Task.
   * @param localenvironment : Map of environment vars; may be null
   */
  @Override
  public final void setEnvironment(final Map<String, String> localenvironment) {
    this.environment = localenvironment;
  }

  /**
   * Get TaskCmd for a Task.
   * @return TaskCMD: the task command line, such as "sleep 10"
   */
  @Override
  public final String getTaskCmd() {
    return taskCmd;
  }

  /**
   * Set TaskCmd for a Task.
   * @param taskCMD : Task command line
   */
  @Override
  public final void setTaskCmd(final String taskCMD) {
    this.taskCmd = taskCMD;
  }

  /**
   * Get TaskId for a Task.
   * @return TaskID: the task identifier
   */
  @Override
  public final TaskId getTaskId() {
    return taskID;
  }

  /**
   * Set Task Id. A null argument is ignored so the field is never null.
   * @param taskId : Task Identifier
   */
  @Override
  public final void setTaskId(final TaskId taskId) {
    if (taskId != null) {
      this.taskID = taskId;
    }
  }

  /**
   * Get TaskType for a Task.
   * @return TaskType: Type of Task
   */
  @Override
  public final TaskType getTaskType() {
    return taskType;
  }

  /**
   * Set TaskType for a Task.
   * @param type Simple or Composite Task
   */
  public final void setTaskType(final TaskType type) {
    this.taskType = type;
  }

  /**
   * Get Timeout for a Task.
   * @return timeout in seconds
   */
  @Override
  public final long getTimeout() {
    return this.timeout;
  }

  /**
   * Set Task Timeout in seconds.
   * @param taskTimeout : Timeout in seconds
   */
  @Override
  public final void setTimeout(final long taskTimeout) {
    this.timeout = taskTimeout;
  }

  /**
   * Serialize this Task. Field order must match readFields exactly:
   * taskID, environment size, environment entries, cmd, type, timeout.
   * @param out : data output sink
   * @throws IOException : if any write fails
   */
  @Override
  public final void write(final DataOutput out) throws IOException {
    taskID.write(out);
    // Environment may legitimately be null (see setEnvironment); encode
    // it as a zero-length map.
    int environmentSize = (environment == null) ? 0 : environment.size();
    new IntWritable(environmentSize).write(out);
    if (environmentSize != 0) {
      for (Entry<String, String> envEntry : environment.entrySet()) {
        new Text(envEntry.getKey()).write(out);
        new Text(envEntry.getValue()).write(out);
      }
    }
    // A null command is serialized as the empty string.
    Text taskCmdText = (taskCmd == null) ? new Text("") : new Text(taskCmd);
    taskCmdText.write(out);
    WritableUtils.writeEnum(out, taskType);
    WritableUtils.writeVLong(out, timeout);
  }

  /**
   * Deserialize this Task, fully replacing its current state.
   * Field order must match write exactly.
   * @param in : data input source
   * @throws IOException : if any read fails
   */
  @Override
  public final void readFields(final DataInput in) throws IOException {
    this.taskID = new TaskId();
    taskID.readFields(in);
    IntWritable envSize = new IntWritable(0);
    envSize.readFields(in);
    // Reset the map rather than merging into the existing one: stale
    // entries must not survive deserialization, and environment may be
    // null here if setEnvironment(null) was called (which previously
    // caused an NPE on environment.put below).
    this.environment = new HashMap<String, String>();
    for (int i = 0; i < envSize.get(); i++) {
      Text key = new Text();
      Text value = new Text();
      key.readFields(in);
      value.readFields(in);
      environment.put(key.toString(), value.toString());
    }
    Text taskCmdText = new Text();
    taskCmdText.readFields(in);
    taskCmd = taskCmdText.toString();
    taskType = WritableUtils.readEnum(in, TaskType.class);
    timeout = WritableUtils.readVLong(in);
  }

  /**
   * ToString.
   * @return String representation of Task
   */
  @Override
  public final String toString() {
    return "TaskId: " + this.taskID.toString() + ", TaskType: " + this.taskType
        + ", cmd: '" + taskCmd + "'";
  }
}

View File

@@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.applications.mawo.server.common;
/**
 * A composite task: a task made up of multiple commands.
 */
public class CompositeTask extends AbstractTask {

  /**
   * Create an empty composite task.
   */
  public CompositeTask() {
    super();
    this.setTaskType(TaskType.COMPOSITE);
  }

  /**
   * Create a composite task copying the given task's details.
   * @param task : Task object supplying id, environment, command and timeout
   */
  public CompositeTask(final Task task) {
    super(task.getTaskId(), task.getEnvironment(), task.getTaskCmd(),
        task.getTimeout());
    setTaskType(TaskType.COMPOSITE);
  }
}

View File

@@ -0,0 +1,32 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.applications.mawo.server.common;
/**
 * A die task: a task type signalling the application to shut down.
 */
public class DieTask extends AbstractTask {

  /**
   * Create a die task.
   */
  public DieTask() {
    super();
    this.setTaskType(TaskType.DIE);
  }
}

View File

@@ -0,0 +1,502 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.applications.mawo.server.common;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.inject.Singleton;
/**
 * MaWo configuration class.
 *
 * <p>Loads {@code mawo.properties} from the classpath once and exposes
 * typed accessors for each property, falling back to built-in defaults.</p>
 */
@Singleton
public final class MawoConfiguration {

  /**
   * Logger for MawoConfiguration.
   */
  static final Logger LOG = LoggerFactory.getLogger(MawoConfiguration.class);

  /**
   * Separator used for comma-delimited list properties.
   */
  static final String COMMA_SPLITTER = ",";

  /**
   * MaWo config file name, resolved from the classpath.
   */
  public static final String CONFIG_FILE = "mawo.properties";

  /**
   * RPC server hostname.
   */
  private static final String RPC_SERVER_HOSTNAME = "rpc.server.hostname";

  /**
   * RPC server port.
   */
  private static final String RPC_SERVER_PORT = "rpc.server.port";

  // Default values

  /**
   * RPC server hostname default value.
   */
  private static final String RPC_SERVER_HOSTNAME_DEFAULT = "localhost";

  /**
   * RPC server port default value.
   */
  private static final String RPC_SERVER_PORT_DEFAULT = "5121";

  // Curator related Configurations

  /**
   * Config to check if Job Queue Storage is enabled.
   */
  private static final String JOB_QUEUE_STORAGE_ENABLED =
      "mawo.job-queue-storage.enabled";

  /**
   * ZooKeeper property prefix.
   */
  private static final String ZK_PREFIX = "zookeeper.";

  /**
   * Property for ZooKeeper address.
   */
  private static final String ZK_ADDRESS = ZK_PREFIX + "address";

  /**
   * Default value for ZooKeeper address.
   */
  private static final String ZK_ADDRESS_DEFAULT = "localhost:2181";

  /**
   * Property for ZooKeeper parent path.
   */
  private static final String ZK_PARENT_PATH = ZK_PREFIX + "parent.path";

  /**
   * Default value for ZooKeeper parent path.
   */
  private static final String ZK_PARENT_PATH_DEFAULT = "/mawoRoot";

  /**
   * Property for ZooKeeper retry interval.
   */
  private static final String ZK_RETRY_INTERVAL_MS =
      ZK_PREFIX + "retry.interval.ms";

  /**
   * Default value for ZooKeeper retry interval.
   */
  private static final String ZK_RETRY_INTERVAL_MS_DEFAULT = "1000";

  /**
   * Property for ZooKeeper session timeout.
   */
  private static final String ZK_SESSION_TIMEOUT_MS =
      ZK_PREFIX + "session.timeout.ms";

  /**
   * Default value for ZooKeeper session timeout.
   */
  private static final String ZK_SESSION_TIMEOUT_MS_DEFAULT = "10000";

  /**
   * Property for ZooKeeper retry number.
   */
  private static final String ZK_RETRIES_NUM = ZK_PREFIX + "retries.num";

  /**
   * Default value for ZooKeeper retry number.
   */
  private static final String ZK_RETRIES_NUM_DEFAULT = "1000";

  /**
   * Property for ZooKeeper acl.
   */
  private static final String ZK_ACL = ZK_PREFIX + "acl";

  /**
   * Default value for ZooKeeper acl.
   */
  private static final String ZK_ACL_DEFAULT = "world:anyone:rwcda";

  /**
   * Property for setting num of tasks a worker runs concurrently.
   */
  private static final String WORKER_NUM_TASKS = "worker.num.tasks";

  /**
   * Default value for num of worker tasks.
   */
  private static final String WORKER_NUM_TASKS_DEFAULT = "10";

  /**
   * Property for setting job builder class.
   */
  public static final String JOB_BUILDER_CLASS = "mawo.job-builder.class";

  /**
   * Default value for job builder class = SimpleTaskJobBuilder.
   */
  private static final String JOB_BUILDER_CLASS_DEFAULT =
      "org.apache.hadoop.applications.mawo.server.master.job."
          + "SimpleTaskJobBuilder";

  /**
   * Property for setting worker workspace.
   */
  private static final String WORKER_WORK_SPACE = "worker.workspace";

  /**
   * Default value for worker workspace.
   */
  private static final String WORKER_WORK_SPACE_DEFAULT = "/tmp";

  /**
   * Property for resource manager url.
   */
  public static final String CLUSTER_MANAGER_URL = "ycloud.url";

  /**
   * Default value for resource manager url.
   */
  private static final String DEFAULT_CLUSTER_MANAGER_URL = "0.0.0.0:9191";

  /**
   * Property for setting auto shutdown for workers.
   */
  public static final String AUTO_SHUTDOWN_WORKERS =
      "mawo.master.auto-shutdown-workers";

  /**
   * Auto shutdown of workers is disabled by default.
   */
  private static final boolean DEFAULT_AUTO_SHUTDOWN_WORKERS = false;

  /**
   * Property for task status log path on the master node.
   */
  public static final String MASTER_TASKS_STATUS_LOG_PATH
      = "master.tasks-status.log.path";

  /**
   * Default value for task status log path.
   */
  private static final String MASTER_TASKS_STATUS_LOG_PATH_DEFAULT = "/tmp";

  /**
   * Property for drain event timeout.
   */
  private static final String MASTER_DRAIN_EVENTS_TIMEOUT =
      "master.drain-events.timeout";

  /**
   * Default value for drain event timeout.
   */
  private static final long MASTER_DRAIN_EVENTS_TIMEOUT_DEFAULT = 60000;

  /**
   * Property for worker white list env.
   * These environment variables will be set for all tasks.
   */
  private static final String WORKER_WHITELIST_ENV = "worker.whitelist.env";

  /**
   * Default value for worker white list env.
   */
  private static final String WORKER_WHITELIST_ENV_DEFAULT = "";

  /**
   * Property for teardown worker validity.
   */
  private static final String MASTER_TEARDOWN_WORKER_VALIDITY_INTERVAL_MS =
      "master.teardown-worker.validity-interval.ms";

  /**
   * Default value for teardown worker validity.
   */
  private static final String
      MASTER_TEARDOWN_WORKER_VALIDITY_INTERVAL_MS_DEFAULT = "120000";

  /**
   * Map of resolved MaWo configs (property name to effective value).
   */
  private Map<String, String> configsMap;

  /**
   * MaWo configuration initializer; loads properties from the classpath.
   */
  public MawoConfiguration() {
    this(readConfigFile());
  }

  /**
   * Set up MaWo properties, applying defaults for missing keys.
   * @param properties : loaded properties
   */
  private MawoConfiguration(final Properties properties) {
    configsMap = new HashMap<String, String>();
    configsMap.put(RPC_SERVER_HOSTNAME, properties
        .getProperty(RPC_SERVER_HOSTNAME, RPC_SERVER_HOSTNAME_DEFAULT));
    configsMap.put(RPC_SERVER_PORT,
        properties.getProperty(RPC_SERVER_PORT, RPC_SERVER_PORT_DEFAULT));
    configsMap.put(ZK_ADDRESS,
        properties.getProperty(ZK_ADDRESS, ZK_ADDRESS_DEFAULT));
    configsMap.put(ZK_PARENT_PATH,
        properties.getProperty(ZK_PARENT_PATH, ZK_PARENT_PATH_DEFAULT));
    configsMap.put(ZK_RETRY_INTERVAL_MS, properties
        .getProperty(ZK_RETRY_INTERVAL_MS, ZK_RETRY_INTERVAL_MS_DEFAULT));
    configsMap.put(ZK_SESSION_TIMEOUT_MS, properties
        .getProperty(ZK_SESSION_TIMEOUT_MS, ZK_SESSION_TIMEOUT_MS_DEFAULT));
    configsMap.put(ZK_RETRIES_NUM,
        properties.getProperty(ZK_RETRIES_NUM, ZK_RETRIES_NUM_DEFAULT));
    configsMap.put(ZK_ACL, properties.getProperty(ZK_ACL, ZK_ACL_DEFAULT));
    configsMap.put(JOB_BUILDER_CLASS,
        properties.getProperty(JOB_BUILDER_CLASS, JOB_BUILDER_CLASS_DEFAULT));
    configsMap.put(JOB_QUEUE_STORAGE_ENABLED,
        properties.getProperty(JOB_QUEUE_STORAGE_ENABLED, "false"));
    configsMap.put(CLUSTER_MANAGER_URL, properties
        .getProperty(CLUSTER_MANAGER_URL, DEFAULT_CLUSTER_MANAGER_URL));
    configsMap.put(WORKER_NUM_TASKS,
        properties.getProperty(WORKER_NUM_TASKS, WORKER_NUM_TASKS_DEFAULT));
    configsMap.put(WORKER_WORK_SPACE,
        properties.getProperty(WORKER_WORK_SPACE, WORKER_WORK_SPACE_DEFAULT));
    configsMap.put(AUTO_SHUTDOWN_WORKERS, properties.getProperty(
        AUTO_SHUTDOWN_WORKERS, String.valueOf(DEFAULT_AUTO_SHUTDOWN_WORKERS)));
    configsMap.put(MASTER_TASKS_STATUS_LOG_PATH, properties.getProperty(
        MASTER_TASKS_STATUS_LOG_PATH,
        String.valueOf(MASTER_TASKS_STATUS_LOG_PATH_DEFAULT)));
    configsMap.put(MASTER_DRAIN_EVENTS_TIMEOUT,
        properties.getProperty(MASTER_DRAIN_EVENTS_TIMEOUT,
            String.valueOf(MASTER_DRAIN_EVENTS_TIMEOUT_DEFAULT)));
    configsMap.put(WORKER_WHITELIST_ENV, properties.getProperty(
        WORKER_WHITELIST_ENV, WORKER_WHITELIST_ENV_DEFAULT));
    configsMap.put(MASTER_TEARDOWN_WORKER_VALIDITY_INTERVAL_MS,
        properties.getProperty(MASTER_TEARDOWN_WORKER_VALIDITY_INTERVAL_MS,
            MASTER_TEARDOWN_WORKER_VALIDITY_INTERVAL_MS_DEFAULT));
  }

  /**
   * Get MaWo config map.
   * @return the config map for MaWo properties
   */
  public Map<String, String> getConfigsMap() {
    return configsMap;
  }

  /**
   * Find, read, and parse the configuration file from the classpath.
   *
   * @return the properties that were found
   * @throws RuntimeException if the file is not on the classpath
   * @throws IllegalArgumentException if the file cannot be read
   */
  private static Properties readConfigFile() {
    Properties properties = new Properties();
    LOG.info("Configuration file being loaded: " + CONFIG_FILE
        + ". Found in classpath at "
        + MawoConfiguration.class.getClassLoader().getResource(CONFIG_FILE));
    // try-with-resources guarantees the stream is closed even when
    // properties.load throws (the previous version leaked it in that case).
    try (InputStream inputStream = MawoConfiguration.class.getClassLoader()
        .getResourceAsStream(CONFIG_FILE)) {
      if (inputStream == null) {
        throw new RuntimeException(CONFIG_FILE + " not found in classpath");
      }
      properties.load(inputStream);
    } catch (IOException ie) {
      throw new IllegalArgumentException(
          "Can't read configuration file " + CONFIG_FILE, ie);
    }
    return properties;
  }

  /**
   * Get MaWo RPC server Port.
   * @return value of rpc.server.port
   */
  public int getRpcServerPort() {
    return Integer.parseInt(configsMap.get(RPC_SERVER_PORT));
  }

  /**
   * Get RPC server hostname.
   * @return value of rpc.server.hostname
   */
  public String getRpcHostName() {
    return configsMap.get(RPC_SERVER_HOSTNAME);
  }

  /**
   * Check if Job Queue Storage is Enabled.
   * @return True if Job queue storage is enabled otherwise False
   */
  public boolean getJobQueueStorageEnabled() {
    return Boolean.parseBoolean(configsMap.get(JOB_QUEUE_STORAGE_ENABLED));
  }

  /**
   * Get ZooKeeper Address.
   * @return value of zookeeper.address
   */
  public String getZKAddress() {
    return configsMap.get(ZK_ADDRESS);
  }

  /**
   * Get ZooKeeper parent Path.
   * @return value of zookeeper.parent.path
   */
  public String getZKParentPath() {
    return configsMap.get(ZK_PARENT_PATH);
  }

  /**
   * Get ZooKeeper retry interval value in milliseconds.
   * @return value of zookeeper.retry.interval.ms
   */
  public int getZKRetryIntervalMS() {
    return Integer.parseInt(configsMap.get(ZK_RETRY_INTERVAL_MS));
  }

  /**
   * Get ZooKeeper session timeout in milliseconds.
   * @return value of zookeeper.session.timeout.ms
   */
  public int getZKSessionTimeoutMS() {
    return Integer.parseInt(configsMap.get(ZK_SESSION_TIMEOUT_MS));
  }

  /**
   * Get ZooKeeper retries number.
   * @return value of zookeeper.retries.num
   */
  public int getZKRetriesNum() {
    return Integer.parseInt(configsMap.get(ZK_RETRIES_NUM));
  }

  /**
   * Get ZooKeeper Acls.
   * @return value of zookeeper.acl
   */
  public String getZKAcl() {
    return configsMap.get(ZK_ACL);
  }

  /**
   * Get number of tasks a worker can run in parallel.
   * @return value of worker.num.tasks
   */
  public int getWorkerConcurrentTasksLimit() {
    return Integer.parseInt(configsMap.get(WORKER_NUM_TASKS));
  }

  /**
   * Get job builder class.
   * @return value of mawo.job-builder.class
   */
  public String getJobBuilderClass() {
    return configsMap.get(JOB_BUILDER_CLASS);
  }

  /**
   * Get worker work space.
   * @return value of worker.workspace
   */
  public String getWorkerWorkSpace() {
    return configsMap.get(WORKER_WORK_SPACE);
  }

  /**
   * Get cluster manager URL.
   * @return value of ycloud.url
   */
  public String getClusterManagerURL() {
    return configsMap.get(CLUSTER_MANAGER_URL);
  }

  /**
   * Check if worker auto shutdown feature is enabled.
   * @return value of mawo.master.auto-shutdown-workers
   */
  public boolean getAutoShutdownWorkers() {
    return Boolean.parseBoolean(configsMap.get(AUTO_SHUTDOWN_WORKERS));
  }

  /**
   * Get Task status log file path on master host.
   * @return value of master.tasks-status.log.path
   */
  public String getMasterTasksStatusLogPath() {
    return configsMap.get(MASTER_TASKS_STATUS_LOG_PATH);
  }

  /**
   * Get Master drain event timeout.
   * @return value of master.drain-events.timeout
   */
  public long getMasterDrainEventsTimeout() {
    return Long.parseLong(configsMap.get(MASTER_DRAIN_EVENTS_TIMEOUT));
  }

  /**
   * Get Worker whitelist env params.
   * These params will be set in all tasks. Entries are comma separated;
   * a leading "$" on a variable name is stripped.
   * @return list of whitelisted environment variable names
   */
  public List<String> getWorkerWhiteListEnv() {
    List<String> whiteList = new ArrayList<String>();
    String env = configsMap.get(WORKER_WHITELIST_ENV);
    if (env != null && !env.isEmpty()) {
      String[] variables = env.split(COMMA_SPLITTER);
      for (String variable : variables) {
        variable = variable.trim();
        if (variable.startsWith("$")) {
          variable = variable.substring(1);
        }
        if (!variable.isEmpty()) {
          whiteList.add(variable);
        }
      }
    }
    return whiteList;
  }

  /**
   * Get Teardown worker validity interval.
   * @return value of master.teardown-worker.validity-interval.ms
   */
  public long getTeardownWorkerValidityInterval() {
    return Long.parseLong(configsMap.get(
        MASTER_TEARDOWN_WORKER_VALIDITY_INTERVAL_MS));
  }
}

View File

@@ -0,0 +1,33 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.applications.mawo.server.common;
/**
 * A null task: a task carrying no work.
 */
public class NullTask extends AbstractTask {

  /**
   * Create a null task.
   */
  public NullTask() {
    super();
    setTaskType(TaskType.NULL);
  }
}

View File

@@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.applications.mawo.server.common;
import java.util.Map;
/**
 * A simple task: a task that carries exactly one command.
 */
public class SimpleTask extends AbstractTask {

  /**
   * Create an empty simple task.
   */
  public SimpleTask() {
    super();
    setTaskType(TaskType.SIMPLE);
  }

  /**
   * Create a simple task copying another task's details.
   * @param task : Task object supplying id, environment, command and timeout
   */
  public SimpleTask(final Task task) {
    this(task.getTaskId(), task.getEnvironment(), task.getTaskCmd(),
        task.getTimeout());
  }

  /**
   * Create a simple task from explicit details.
   * @param taskId : task identifier
   * @param environment : task environment
   * @param taskCMD : task command
   * @param timeout : task timeout in seconds
   */
  public SimpleTask(final TaskId taskId, final Map<String, String> environment,
      final String taskCMD, final long timeout) {
    super(taskId, environment, taskCMD, timeout);
    setTaskType(TaskType.SIMPLE);
  }
}

View File

@@ -0,0 +1,83 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.applications.mawo.server.common;
import java.util.Map;
import org.apache.hadoop.io.Writable;
/**
* Define Task Interface.
*/
public interface Task extends Writable {
  /**
   * Get the TaskId of this Task.
   * @return value of TaskId
   */
  TaskId getTaskId();
  /**
   * Get the environment variables of this Task.
   * @return map of environment variable name to value
   */
  Map<String, String> getEnvironment();
  /**
   * Get the Task command line.
   * @return value of the Task cmd, such as "sleep 1"
   */
  String getTaskCmd();
  /**
   * Get the Task type, such as SIMPLE or COMPOSITE (see {@link TaskType}).
   * @return value of TaskType
   */
  TaskType getTaskType();
  /**
   * Set the TaskId of this Task.
   * @param taskId : Task identifier
   */
  void setTaskId(TaskId taskId);
  /**
   * Set the Task environment, e.g. {"HOME":"/user/A"}.
   * @param environment : Map of environment variables
   */
  void setEnvironment(Map<String, String> environment);
  /**
   * Set the Task command line to be executed.
   * @param taskCMD : Task command to be executed
   */
  void setTaskCmd(String taskCMD);
  /**
   * Get the Task timeout in seconds.
   * @return value of the Task timeout
   */
  long getTimeout();
  /**
   * Set the Task timeout.
   * @param timeout : value of the Task timeout, in seconds
   */
  void setTimeout(long timeout);
}

View File

@ -0,0 +1,149 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.applications.mawo.server.common;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.applications.mawo.server.master.job.JobId;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
/**
* Defines TaskId for MaWo app.
*/
public class TaskId implements Writable {
  /**
   * Prefix prepended to every printed MaWo task id.
   */
  static final String TASK_ID_PREFIX = "mawo_task_";
  /**
   * Identifier of the job this task belongs to.
   */
  private JobId jobId = new JobId();
  /**
   * Numeric task identifier, unique within its job.
   */
  private long taskId;

  /**
   * Default constructor, used by Writable deserialization.
   */
  public TaskId() {
  }

  /**
   * Builds a TaskId from a job identifier and a task number.
   * @param localjobId : Job identifier
   * @param id : Task identifier
   */
  public TaskId(final JobId localjobId, final int id) {
    this.jobId = localjobId;
    this.taskId = id;
  }

  /**
   * Returns the numeric id of the enclosing job.
   * @return JobID: Job identifier
   */
  public final int getJobId() {
    return jobId.getID();
  }

  /**
   * Returns the numeric task id.
   * @return TaskId: Task identifier
   */
  public final long getId() {
    return taskId;
  }

  /**
   * Renders the full task id: prefix + jobId + "_" + taskId.
   * @return : Full TaskId string
   */
  public final String toString() {
    return TASK_ID_PREFIX + jobId.getID() + "_" + taskId;
  }

  @Override
  /**
   * Hash code combining jobId and taskId (same formula as equals fields).
   */
  public final int hashCode() {
    final int prime = 31;
    final int bits = 32;
    // prime * 1 + jobHash, then mixed with the folded 64-bit taskId.
    int result = prime + (jobId == null ? 0 : jobId.hashCode());
    result = prime * result + (int) (taskId ^ (taskId >>> bits));
    return result;
  }

  @Override
  /**
   * Two TaskIds are equal iff their jobIds and task numbers match.
   */
  public final boolean equals(final Object obj) {
    if (this == obj) {
      return true;
    }
    if (obj == null || getClass() != obj.getClass()) {
      return false;
    }
    final TaskId that = (TaskId) obj;
    if (jobId == null ? that.jobId != null : !jobId.equals(that.jobId)) {
      return false;
    }
    return taskId == that.taskId;
  }

  /** {@inheritDoc} */
  public final void write(final DataOutput out) throws IOException {
    jobId.write(out);
    WritableUtils.writeVLong(out, taskId);
  }

  /** {@inheritDoc} */
  public final void readFields(final DataInput in) throws IOException {
    jobId = new JobId();
    jobId.readFields(in);
    this.taskId = WritableUtils.readVLong(in);
  }
}

View File

@ -0,0 +1,347 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.applications.mawo.server.common;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.applications.mawo.server.worker.WorkerId;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Defines TaskStatus for MaWo app.
*/
public class TaskStatus implements Writable, Cloneable {
  /**
   * Set logger.
   */
  private static final Logger LOG = LoggerFactory.getLogger(TaskStatus.class);
  /**
   * TaskId is a unique task identifier.
   */
  private TaskId taskId = new TaskId();
  /**
   * Epoch time (ms) at which the task started.
   */
  private long startTime;
  /**
   * Epoch time (ms) at which the task ended.
   */
  private long endTime;
  /**
   * Unique worker identifier.
   */
  private WorkerId workerId = new WorkerId();
  /**
   * Task exit code; -1 until an actual exit code is recorded.
   */
  private int exitCode = -1;
  /**
   * Task cmd.
   */
  private String taskCMD;
  /**
   * Task type.
   */
  private String taskType;
  /**
   * Possible Task States.
   */
  public enum State {
    /**
     * INIT State refers to Task initialization.
     */
    INIT,
    /**
     * RUNNING State refers to Task in Running state.
     */
    RUNNING,
    /**
     * SUCCEEDED State is assigned when task finishes successfully.
     */
    SUCCEEDED,
    /**
     * FAILED State is assigned when task fails.
     */
    FAILED,
    /**
     * KILLED State refers to when a task is killed.
     */
    KILLED,
    /**
     * EXPIRE State refers to when a task is expired.
     */
    EXPIRE
  }
  /**
   * Current Task state; volatile since it is read/written across threads.
   */
  private volatile State runState;
  /**
   * Task status default constructor (used by Writable deserialization).
   */
  public TaskStatus() {
  }
  /**
   * Task status constructor with workerId, TaskId, TaskCmd, TaskType.
   * The run state defaults to INIT.
   * @param localworkerId : Worker ID
   * @param localtaskId : Task ID
   * @param localtaskCMD : Task command line
   * @param localtaskType : Type of Task
   */
  public TaskStatus(final WorkerId localworkerId, final TaskId localtaskId,
      final String localtaskCMD, final String localtaskType) {
    this(localworkerId, localtaskId,
        TaskStatus.State.INIT, localtaskCMD,
        localtaskType);
  }
  /**
   * Task status constructor with workerId, TaskId,
   * TaskCmd, TaskType and Run State.
   * @param localworkerId : Worker Id
   * @param localtaskId : Task Id
   * @param localrunState : Task run State
   * @param localtaskCMD : Task cmd
   * @param localtaskType : Task type
   */
  public TaskStatus(final WorkerId localworkerId, final TaskId localtaskId,
      final State localrunState, final String localtaskCMD,
      final String localtaskType) {
    setWorkerId(localworkerId);
    setTaskId(localtaskId);
    setRunState(localrunState);
    setTaskCMD(localtaskCMD);
    setTaskType(localtaskType);
  }
  /**
   * Get status of a Task.
   * @return Status of a Task
   */
  public final State getRunState() {
    return runState;
  }
  /**
   * Update status of a Task.
   * @param localrunState : Status of a Task
   */
  public final void setRunState(final State localrunState) {
    this.runState = localrunState;
  }
  /**
   * Set exitcode of a Task.
   * @param localexitCode : Exitcode of a Task
   */
  public final void setExitCode(final int localexitCode) {
    this.exitCode = localexitCode;
  }
  /**
   * Get exitcode of a Task.
   * @return exitCode of Task (-1 if none has been recorded yet)
   */
  public final int getExitCode() {
    return exitCode;
  }
  /**
   * Set Task cmd of a Task.
   * @param localcmd : command line which need to be executed
   */
  public final void setTaskCMD(final String localcmd) {
    this.taskCMD = localcmd;
  }
  /**
   * Get Task cmd of a Task.
   * @return TaskCmd : command line which need to be executed
   */
  public final String getTaskCMD() {
    return taskCMD;
  }
  /**
   * Set Task Type.
   * @param localtaskType : TaskType such as SimpleTask, NullTask
   */
  public final void setTaskType(final String localtaskType) {
    this.taskType = localtaskType;
  }
  /**
   * Get Task Type.
   * @return TaskType : TaskType such as SimpleTask, NullTask
   */
  public final String getTaskType() {
    return taskType;
  }
  /**
   * Get Task Id.
   * @return TaskId : Task identifier
   */
  public final TaskId getTaskId() {
    return taskId;
  }
  /**
   * Set TaskId. A null argument is ignored, keeping the previous id.
   * @param localtaskId : Task identifier
   */
  public final void setTaskId(final TaskId localtaskId) {
    if (localtaskId != null) {
      this.taskId = localtaskId;
    }
  }
  /**
   * Set status of a Task.
   * @param localtaskId : TaskId of a task
   * @param localrunState : Run state of a task
   */
  public final void setTaskState(final TaskId localtaskId,
      final State localrunState) {
    setTaskId(localtaskId);
    setRunState(localrunState);
  }
  /**
   * Get Task status of a Task.
   * @param localtaskId : Task Id
   * @return TaskStatus for valid Task otherwise Null
   */
  public final State getTaskState(final TaskId localtaskId) {
    if (localtaskId.equals(this.taskId)) {
      return getRunState();
    } else {
      return null;
    }
  }
  /**
   * Get starttime of a Task.
   * @return StartTime of Task
   */
  public final long getStartTime() {
    return startTime;
  }
  /**
   * Set current time as start time of a Task.
   */
  public final void setStartTime() {
    this.startTime = getCurrentTime();
    LOG.debug("Start Time for " + this.taskId + " is " + this.startTime);
  }
  /**
   * Set task start time to a specific time value.
   * @param time : epoch timestamp
   */
  private void setStartTime(final long time) {
    this.startTime = time;
  }
  /**
   * Get task end time.
   * @return End time of task.
   */
  public final long getEndTime() {
    return endTime;
  }
  /**
   * Set task end time to current time.
   */
  public final void setEndTime() {
    this.setEndTime(getCurrentTime());
  }
  /**
   * Set task end time to a specific time value.
   * @param time : epoch timestamp
   */
  private void setEndTime(final long time) {
    this.endTime = time;
    LOG.debug("End Time for " + this.taskId + " is " + this.endTime);
  }
  /**
   * Get current time in milliseconds.
   * @return Current time in milliseconds
   */
  private long getCurrentTime() {
    return System.currentTimeMillis();
  }
  /** {@inheritDoc} */
  public final void write(final DataOutput dataOutput) throws IOException {
    workerId.write(dataOutput);
    taskId.write(dataOutput);
    WritableUtils.writeEnum(dataOutput, runState);
    WritableUtils.writeVLong(dataOutput, getStartTime());
    WritableUtils.writeVLong(dataOutput, getEndTime());
    WritableUtils.writeString(dataOutput, taskCMD);
    WritableUtils.writeString(dataOutput, taskType);
    WritableUtils.writeVInt(dataOutput, exitCode);
  }
  /** {@inheritDoc} */
  public final void readFields(final DataInput dataInput) throws IOException {
    workerId.readFields(dataInput);
    taskId.readFields(dataInput);
    setRunState(WritableUtils.readEnum(dataInput, State.class));
    setStartTime(WritableUtils.readVLong(dataInput));
    setEndTime(WritableUtils.readVLong(dataInput));
    setTaskCMD(WritableUtils.readString(dataInput));
    setTaskType(WritableUtils.readString(dataInput));
    setExitCode(WritableUtils.readVInt(dataInput));
  }
  /**
   * Shallow clone of this status. The class declares Cloneable, so the
   * idiomatic contract requires a public clone() override; note the
   * taskId and workerId references are shared with the original.
   * @return a field-wise copy of this TaskStatus
   * @throws CloneNotSupportedException never, since Cloneable is implemented
   */
  @Override
  public final Object clone() throws CloneNotSupportedException {
    return super.clone();
  }
  /**
   * Get workerId.
   * @return workerId : Worker identifier
   */
  public final WorkerId getWorkerId() {
    return workerId;
  }
  /**
   * Set WorkerId.
   * @param localworkerId : Worker identifier
   */
  public final void setWorkerId(final WorkerId localworkerId) {
    this.workerId = localworkerId;
  }
}

View File

@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.applications.mawo.server.common;
/**
* Define MaWo Task Type.
*/
public enum TaskType {
  /**
   * A simple task carrying a single command.
   */
  SIMPLE,
  /**
   * A composite task, which consists of multiple simple tasks.
   */
  COMPOSITE,
  /**
   * A null (no-op) task.
   */
  NULL,
  /**
   * A die task, which signals the worker to shut itself down.
   */
  DIE,
  /**
   * A teardown task, which runs after all other tasks are finished.
   */
  TEARDOWN
}

View File

@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.applications.mawo.server.common;
import java.util.Map;
/**
* Define Teardown Task.
*/
public class TeardownTask extends SimpleTask {

  /**
   * Creates an empty teardown task and tags it with the TEARDOWN type.
   */
  public TeardownTask() {
    super();
    this.setTaskType(TaskType.TEARDOWN);
  }

  /**
   * Builds a teardown task from its individual fields.
   * @param taskId : Teardown task Id
   * @param environment : Environment map for teardown task
   * @param taskCMD : Teardown task command
   * @param timeout : Timeout for Teardown task
   */
  public TeardownTask(final TaskId taskId,
      final Map<String, String> environment,
      final String taskCMD, final long timeout) {
    super(taskId, environment, taskCMD, timeout);
    this.setTaskType(TaskType.TEARDOWN);
  }
}

View File

@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.applications.mawo.server.common;
import org.apache.hadoop.applications.mawo.server.worker.WorkerId;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.ProtocolInfo;
/**
* Define work assignment protocol.
*/
@ProtocolInfo(protocolName = "WorkAssignmentProtocol", protocolVersion = 1)
public interface WorkAssignmentProtocol {
  /**
   * Get the next workerId to which a new task will be assigned.
   * @return the new workerId as Text
   */
  Text getNewWorkerId();
  /**
   * Register a Worker with the Master.
   * Called once when the worker is first launched.
   * @param workerId : Worker Id
   * @return the first Task instance assigned to this worker
   */
  Task registerWorker(WorkerId workerId);
  /**
   * De-register a worker.
   * Once de-registered, no new task will be assigned to this worker.
   * @param workerId : Worker identifier
   */
  void deRegisterWorker(WorkerId workerId);
  /**
   * Worker sends a heartbeat to the Master.
   * @param workerId : Worker Id
   * @param taskStatusList : TaskStatus list of all tasks assigned to worker.
   * @return the next Task instance for this worker
   */
  Task sendHeartbeat(WorkerId workerId, TaskStatus[] taskStatusList);
  /**
   * Add a Task to the Master's task queue.
   * @param task : Task object
   */
  void addTask(Task task);
}

View File

@ -0,0 +1,20 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.applications.mawo.server.common;
/**
* Helper classes for the mawo server common operations.
*/

View File

@ -0,0 +1,128 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.applications.mawo.server.master.job;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Random;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
/**
* Define MaWo JobId.
*/
public class JobId implements Writable {
  /**
   * MaWo job prefix used when printing a job id.
   */
  private static final String JOB_PREFIX = "mawo_job_";
  /**
   * Shared random source for id generation. A single static instance
   * avoids re-seeding a new Random on every call, which is both wasteful
   * and can yield correlated values when invoked in quick succession.
   */
  private static final Random RANDOM = new Random();
  /**
   * Create a random JobId in the range [100000, 999999].
   * Note: randomness does not guarantee uniqueness; collisions are
   * merely unlikely within the 900000-value range.
   * @return a new randomly-numbered JobId
   */
  static JobId newJobId() {
    final int range = 900000;
    final int randomadd = 100000;
    return new JobId(RANDOM.nextInt(range) + randomadd);
  }
  /**
   * Numeric job identifier.
   */
  private int jobIdentifier;
  /**
   * JobId default constructor (used by Writable deserialization).
   */
  public JobId() {
  }
  /**
   * JobId constructor with an explicit id.
   * @param id : unique id
   */
  public JobId(final int id) {
    this.jobIdentifier = id;
  }
  /**
   * Get the numeric job id.
   * @return unique ID
   */
  public final int getID() {
    return jobIdentifier;
  }
  /**
   * Render the job id as prefix + number.
   * @return printable JobId
   */
  public final String toString() {
    return JOB_PREFIX + jobIdentifier;
  }
  @Override
  /**
   * Hashcode derived from the numeric job id.
   */
  public final int hashCode() {
    final int prime = 31;
    int result = 1;
    result = prime * result + jobIdentifier;
    return result;
  }
  @Override
  /**
   * Two JobIds are equal iff their numeric ids match.
   */
  public final boolean equals(final Object obj) {
    if (this == obj) {
      return true;
    }
    if (obj == null) {
      return false;
    }
    if (getClass() != obj.getClass()) {
      return false;
    }
    JobId other = (JobId) obj;
    if (jobIdentifier != other.jobIdentifier) {
      return false;
    }
    return true;
  }
  /** {@inheritDoc} */
  public final void write(final DataOutput out) throws IOException {
    WritableUtils.writeVInt(out, jobIdentifier);
  }
  /** {@inheritDoc} */
  public final void readFields(final DataInput in) throws IOException {
    this.jobIdentifier = WritableUtils.readVInt(in);
  }
}

View File

@ -0,0 +1,20 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.applications.mawo.server.master.job;
/**
* Helper classes for the mawo master job.
*/

View File

@ -0,0 +1,162 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.applications.mawo.server.worker;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
/**
* Define WorkerId for Workers.
*/
public class WorkerId implements Writable {
  /**
   * WorkerId is a unique identifier for workers.
   */
  private Text workerId = new Text();
  /**
   * Hostname of worker node.
   */
  private Text hostname = new Text();
  /**
   * IP address of worker node.
   */
  private Text ipAdd = new Text();
  /**
   * Default constructor for workerId.
   * Sets hostname and IP address of the machine where the worker runs.
   */
  public WorkerId() {
    try {
      this.hostname =
          new Text(InetAddress.getLocalHost().getHostName());
      this.ipAdd =
          new Text(InetAddress.getLocalHost().getHostAddress().toString());
    } catch (UnknownHostException e) {
      // Best-effort: leave hostname/ipAdd empty if resolution fails.
      e.printStackTrace();
    }
  }
  /**
   * Get hostname for Worker.
   * @return hostname of worker node
   */
  public final Text getHostname() {
    return hostname;
  }
  /**
   * Set hostname for Worker.
   * @param wkhostname : Hostname of worker
   */
  public final void setHostname(final Text wkhostname) {
    this.hostname = wkhostname;
  }
  /**
   * Get Worker IP address.
   * @return IP address of worker node
   */
  public final String getIPAddress() {
    return this.ipAdd.toString();
  }
  /**
   * Print workerId.
   * @return workerId in string
   */
  public final String toString() {
    return workerId.toString();
  }
  /**
   * Get workerId.
   * @return workerId : Worker identifier
   */
  public final String getWorkerId() {
    return this.workerId.toString();
  }
  /**
   * Set workerId.
   * @param localworkerId : Worker identifier
   */
  public final void setWorkerId(final String localworkerId) {
    this.workerId = new Text(localworkerId);
  }
  @Override
  /**
   * Two WorkerIds are equal iff their hostnames are equal.
   * Guards against null and foreign types instead of throwing
   * ClassCastException/NullPointerException on a blind cast.
   */
  public final boolean equals(final Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof WorkerId)) {
      return false;
    }
    WorkerId x = (WorkerId) o;
    return x.getHostname().equals(this.hostname);
  }
  /** {@inheritDoc} */
  public final void write(final DataOutput dataOutput) throws IOException {
    workerId.write(dataOutput);
    hostname.write(dataOutput);
    ipAdd.write(dataOutput);
  }
  /** {@inheritDoc} */
  public final void readFields(final DataInput dataInput) throws IOException {
    workerId.readFields(dataInput);
    hostname.readFields(dataInput);
    ipAdd.readFields(dataInput);
  }
  @Override
  /**
   * Hashcode based on hostname only, so it is consistent with equals():
   * the previous implementation also mixed workerId and ipAdd, which let
   * two "equal" objects (same hostname) hash differently, breaking the
   * Object.hashCode contract and HashMap/HashSet lookups.
   */
  public final int hashCode() {
    final int prime = 31;
    int hostHash = 0;
    if (hostname != null) {
      hostHash = hostname.hashCode();
    }
    return prime + hostHash;
  }
}

View File

@ -0,0 +1,20 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.applications.mawo.server.worker;
/**
* Helper classes for the mawo worker.
*/

View File

@ -0,0 +1,41 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
rpc.server.hostname=localhost
rpc.server.port=5120
#curator related configurations
zookeeper.parent.path=/mawoRoot
zookeeper.address=localhost:2181
zookeeper.retry.interval.ms=1000
zookeeper.session.timeout.ms=10000
zookeeper.retries.num=1000
zookeeper.acl=world:anyone:rwcda
worker.num.tasks=10
mawo.job-queue-storage.enabled=true
mawo.job-builder.class=org.apache.hadoop.applications.mawo.server.master.job.SimpleTaskJsonJobBuilder
worker.workspace=/tmp
ycloud.url=0.0.0.0:9191
task.block-size.min=10
task.uber-time.min=12
master.tasks-status.log.path=/tmp
master.teardown-worker.validity-interval.ms=120000
#a comma-separated list of environment variables needed by all the tasks
worker.whitelist.env=

View File

@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.applications.mawo.server.common;
import org.junit.Assert;
import org.junit.Test;
/**
* Test MaWo configuration.
*/
public class TestMaWoConfiguration {

  /**
   * Validate default MaWo configurations.
   * JUnit's assertEquals takes (expected, actual) — the previous version
   * passed them reversed, which inverts failure messages; assertEquals on
   * strings also reports a readable diff, unlike assertTrue(equals(...)).
   */
  @Test
  public void testMaWoConfiguration() {
    MawoConfiguration mawoConf = new MawoConfiguration();
    // validate Rpc server port
    Assert.assertEquals(5120, mawoConf.getRpcServerPort());
    // validate Rpc hostname
    Assert.assertEquals("localhost", mawoConf.getRpcHostName());
    // validate job queue storage conf
    Assert.assertTrue(mawoConf.getJobQueueStorageEnabled());
    // validate default teardownWorkerValidity Interval
    Assert.assertEquals(120000, mawoConf.getTeardownWorkerValidityInterval());
    // validate Zk related configs
    Assert.assertEquals("/tmp/mawoRoot", mawoConf.getZKParentPath());
    Assert.assertEquals("localhost:2181", mawoConf.getZKAddress());
    Assert.assertEquals(1000, mawoConf.getZKRetryIntervalMS());
    Assert.assertEquals(10000, mawoConf.getZKSessionTimeoutMS());
    Assert.assertEquals(1000, mawoConf.getZKRetriesNum());
  }
}

View File

@ -0,0 +1,28 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
rpc.server.hostname=localhost
rpc.server.port=5120
#curator related configurations
zookeeper.parent.path=/tmp/mawoRoot
zookeeper.address=localhost:2181
zookeeper.retry.interval.ms=1000
zookeeper.session.timeout.ms=10000
zookeeper.retries.num=1000
zookeeper.acl=world:anyone:rwcda
worker.num.tasks=10
mawo.job-queue-storage.enabled=true

View File

@ -0,0 +1,37 @@
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>hadoop-yarn-applications</artifactId>
<groupId>org.apache.hadoop</groupId>
<version>3.3.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.hadoop.applications.mawo</groupId>
<artifactId>hadoop-applications-mawo</artifactId>
<packaging>pom</packaging>
<name>Apache Hadoop YARN Application MaWo</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<modules>
<module>hadoop-yarn-applications-mawo-core</module>
</modules>
</project>

View File

@ -38,6 +38,7 @@
<module>hadoop-yarn-applications-unmanaged-am-launcher</module>
<module>hadoop-yarn-services</module>
<module>hadoop-yarn-applications-catalog</module>
<module>hadoop-yarn-applications-mawo</module>
</modules>
<profiles>

View File

@ -0,0 +1,36 @@
<!---
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
# MaWo: A Master Worker framework on YARN Services
## Overview
MaWo is a YARN service based framework which handles Master Worker based workload.
This is an app which can take an input job specification with tasks, their expected durations and have a Master dish the tasks off to a predetermined set of workers.
The components will be responsible to finish the job within specific time duration.
## MaWo Components
MaWo app is a YARN Service Application. It has mainly two components.
* Master
- Read MaWo-Payload file and create a queue of Tasks
- Register Worker
- Assign tasks to worker nodes
- Monitor status of Tasks
- Log Task status
* Worker
- Send heartbeat to Worker
- Execute Task